Skip to content

并行代理

ParallelAgent

ParallelAgent 是一种能够并发执行子代理的工作流代理。这可以显著加速那些任务可独立执行的工作流程。

适用场景:当需要优先考虑执行速度,且涉及独立、资源密集型任务时,ParallelAgent 能实现高效的并行执行。当子代理之间不存在依赖关系时,它们的任务可以并发执行,从而大幅减少总体处理时间。

与其他工作流代理相同,ParallelAgent 并非由大模型驱动,因此其执行过程是确定性的。需要注意的是,工作流代理仅关注执行方式(即并行),而不涉及内部逻辑——工作流代理的工具或子代理可能会使用大模型,也可能不会。

示例说明

这种方法特别适用于多源数据检索或重型计算等操作场景,通过并行化能获得显著的性能提升。需注意的是,该策略默认假设并行执行的代理之间不需要共享状态或直接进行信息交换。

工作原理

当调用 ParallelAgentrun_async() 方法时:

  1. 并发执行:立即并发启动 sub_agents 列表中每个子代理的 run() 方法,这意味着所有代理会(近似)同时开始运行
  2. 独立分支:每个子代理在独立的执行分支中运行,执行过程中分支之间不会自动共享对话历史或状态
  3. 结果收集ParallelAgent 会管理并行执行过程,通常提供获取各子代理运行结果的途径(例如通过结果列表或事件)。结果的顺序可能不具备确定性

独立执行与状态管理

必须明确:ParallelAgent 中的子代理是独立运行的。若需实现代理间通信或数据共享,必须显式实现。可选方案包括:

  • 共享 InvocationContext:向每个子代理传递共享的 InvocationContext 对象作为公共数据存储。但需要谨慎管理对此共享上下文的并发访问(例如使用锁机制)以避免竞态条件
  • 外部状态管理:使用外部数据库、消息队列等机制管理共享状态并实现代理间通信
  • 后处理协调:先收集各分支结果,再通过逻辑实现数据协调

Parallel Agent

完整示例:并行网络调研

假设需要同时研究多个主题:

  1. 调研代理1:研究"可再生能源"的 LlmAgent
  2. 调研代理2:研究"电动汽车技术"的 LlmAgent
  3. 调研代理3:研究"碳捕获方法"的 LlmAgent

    ParallelAgent(sub_agents=[ResearcherAgent1, ResearcherAgent2, ResearcherAgent3])
    

这些调研任务相互独立。使用 ParallelAgent 可实现并发执行,相比串行执行能显著减少总调研时间。每个代理的结果将在完成后被单独收集。

Code
from google.adk.agents.parallel_agent import ParallelAgent
from google.adk.agents.llm_agent import LlmAgent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.tools import google_search
from google.genai import types

APP_NAME = "parallel_research_app"
USER_ID = "research_user_01"
SESSION_ID = "parallel_research_session"
GEMINI_MODEL = "gemini-2.0-flash"

# --- Define Researcher Sub-Agents ---

# Researcher 1: Renewable Energy
researcher_agent_1 = LlmAgent(
    name="RenewableEnergyResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in energy.
    Research the latest advancements in 'renewable energy sources'.
    Use the Google Search tool provided.
    Summarize your key findings concisely (1-2 sentences).
    Output *only* the summary.
    """,
    description="Researches renewable energy sources.",
    tools=[google_search], # Provide the search tool
    # Save the result to session state
    output_key="renewable_energy_result"
)

# Researcher 2: Electric Vehicles
researcher_agent_2 = LlmAgent(
    name="EVResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in transportation.
    Research the latest developments in 'electric vehicle technology'.
    Use the Google Search tool provided.
    Summarize your key findings concisely (1-2 sentences).
    Output *only* the summary.
    """,
    description="Researches electric vehicle technology.",
    tools=[google_search], # Provide the search tool
    # Save the result to session state
    output_key="ev_technology_result"
)

# Researcher 3: Carbon Capture
researcher_agent_3 = LlmAgent(
    name="CarbonCaptureResearcher",
    model=GEMINI_MODEL,
    instruction="""You are an AI Research Assistant specializing in climate solutions.
    Research the current state of 'carbon capture methods'.
    Use the Google Search tool provided.
    Summarize your key findings concisely (1-2 sentences).
    Output *only* the summary.
    """,
    description="Researches carbon capture methods.",
    tools=[google_search], # Provide the search tool
    # Save the result to session state
    output_key="carbon_capture_result"
)

# --- Create the ParallelAgent ---
# This agent orchestrates the concurrent execution of the researchers.
parallel_research_agent = ParallelAgent(
    name="ParallelWebResearchAgent",
    sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3]
)

# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=parallel_research_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
def call_agent(query):
    '''
    Helper function to call the agent with a query.
    '''
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)

call_agent("research latest trends")