Callback Types
The framework provides several callback types that fire at different stages of an agent's execution. Understanding when each callback triggers and what context it receives is the key to using them effectively.
Agent Lifecycle Callbacks
These callbacks apply to any agent that inherits from BaseAgent (including LlmAgent, SequentialAgent, ParallelAgent, LoopAgent, and so on).
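Because these hooks live on BaseAgent, workflow agents accept them too: for example, the before_agent_callback described below can be attached to a SequentialAgent and its sub-agents alike. A minimal sketch, with illustrative agent names and instructions:

from typing import Optional
from google.adk.agents import LlmAgent, SequentialAgent
from google.adk.agents.callback_context import CallbackContext
from google.genai import types

def log_agent_entry(callback_context: CallbackContext) -> Optional[types.Content]:
    """Logs every agent entry, including the workflow agent itself."""
    print(f"[Callback] Entering: {callback_context.agent_name}")
    return None  # Never skips; purely observational

# Hypothetical two-step pipeline; the callback fires once when the
# SequentialAgent starts and once as each sub-agent starts.
pipeline = SequentialAgent(
    name="SketchPipeline",
    sub_agents=[
        LlmAgent(name="StepOne", model="gemini-2.0-flash",
                 instruction="Summarize the user's request in one sentence.",
                 before_agent_callback=log_agent_entry),
        LlmAgent(name="StepTwo", model="gemini-2.0-flash",
                 instruction="Answer the summarized request.",
                 before_agent_callback=log_agent_entry),
    ],
    before_agent_callback=log_agent_entry,
)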
Before Agent Callback
When it triggers: Called immediately before the agent's _run_async_impl (or _run_live_impl) method executes. It runs after the agent's InvocationContext has been created but before its core logic begins.
Purpose: Useful for:
- Setting up resources or state needed only for this agent run
- Running validation checks on session state (callback_context.state) before execution starts
- Logging the entry point of agent activity
- Modifying the invocation context before the core logic uses it
Code Example
from google.adk.agents import LlmAgent
from google.adk.agents.callback_context import CallbackContext
from google.adk.runners import Runner
from typing import Optional
from google.genai import types
from google.adk.sessions import InMemorySessionService

GEMINI_2_FLASH = "gemini-2.0-flash"

# --- Define the Callback Function ---
def simple_before_agent_logger(callback_context: CallbackContext) -> Optional[types.Content]:
    """Logs entry into an agent and checks a condition."""
    agent_name = callback_context.agent_name
    invocation_id = callback_context.invocation_id
    print(f"[Callback] Entering agent: {agent_name} (Invocation: {invocation_id})")

    # Example: Check a condition in state
    if callback_context.state.get("skip_agent", False):
        print(f"[Callback] Condition met: Skipping agent {agent_name}.")
        # Return Content to skip the agent's run
        return types.Content(parts=[types.Part(text=f"Agent {agent_name} was skipped by callback.")])
    else:
        print(f"[Callback] Condition not met: Proceeding with agent {agent_name}.")
        # Return None to allow the agent's run to execute
        return None

# Create LlmAgent and Assign Callback
my_llm_agent = LlmAgent(
    name="SimpleLlmAgent",
    model=GEMINI_2_FLASH,
    instruction="You are a simple agent. Just say 'Hello!'",
    description="An LLM agent demonstrating before_agent_callback",
    before_agent_callback=simple_before_agent_logger
)

APP_NAME = "guardrail_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=my_llm_agent, app_name=APP_NAME, session_service=session_service)

# Agent Interaction
def call_agent(query):
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)

call_agent("callback example")
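To exercise the skip branch above, seed the session with skip_agent already set. A minimal sketch, assuming create_session accepts an initial state dict (the second session ID is illustrative):

session_service_2 = InMemorySessionService()
session_2 = session_service_2.create_session(
    app_name=APP_NAME, user_id=USER_ID, session_id="session_002",
    state={"skip_agent": True},  # Seed the flag the callback checks
)
runner_2 = Runner(agent=my_llm_agent, app_name=APP_NAME, session_service=session_service_2)

content = types.Content(role='user', parts=[types.Part(text="hello")])
for event in runner_2.run(user_id=USER_ID, session_id="session_002", new_message=content):
    if event.is_final_response():
        # Prints the Content returned by the callback, not model output
        print("Agent Response: ", event.content.parts[0].text)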
After Agent Callback
When it triggers: Called immediately after the agent's _run_async_impl (or _run_live_impl) method completes successfully. It does not trigger if the agent was skipped because before_agent_callback returned content, or if end_invocation was set during the run.
Purpose: Useful for:
- Performing cleanup tasks
- Running post-execution validation
- Logging the completion of agent activity
- Modifying final state
- Augmenting or replacing the agent's final output
Code Example
from google.adk.agents import LlmAgent
from google.adk.agents.callback_context import CallbackContext
from google.adk.runners import Runner
from typing import Optional
from google.genai import types
from google.adk.sessions import InMemorySessionService

GEMINI_2_FLASH = "gemini-2.0-flash"

# --- Define the Callback Function ---
def simple_after_agent_logger(callback_context: CallbackContext) -> Optional[types.Content]:
    """Logs exit from an agent and optionally appends a message."""
    agent_name = callback_context.agent_name
    invocation_id = callback_context.invocation_id
    print(f"[Callback] Exiting agent: {agent_name} (Invocation: {invocation_id})")

    # Example: Check state potentially modified during the agent's run
    final_status = callback_context.state.get("agent_run_status", "Completed Normally")
    print(f"[Callback] Agent run status from state: {final_status}")

    # Example: Optionally return Content to append a message
    if callback_context.state.get("add_concluding_note", False):
        print(f"[Callback] Adding concluding note for agent {agent_name}.")
        # Return Content to append after the agent's own output
        return types.Content(parts=[types.Part(text="Concluding note added by after_agent_callback.")])
    else:
        print(f"[Callback] No concluding note added for agent {agent_name}.")
        # Return None - no additional message appended
        return None

my_llm_agent = LlmAgent(
    name="SimpleLlmAgentWithAfter",
    model=GEMINI_2_FLASH,
    instruction="You are a simple agent. Just say 'Processing complete!'",
    description="An LLM agent demonstrating after_agent_callback",
    after_agent_callback=simple_after_agent_logger  # Assign the function here
)

APP_NAME = "guardrail_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=my_llm_agent, app_name=APP_NAME, session_service=session_service)

# Agent Interaction
def call_agent(query):
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)

call_agent("callback example")
LLM Interaction Callbacks
These callbacks are specific to LlmAgent and provide hooks around its interaction with the large language model.
Before Model Callback
When it triggers: Called within the LlmAgent flow just before the generate_content_async (or equivalent) request is sent to the LLM.
Purpose: Allows inspection and modification of the request sent to the LLM. Typical use cases include:
- Adding dynamic instructions
- Injecting few-shot examples based on state
- Modifying the model configuration
- Implementing guardrails (such as profanity filters)
- Implementing request-level caching (see the caching sketch after the code example below)
Return value effect:
If the callback returns None, the LLM call proceeds normally. If it returns an LlmResponse object, the call to the LLM is skipped and the returned LlmResponse is used directly as the model's response. This is the mechanism behind guardrails and caching.
Code Example
from google.adk.agents import LlmAgent
from google.adk.agents.callback_context import CallbackContext
from google.adk.models import LlmResponse, LlmRequest
from google.adk.runners import Runner
from typing import Optional
from google.genai import types
from google.adk.sessions import InMemorySessionService

GEMINI_2_FLASH = "gemini-2.0-flash"

# --- Define the Callback Function ---
def simple_before_model_modifier(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Inspects/modifies the LLM request or skips the call."""
    agent_name = callback_context.agent_name
    print(f"[Callback] Before model call for agent: {agent_name}")

    # Inspect the last user message in the request contents
    last_user_message = ""
    if llm_request.contents and llm_request.contents[-1].role == 'user':
        if llm_request.contents[-1].parts:
            # Guard against a None text part
            last_user_message = llm_request.contents[-1].parts[0].text or ""
    print(f"[Callback] Inspecting last user message: '{last_user_message}'")

    # --- Modification Example ---
    # Add a prefix to the system instruction
    original_instruction = llm_request.config.system_instruction or types.Content(role="system", parts=[])
    prefix = "[Modified by Callback] "
    # Ensure system_instruction is Content and parts list exists
    if not isinstance(original_instruction, types.Content):
        # Handle case where it might be a string (though config expects Content)
        original_instruction = types.Content(role="system", parts=[types.Part(text=str(original_instruction))])
    if not original_instruction.parts:
        original_instruction.parts.append(types.Part(text=""))  # Add an empty part if none exist

    # Modify the text of the first part
    modified_text = prefix + (original_instruction.parts[0].text or "")
    original_instruction.parts[0].text = modified_text
    llm_request.config.system_instruction = original_instruction
    print(f"[Callback] Modified system instruction to: '{modified_text}'")

    # --- Skip Example ---
    # Check if the last user message contains "BLOCK"
    if "BLOCK" in last_user_message.upper():
        print("[Callback] 'BLOCK' keyword found. Skipping LLM call.")
        # Return an LlmResponse to skip the actual LLM call
        return LlmResponse(
            content=types.Content(
                role="model",
                parts=[types.Part(text="LLM call was blocked by before_model_callback.")],
            )
        )
    else:
        print("[Callback] Proceeding with LLM call.")
        # Return None to allow the (modified) request to go to the LLM
        return None

# Create LlmAgent and Assign Callback
my_llm_agent = LlmAgent(
    name="ModelCallbackAgent",
    model=GEMINI_2_FLASH,
    instruction="You are a helpful assistant.",  # Base instruction
    description="An LLM agent demonstrating before_model_callback",
    before_model_callback=simple_before_model_modifier  # Assign the function here
)

APP_NAME = "guardrail_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=my_llm_agent, app_name=APP_NAME, session_service=session_service)

# Agent Interaction
def call_agent(query):
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)

call_agent("callback example")
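The request-level caching use case mentioned above pairs naturally with after_model_callback: the before callback serves a stored response when the same user message was seen before, and the after callback populates the store. A minimal sketch; the cache key choice and the _pending_cache_key state key are illustrative assumptions, and a naive key like this ignores conversation history and model config:

from typing import Dict, Optional
from google.adk.agents.callback_context import CallbackContext
from google.adk.models import LlmRequest, LlmResponse

# Module-level cache mapping the last user message text to a response.
_response_cache: Dict[str, LlmResponse] = {}

def _extract_cache_key(llm_request: LlmRequest) -> Optional[str]:
    """Returns the text of the last user message, if any."""
    if llm_request.contents and llm_request.contents[-1].role == 'user':
        parts = llm_request.contents[-1].parts
        if parts and parts[0].text:
            return parts[0].text
    return None

def caching_before_model(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """Serves a cached response, skipping the real LLM call on a hit."""
    key = _extract_cache_key(llm_request)
    if key is not None and key in _response_cache:
        print(f"[Callback] Cache hit for: '{key[:50]}'")
        return _response_cache[key]
    # Remember the key so the after_model callback can fill the cache.
    callback_context.state["_pending_cache_key"] = key
    return None

def caching_after_model(
    callback_context: CallbackContext, llm_response: LlmResponse
) -> Optional[LlmResponse]:
    """Stores the fresh response under the key recorded earlier."""
    key = callback_context.state.get("_pending_cache_key")
    if key and llm_response.content:
        _response_cache[key] = llm_response
    return None  # Always pass the original response through

Attach both functions to the same LlmAgent via before_model_callback=caching_before_model and after_model_callback=caching_after_model.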
After Model Callback
When it triggers: Called just after a response (LlmResponse) is received from the LLM, before the calling agent processes it further.
Purpose: Allows inspection or modification of the raw LLM response. Typical use cases include:
- Logging model output
- Reformatting the response
- Censoring sensitive information generated by the model
- Parsing structured data from the response and storing it in callback_context.state (see the sketch after the code example below)
- Handling specific error codes
Code Example
from google.adk.agents import LlmAgent
from google.adk.agents.callback_context import CallbackContext
from google.adk.runners import Runner
from typing import Optional
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.models import LlmResponse
import copy  # Needed for copy.deepcopy below

GEMINI_2_FLASH = "gemini-2.0-flash"

# --- Define the Callback Function ---
def simple_after_model_modifier(
    callback_context: CallbackContext, llm_response: LlmResponse
) -> Optional[LlmResponse]:
    """Inspects/modifies the LLM response after it's received."""
    agent_name = callback_context.agent_name
    print(f"[Callback] After model call for agent: {agent_name}")

    # --- Inspection ---
    original_text = ""
    if llm_response.content and llm_response.content.parts:
        # Assuming simple text response for this example
        if llm_response.content.parts[0].text:
            original_text = llm_response.content.parts[0].text
            print(f"[Callback] Inspected original response text: '{original_text[:100]}...'")  # Log snippet
        elif llm_response.content.parts[0].function_call:
            print(f"[Callback] Inspected response: Contains function call '{llm_response.content.parts[0].function_call.name}'. No text modification.")
            return None  # Don't modify tool calls in this example
        else:
            print("[Callback] Inspected response: No text content found.")
            return None
    elif llm_response.error_message:
        print(f"[Callback] Inspected response: Contains error '{llm_response.error_message}'. No modification.")
        return None
    else:
        print("[Callback] Inspected response: Empty LlmResponse.")
        return None  # Nothing to modify

    # --- Modification Example ---
    # Replace "joke" with "funny story" (handles lowercase and Capitalized forms)
    search_term = "joke"
    replace_term = "funny story"
    if search_term in original_text.lower():
        print(f"[Callback] Found '{search_term}'. Modifying response.")
        modified_text = original_text.replace(search_term, replace_term)
        modified_text = modified_text.replace(search_term.capitalize(), replace_term.capitalize())  # Handle capitalization
        # Create a NEW LlmResponse with the modified content
        # Deep copy parts to avoid modifying original if other callbacks exist
        modified_parts = [copy.deepcopy(part) for part in llm_response.content.parts]
        modified_parts[0].text = modified_text  # Update the text in the copied part
        new_response = LlmResponse(
            content=types.Content(role="model", parts=modified_parts),
            # Copy other relevant fields if necessary, e.g., grounding_metadata
            grounding_metadata=llm_response.grounding_metadata
        )
        print("[Callback] Returning modified response.")
        return new_response  # Return the modified response
    else:
        print(f"[Callback] '{search_term}' not found. Passing original response through.")
        # Return None to use the original llm_response
        return None

# Create LlmAgent and Assign Callback
my_llm_agent = LlmAgent(
    name="AfterModelCallbackAgent",
    model=GEMINI_2_FLASH,
    instruction="You are a helpful assistant.",
    description="An LLM agent demonstrating after_model_callback",
    after_model_callback=simple_after_model_modifier  # Assign the function here
)

APP_NAME = "guardrail_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=my_llm_agent, app_name=APP_NAME, session_service=session_service)

# Agent Interaction
def call_agent(query):
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)

call_agent("callback example")
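For the structured-data use case listed above, an after_model_callback can try to parse the response text and stash the result in state without changing what flows onward. A minimal sketch; the parsed_payload state key is illustrative:

import json
from typing import Optional
from google.adk.agents.callback_context import CallbackContext
from google.adk.models import LlmResponse

def extract_json_after_model(
    callback_context: CallbackContext, llm_response: LlmResponse
) -> Optional[LlmResponse]:
    """Parses the response text as JSON, if possible, and saves it to state."""
    if not (llm_response.content and llm_response.content.parts
            and llm_response.content.parts[0].text):
        return None  # Nothing to parse
    try:
        callback_context.state["parsed_payload"] = json.loads(
            llm_response.content.parts[0].text)
        print("[Callback] Parsed JSON payload saved to state.")
    except json.JSONDecodeError:
        print("[Callback] Response was not valid JSON; state unchanged.")
    return None  # Pass the original response through either way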
Tool Execution Callbacks
These callbacks are also specific to LlmAgent and trigger around the execution of tools (including FunctionTool, AgentTool, and others) that the LLM may request.
Before Tool Callback
When it triggers: Called just before a specific tool's run_async method is invoked, once the LLM has generated the corresponding function call.
Purpose: Allows you to:
- Inspect and modify tool arguments
- Perform authorization checks before execution
- Log tool usage attempts
- Implement tool-level caching (see the sketch after the code example below)
Return value effect:
1. If the callback returns None, the tool's run_async method executes with the (possibly modified) args.
2. If it returns a dictionary, the tool's run_async method is skipped and the returned dictionary is used directly as the result of the tool call. This is useful for caching or overriding tool behavior.
Code Example
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from typing import Any, Dict, Optional
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.tools import FunctionTool
from google.adk.tools.tool_context import ToolContext
from google.adk.tools.base_tool import BaseTool

GEMINI_2_FLASH = "gemini-2.0-flash"

def get_capital_city(country: str) -> str:
    """Retrieves the capital city of a given country."""
    print(f"--- Tool 'get_capital_city' executing with country: {country} ---")
    country_capitals = {
        "united states": "Washington, D.C.",
        "canada": "Ottawa",
        "france": "Paris",
        "germany": "Berlin",
    }
    return country_capitals.get(country.lower(), f"Capital not found for {country}")

capital_tool = FunctionTool(func=get_capital_city)

def simple_before_tool_modifier(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext
) -> Optional[Dict]:
    """Inspects/modifies tool args or skips the tool call."""
    agent_name = tool_context.agent_name
    tool_name = tool.name
    print(f"[Callback] Before tool call for tool '{tool_name}' in agent '{agent_name}'")
    print(f"[Callback] Original args: {args}")

    if tool_name == 'get_capital_city' and args.get('country', '').lower() == 'canada':
        print("[Callback] Detected 'Canada'. Modifying args to 'France'.")
        args['country'] = 'France'
        print(f"[Callback] Modified args: {args}")
        return None

    # If the tool is 'get_capital_city' and country is 'BLOCK'
    if tool_name == 'get_capital_city' and args.get('country', '').upper() == 'BLOCK':
        print("[Callback] Detected 'BLOCK'. Skipping tool execution.")
        return {"result": "Tool execution was blocked by before_tool_callback."}

    print("[Callback] Proceeding with original or previously modified args.")
    return None

my_llm_agent = LlmAgent(
    name="ToolCallbackAgent",
    model=GEMINI_2_FLASH,
    instruction="You are an agent that can find capital cities. Use the get_capital_city tool.",
    description="An LLM agent demonstrating before_tool_callback",
    tools=[capital_tool],
    before_tool_callback=simple_before_tool_modifier
)

APP_NAME = "guardrail_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=my_llm_agent, app_name=APP_NAME, session_service=session_service)

# Agent Interaction
def call_agent(query):
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)

call_agent("callback example")
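The tool-level caching use case mentioned above can pair before_tool_callback with after_tool_callback. A minimal sketch; the module-level dict and key format are illustrative assumptions, and a real cache would need eviction and should cover only deterministic tools:

import json
from typing import Any, Dict, Optional
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext

# Module-level cache keyed by tool name plus serialized arguments.
_tool_cache: Dict[str, Dict] = {}

def _tool_cache_key(tool_name: str, args: Dict[str, Any]) -> str:
    # Assumes args are JSON-serializable, which holds for LLM function calls.
    return f"{tool_name}:{json.dumps(args, sort_keys=True)}"

def caching_before_tool(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext
) -> Optional[Dict]:
    """Returns a cached result dict if this exact call was seen before."""
    key = _tool_cache_key(tool.name, args)
    if key in _tool_cache:
        print(f"[Callback] Tool cache hit: {key}")
        return _tool_cache[key]  # Skips tool.run_async entirely
    return None

def caching_after_tool(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext,
    tool_response: Dict
) -> Optional[Dict]:
    """Stores the fresh tool result for future identical calls."""
    _tool_cache[_tool_cache_key(tool.name, args)] = tool_response
    return None  # Use the original tool_response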
After Tool Callback
When it triggers: Called just after the tool's run_async method completes successfully.
Purpose: Allows inspection and modification of the tool result before it is returned to the LLM (potentially after summarization). Useful for:
- Logging tool results
- Post-processing or formatting results
- Saving specific results to session state (see the sketch after the code example below)
Return value effect:
1. If the callback returns None, the original tool_response is used.
2. If it returns a new dictionary, it replaces the original tool_response, changing the result the LLM receives.
Code Example
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from typing import Any, Dict, Optional
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.tools import FunctionTool
from google.adk.tools.tool_context import ToolContext
from google.adk.tools.base_tool import BaseTool
import copy  # Needed for copy.deepcopy below

GEMINI_2_FLASH = "gemini-2.0-flash"

# --- Define a Simple Tool Function (Same as before) ---
def get_capital_city(country: str) -> dict:
    """Retrieves the capital city of a given country."""
    print(f"--- Tool 'get_capital_city' executing with country: {country} ---")
    country_capitals = {
        "united states": "Washington, D.C.",
        "canada": "Ottawa",
        "france": "Paris",
        "germany": "Berlin",
    }
    return {"result": country_capitals.get(country.lower(), f"Capital not found for {country}")}

# --- Wrap the function into a Tool ---
capital_tool = FunctionTool(func=get_capital_city)

# --- Define the Callback Function ---
def simple_after_tool_modifier(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext, tool_response: Dict
) -> Optional[Dict]:
    """Inspects/modifies the tool result after execution."""
    agent_name = tool_context.agent_name
    tool_name = tool.name
    print(f"[Callback] After tool call for tool '{tool_name}' in agent '{agent_name}'")
    print(f"[Callback] Args used: {args}")
    print(f"[Callback] Original tool_response: {tool_response}")

    # Default structure for function tool results is {"result": <return_value>}
    original_result_value = tool_response.get("result", "")

    # --- Modification Example ---
    # If the tool was 'get_capital_city' and result is 'Washington, D.C.'
    if tool_name == 'get_capital_city' and original_result_value == "Washington, D.C.":
        print("[Callback] Detected 'Washington, D.C.'. Modifying tool response.")
        # IMPORTANT: Create a new dictionary or modify a copy
        modified_response = copy.deepcopy(tool_response)
        modified_response["result"] = f"{original_result_value} (Note: This is the capital of the USA)."
        modified_response["note_added_by_callback"] = True  # Add extra info if needed
        print(f"[Callback] Modified tool_response: {modified_response}")
        return modified_response  # Return the modified dictionary

    print("[Callback] Passing original tool response through.")
    # Return None to use the original tool_response
    return None

# Create LlmAgent and Assign Callback
my_llm_agent = LlmAgent(
    name="AfterToolCallbackAgent",
    model=GEMINI_2_FLASH,
    instruction="You are an agent that finds capital cities using the get_capital_city tool. Report the result clearly.",
    description="An LLM agent demonstrating after_tool_callback",
    tools=[capital_tool],  # Add the tool
    after_tool_callback=simple_after_tool_modifier  # Assign the callback
)

APP_NAME = "guardrail_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=my_llm_agent, app_name=APP_NAME, session_service=session_service)

# Agent Interaction
def call_agent(query):
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)

call_agent("callback example")
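The save-to-state use case from the list above needs only a write through tool_context.state plus a None return, so the LLM still sees the original result. A minimal sketch; the last_capital_lookup state key is illustrative:

from typing import Any, Dict, Optional
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.tool_context import ToolContext

def save_result_after_tool(
    tool: BaseTool, args: Dict[str, Any], tool_context: ToolContext,
    tool_response: Dict
) -> Optional[Dict]:
    """Records the latest get_capital_city result in session state."""
    if tool.name == "get_capital_city":
        # State writes are tracked and persisted with the session.
        tool_context.state["last_capital_lookup"] = tool_response.get("result")
        print("[Callback] Saved capital lookup result to session state.")
    return None  # Never alters the response itself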