Appearance
LangGraph AI Agent 核心模块
本文介绍使用 LangGraph 开发 AI Agent 应用时必知必会的核心模块和概念。
架构全景
┌─────────────────────────────────────────────────────────────────────────┐
│ LangGraph AI Agent 核心架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ 基础构建层 │ 执行控制层 │ 高级特性层 │
├─────────────────────────────────────────────────────────────────────────┤
│ • State (状态) │ • Edges (边) │ • Memory (短期/长期) │
│ • Nodes (节点) │ • Conditional Edges │ • Interrupts (中断/人机协作) │
│ • Graph (图) │ • Command (命令) │ • Subgraphs (子图) │
│ • Tools (工具) │ • Recursion (递归) │ • Parallel (并行执行) │
│ │ │ • Streaming (流式输出) │
└─────────────────────────────────────────────────────────────────────────┘一、基础构建层
1.1 State(状态)
定义图执行过程中的数据结构,是节点间传递的核心载体。
python
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
messages: Annotated[list, add] # 使用 reducer 合并消息
context: str核心概念:
- TypedDict: 类型化的状态定义
- Annotated: 带注解的字段,可附加 reducer 函数
- Reducer: 用于合并状态更新的函数(如
operator.add)
1.2 Nodes(节点)
执行具体业务逻辑的函数,每个节点接收状态并返回状态更新。
python
def call_model(state: AgentState) -> AgentState:
"""调用 LLM 生成响应"""
response = model.invoke(state["messages"])
return {"messages": [response]}
def tool_node(state: AgentState) -> AgentState:
"""执行工具调用"""
tool_calls = state["messages"][-1].tool_calls
results = [execute_tool(call) for call in tool_calls]
return {"messages": results}节点类型:
- 普通节点: 执行业务逻辑
- 工具节点: 执行工具调用
- 条件节点: 根据状态决定下一步
1.3 Graph(图)
组织节点和边的工作流容器。
python
from langgraph.graph import StateGraph, START, END
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", "tools")
builder.add_edge("tools", END)
graph = builder.compile()图类型:
- StateGraph: 最常用的图,支持状态管理
- MessageGraph: 简化的消息处理图
- 编译后的图: 调用
compile()生成可执行图
1.4 Tools(工具)
扩展 LLM 能力的外部函数,可被模型调用。
python
from langchain.tools import tool
@tool
def search_web(query: str) -> str:
"""搜索网络获取信息"""
# 实现搜索逻辑
return search_results
@tool
def get_weather(city: str) -> dict:
"""获取指定城市的天气"""
# 实现天气查询逻辑
return {"temperature": 25, "condition": "sunny"}工具特性:
- 使用
@tool装饰器定义 - 自动生成 JSON Schema
- 支持参数验证
- 可绑定到 LLM
二、执行控制层
2.1 Edges(边)
定义节点之间的连接关系,控制执行流程。
python
# 直接边
builder.add_edge("node_a", "node_b")
# 起始边
builder.add_edge(START, "first_node")
# 结束边
builder.add_edge("last_node", END)边类型:
- 普通边: 固定连接
node_a -> node_b - 起始边:
START -> node - 结束边:
node -> END
2.2 Conditional Edges(条件边)
根据状态动态决定下一步执行哪个节点。
python
def should_continue(state: AgentState) -> Literal["tools", END]:
"""判断是否需要调用工具"""
if state["messages"][-1].tool_calls:
return "tools"
return END
builder.add_conditional_edges(
"agent",
should_continue,
{"tools": "tools", END: END}
)条件边特性:
- 误路由函数返回下一个节点名称
- 支持多个目标节点
- 可返回
END结束执行
2.3 Command(命令)
精确控制图的执行流程,包括跳转和状态更新。
python
from langgraph.types import Command
def routing_node(state: State) -> Command[Literal["node_a", "node_b"]]:
if state["should_retry"]:
return Command(
goto="node_a",
update={"retry_count": state["retry_count"] + 1}
)
return Command(goto="node_b")Command 能力:
goto: 指定下一个执行的节点update: 更新状态resume: 恢复中断的执行(用于人机协作)
2.4 Recursion(递归)
实现循环或迭代逻辑,让图可以自我调用。
python
def recursive_node(state: State) -> Command[Literal["recursive_node", END]]:
if state["count"] < state["max_count"]:
return Command(
goto="recursive_node",
update={"count": state["count"] + 1}
)
return Command(goto=END)递归特性:
- 支持自我循环
- 可设置最大迭代次数
- 结合 Command 使用
- 适合需要重试或迭代的场景
三、高级特性层
3.1 Memory(记忆)
核心组件:
- Checkpointer: 皂停/恢复、时间旅行
- Store: 跨会话长期记忆
- Trimming: 消息裁剪
- Summarization: 对话摘要
3.2 Interrupts(中断/人机协作)
暂停执行等待人工输入或审批。
python
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
def approval_node(state: State):
# 暂停并等待人工审批
decision = interrupt({
"question": "是否批准此操作? Please approve or reject.",
"details": state["action_details"]
})
return {"approved": decision}
builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_edge(START, "approval")
builder.add_edge("approval", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)中断特性:
使用
interrupt()函数暂停执行中断信息会出现在
__interrupts__字段中使用
Command(resume=value)恢复执行需要 Checkpointer 支持
适合需要人工审批的场景(如发送邮件、执行危险操作等)
完整示例:
python
# 启动执行(会中断)
result = graph.invoke({"action_details": "发送邮件给 user@example.com"}, config)
print(result["__interrupts__"]) # 查看中断信息
# 恢复执行(批准)
result = graph.invoke(Command(resume=True), config) # 批准
# 恢复执行(拒绝)
result = graph.invoke(Command(resume=False), config) # 拒绝3.3 Subgraphs(子图)
将复杂逻辑拆分为可复用的子模块,实现图的嵌套。
python
# 子图定义
class ChildState(TypedDict):
child_value: str
def child_node(state: ChildState) -> ChildState:
return {"child_value": f"processed: {state['child_value']}"}
child_builder = StateGraph(ChildState)
child_builder.add_node("child_node", child_node)
child_builder.add_edge(START, "child_node")
child_builder.add_edge("child_node", END)
child_graph = child_builder.compile()
# 父图中调用子图
class ParentState(TypedDict):
parent_value: str
child_result: str
def call_child(state: ParentState) -> ParentState:
result = child_graph.invoke({"child_value": state["parent_value"]})
return {"child_result": result["child_value"]}子图特性:
- 支持多层嵌套
- 子图有独立的状态空间
- 父子图通过
invoke通信 - 可传递和转换状态
- 实现模块化和复用
3.4 Parallel(并行执行)
同时执行多个独立任务并聚合结果。
python
from operator import add
from typing import Annotated
class ParallelState(TypedDict):
topic: str
results: Annotated[list, add] # 使用 reducer 合并结果
def task_a(state: ParallelState) -> ParallelState:
result = model.invoke(f"写关于 {state['topic']} 的笑话")
return {"results": [{"type": "joke", "content": result.content}]}
def task_b(state: ParallelState) -> ParallelState:
result = model.invoke(f"写关于 {state['topic']} 的诗歌")
return {"results": [{"type": "poem", "content": result.content}]}
builder = StateGraph(ParallelState)
builder.add_node("task_a", task_a)
builder.add_node("task_b", task_b)
builder.add_node("aggregator", aggregator_node)
# 并行边: START 同时指向两个节点
builder.add_edge(START, "task_a")
builder.add_edge(START, "task_b")
# 汇聚边: 两个节点都指向聚合节点
builder.add_edge("task_a", "aggregator")
builder.add_edge("task_b", "aggregator")
builder.add_edge("aggregator", END)并行特性:
- 同一步骤中的节点并行执行
- 使用
Annotated+ reducer 合并结果 - 适合独立可并行的任务
- 显著提升性能
注意事项:
- 并行节点的状态更新顺序不确定
- 黙需要使用 reducer 确保正确合并
- 聚合节点需要等待所有并行节点完成
3.5 Streaming(流式输出)
实时输出执行过程,支持多种输出模式。
python
# 流式输出示例
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "你好"}]},
config,
stream_mode="values" # 输出模式
):
print(chunk)流式输出模式:
| 模式 | 说明 | 用途 |
|---|---|---|
"values" | 每个步骤后的完整状态 | 查看完整状态变化 |
"updates" | 状态增量更新 | 细粒度状态追踪 |
"messages" | LLM 输出的消息流 | 实时显示 LLM 响应 |
"debug" | 调试信息 | 开发调试 |
"custom" | 自定义输出 | 特殊需求 |
Token 级流式输出:
python
# 按 Token 流式输出
for chunk in graph.stream(
input,
config,
stream_mode="messages",
version="v2", # 使用 v2 版本
):
if "messages" in chunk:
for token in chunk["messages"]:
print(token.content, end="", flush=True)四、模块选择指南
4.1 猏的 Agent
使用 LangGraph 快速构建标准 Agent。
python
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(
model=model,
tools=[search_web, get_weather],
checkpointer=checkpointer
)适用场景:
- 快速原型开发
- 标准 ReAct 模式
- 简单的 Agent 需求
4.2 自定义 Agent
完全控制 Agent 的每个环节。
适用场景:
- 复杂业务逻辑
- 需要精确控制流程
- 特殊的状态管理需求
4.3 騡块组合建议
| 场景 | 推荐模块组合 |
|---|---|
| 癮通对话 Agent | State + Nodes + Graph + Memory |
| 工具调用 Agent | + Tools + Conditional Edges + Memory |
| 需人机协作 | + Interrupts + Checkpointer |
| 复杂工作流 | + Subgraphs + Parallel |
| 长时间运行 | + Store + Streaming |
| 生产部署 | Memory + Interrupts + Streaming |