Appearance
LangGraph 模块使用分析
本文分析当前代码库中 LangGraph 模块的使用情况,并提供后续集成建议。
一、已使用的模块
| 模块 | 使用位置 | 使用方式 |
|---|---|---|
| State | services/langgraph_chat.py | 使用 MessagesState 作为状态类型 |
| Nodes | services/langgraph_chat.py | 定义 call_model 节点处理 LLM 调用 |
| Graph | services/langgraph_chat.py | 使用 StateGraph 构建工作流 |
| Edges | services/langgraph_chat.py | 使用 START 和 add_edge 连接节点 |
| Checkpointer | services/checkpointer.py | 使用 PySQLSaver (MySQL) 持久化 |
| Streaming | services/langgraph_chat.py | 使用 stream_mode="messages" 流式输出 |
代码示例
python
# services/langgraph_chat.py
from langgraph.graph import START, StateGraph, MessagesState
def call_model(state: MessagesState):
"""调用 LLM 生成响应"""
chain = prompt_template | llm
response = chain.invoke(state["messages"])
return {"messages": [response]}
# 构建工作流
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_edge(START, "agent")
app = workflow.compile(checkpointer=checkpointer)
# 流式输出
for chunk in app.stream(..., stream_mode="messages"):
yield message_chunk.content当前架构图
┌─────────────────────────────────────────┐
│ 当前 LangGraph 架构 │
├─────────────────────────────────────────┤
│ │
│ START ──► [agent] ──► END │
│ │ │
│ ▼ │
│ call_model() │
│ (LLM 调用) │
│ │ │
│ ▼ │
│ Checkpointer │
│ (MySQL 持久化) │
│ │
└─────────────────────────────────────────┘特点:
- 单节点线性流程
- 仅支持纯对话(无工具调用)
- 状态通过 MySQL Checkpointer 持久化
- 支持 token 级流式输出
二、未使用的模块
2.1 基础构建层
| 模块 | 说明 | 复杂度 |
|---|---|---|
| Tools | 未使用工具调用功能,当前是纯对话 Agent | 低 |
2.2 执行控制层
| 模块 | 说明 | 复杂度 |
|---|---|---|
| Conditional Edges | 未使用条件边,当前是单节点线性流程 | 低 |
| Command | 未使用命令模式,无法精确控制跳转和状态更新 | 中 |
| Recursion | 未使用递归/循环逻辑 | 中 |
2.3 高级特性层
| 模块 | 说明 | 复杂度 |
|---|---|---|
| Store | 未使用跨会话长期记忆存储 | 中 |
| Interrupts | 未使用中断/人机协作功能 | 中 |
| Subgraphs | 未使用子图模块化 | 高 |
| Parallel | 未使用并行执行 | 中 |
| Trimming | 未使用消息裁剪(长对话可能超出上下文) | 低 |
| Summarization | 未使用对话摘要 | 中 |
三、适合集成的业务场景
3.1 Tools(工具调用)⭐⭐⭐⭐⭐
优先级:高
为什么需要: 当前 Agent 只能进行纯对话,无法执行实际操作或获取实时信息。
适用场景:
| 场景 | 描述 | 实现方式 |
|---|---|---|
| 联网搜索 | 让 AI 搜索实时信息 | 集成搜索 API(如 Tavily、SerpAPI) |
| 数据库查询 | 让 AI 查询用户数据、订单信息 | 创建数据库查询工具 |
| API 调用 | 调用外部服务 | 封装 REST API 调用 |
| 知识库搜索 | 搜索内部知识库 | 集成向量数据库 |
示例实现:
python
from langchain.tools import tool
@tool
def search_web(query: str) -> str:
"""搜索网络获取实时信息"""
# 调用搜索 API
results = search_api.search(query)
return format_results(results)
@tool
def get_user_info(user_id: str) -> dict:
"""获取用户信息"""
# 查询数据库
user = db.query(User).filter_by(id=user_id).first()
return {"name": user.name, "email": user.email}
@tool
def send_notification(user_id: str, message: str) -> str:
"""发送通知给用户"""
# 调用通知服务
notification_service.send(user_id, message)
return "通知已发送"集成步骤:
- 定义工具函数
- 将工具绑定到 LLM
- 添加工具执行节点
- 使用条件边判断是否需要调用工具
3.2 Conditional Edges(条件边)⭐⭐⭐⭐⭐
优先级:高
为什么需要: 配合 Tools 使用,根据 LLM 响应动态决定下一步操作。
适用场景:
| 场景 | 描述 |
|---|---|
| 工具调用判断 | 判断是否需要调用工具 |
| 智能路由 | 根据问题类型选择不同处理节点 |
| 多模型协作 | 不同任务使用不同模型 |
示例实现:
python
from typing import Literal
def should_continue(state: MessagesState) -> Literal["tools", END]:
"""判断是否需要调用工具"""
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools" # 有工具调用,执行工具
return END # 无工具调用,结束
# 构建带条件边的工作流
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
builder.add_edge("tools", "agent") # 工具执行后返回 agent完整架构图:
┌─────────────────────────────────────────────┐
│ 带 Tools 的 Agent 架构 │
├─────────────────────────────────────────────┤
│ │
│ START ──► [agent] ──► should_continue() │
│ │ │ │
│ │ ├──► END │
│ │ │ │
│ │ └──► [tools] │
│ │ │ │
│ └────────────────────┘ │
│ │
└─────────────────────────────────────────────┘3.3 Interrupts(中断/人机协作)⭐⭐⭐⭐
优先级:中高
为什么需要: 某些敏感操作需要人工确认后才能执行,提高系统安全性和可控性。
适用场景:
| 场景 | 描述 | 示例 |
|---|---|---|
| 敏感操作审批 | 执行前需要人工确认 | 发送邮件、删除数据、执行支付 |
| 内容审核 | AI 生成内容需要确认后发布 | 自动回复、公告发布 |
| 信息确认 | 关键信息需要用户确认 | 订单信息、联系方式 |
示例实现:
python
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
def send_email_node(state: State):
"""发送邮件节点(带人机确认)"""
# 中断等待用户确认
decision = interrupt({
"action": "send_email",
"to": state["email_to"],
"subject": state["email_subject"],
"body": state["email_body"],
"message": "确认发送此邮件?"
})
if decision:
# 用户批准,执行发送
send_email(state["email_to"], state["email_subject"], state["email_body"])
return {"status": "sent", "email_sent": True}
return {"status": "cancelled", "email_sent": False}
# 使用
result = graph.invoke({"email_to": "user@example.com", ...})
# 检查是否需要人工确认
if result.get("__interrupts__"):
# 返回前端,等待用户确认
return {"need_confirmation": True, "interrupts": result["__interrupts__"]}
# 用户确认后恢复执行
result = graph.invoke(Command(resume=True), config)前端交互流程:
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 用户请求 │ ──► │ AI 处理 │ ──► │ 中断等待 │ ──► │ 用户确认 │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
│
▼
┌──────────┐
│ 继续执行 │
└──────────┘3.4 Store(长期记忆)⭐⭐⭐⭐
优先级:中高
为什么需要: 当前 Checkpointer 只能保存单次会话的状态,无法跨会话记住用户偏好。
适用场景:
| 场景 | 描述 |
|---|---|
| 用户偏好记忆 | 记住用户的语言偏好、回答风格偏好 |
| 个性化对话 | 根据用户历史调整对话风格 |
| 知识积累 | 跨会话积累用户相关信息 |
| 上下文增强 | 在新会话中引用之前的对话内容 |
示例实现:
python
from langgraph.store.memory import InMemoryStore
from langgraph.store.postgres import PostgresStore
# 创建 Store(支持语义搜索)
store = PostgresStore.from_conn_string(DB_URI)
store.setup()
def call_model(state: MessagesState, runtime: Runtime[Context]):
user_id = runtime.context.user_id
namespace = ("memories", user_id)
# 搜索相关记忆
memories = runtime.store.search(
namespace,
query=str(state["messages"][-1].content)
)
# 构建记忆上下文
memory_context = "\n".join([m.value["text"] for m in memories])
system_prompt = f"用户相关信息:\n{memory_context}\n\n你是一个友好的助手。"
# 如果用户要求记住某些信息
if "记住" in state["messages"][-1].content:
# 提取并存储记忆
memory = extract_memory(state["messages"][-1].content)
runtime.store.put(namespace, str(uuid.uuid4()), {"text": memory})
# 调用 LLM
response = llm.invoke([
{"role": "system", "content": system_prompt},
*state["messages"]
])
return {"messages": [response]}
# 编译时传入 store
graph = builder.compile(checkpointer=checkpointer, store=store)3.5 Trimming / Summarization(记忆管理)⭐⭐⭐
优先级:中
为什么需要: 当前实现没有处理长对话超出上下文窗口的问题。
适用场景:
| 场景 | 描述 |
|---|---|
| 长对话处理 | 对话超过上下文窗口时自动裁剪 |
| 成本优化 | 减少不必要的 token 消耗 |
| 关键信息保留 | 用摘要替代旧消息,保留关键信息 |
Trimming 示例:
python
from langchain_core.messages import trim_messages
def call_model(state: MessagesState):
# 裁剪消息,保留最近 4000 tokens
trimmed_messages = trim_messages(
state["messages"],
max_tokens=4000,
strategy="last",
token_counter=len, # 或使用更精确的计数器
start_on="human",
end_on=("human", "tool"),
)
response = llm.invoke(trimmed_messages)
return {"messages": [response]}Summarization 示例:
python
class State(TypedDict):
messages: list
summary: str
def summarize_if_needed(state: State):
if len(state["messages"]) > 10:
# 生成摘要
summary_prompt = f"总结以下对话:\n{state['messages']}"
new_summary = llm.invoke(summary_prompt).content
# 只保留最近 2 条消息 + 摘要
return {
"summary": new_summary,
"messages": state["messages"][-2:]
}
return state3.6 Parallel(并行执行)⭐⭐⭐
优先级:中
为什么需要: 某些场景需要同时执行多个独立任务,提升性能。
适用场景:
| 场景 | 描述 |
|---|---|
| 多模型对比 | 同时调用多个模型,对比结果 |
| 多任务处理 | 同时生成文本、图片、摘要 |
| 多角度分析 | 从不同角度分析同一问题 |
示例实现:
python
from operator import add
from typing import Annotated
class ParallelState(TypedDict):
topic: str
results: Annotated[list, add] # 使用 reducer 合并结果
def task_joke(state: ParallelState):
result = llm.invoke(f"写关于 {state['topic']} 的笑话")
return {"results": [{"type": "joke", "content": result.content}]}
def task_poem(state: ParallelState):
result = llm.invoke(f"写关于 {state['topic']} 的诗歌")
return {"results": [{"type": "poem", "content": result.content}]}
def task_story(state: ParallelState):
result = llm.invoke(f"写关于 {state['topic']} 的故事")
return {"results": [{"type": "story", "content": result.content}]}
def aggregator(state: ParallelState):
combined = f"关于 {state['topic']} 的创作:\n\n"
for r in state["results"]:
combined += f"【{r['type']}】\n{r['content']}\n\n"
return {"final_output": combined}
# 构建并行工作流
builder = StateGraph(ParallelState)
builder.add_node("joke", task_joke)
builder.add_node("poem", task_poem)
builder.add_node("story", task_story)
builder.add_node("aggregator", aggregator)
# 并行边:START 同时指向多个节点
builder.add_edge(START, "joke")
builder.add_edge(START, "poem")
builder.add_edge(START, "story")
# 汇聚边:所有节点都指向聚合节点
builder.add_edge("joke", "aggregator")
builder.add_edge("poem", "aggregator")
builder.add_edge("story", "aggregator")
builder.add_edge("aggregator", END)
graph = builder.compile()架构图:
┌──────────┐
│ START │
└────┬─────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│ joke │ │ poem │ │ story │
└───┬────┘ └───┬────┘ └───┬────┘
│ │ │
└─────────────┼─────────────┘
│
▼
┌──────────┐
│aggregator│
└────┬─────┘
│
▼
┌──────────┐
│ END │
└──────────┘3.7 Subgraphs(子图)⭐⭐
优先级:低
为什么需要: 将复杂逻辑拆分为可复用的子模块。
适用场景:
| 场景 | 描述 |
|---|---|
| 复杂工作流模块化 | 将大图拆分为可复用子模块 |
| 多步骤子流程 | 如"分析->搜索->总结"作为一个整体复用 |
示例实现:
python
# 子图:搜索流程
class SearchState(TypedDict):
query: str
results: list
def search_node(state: SearchState):
results = search_api(state["query"])
return {"results": results}
search_builder = StateGraph(SearchState)
search_builder.add_node("search", search_node)
search_builder.add_edge(START, "search")
search_builder.add_edge("search", END)
search_graph = search_builder.compile()
# 父图中调用子图
def call_search(state: ParentState):
result = search_graph.invoke({"query": state["user_query"]})
return {"search_results": result["results"]}四、集成建议优先级
| 优先级 | 模块 | 业务价值 | 建议场景 | 预估工时 |
|---|---|---|---|---|
| 🔴 P0 | Tools + Conditional Edges | 扩展 AI 能力 | 联网搜索、数据库查询 | 2-3 天 |
| 🟠 P1 | Interrupts | 安全可控 | 敏感操作审批 | 1-2 天 |
| 🟡 P2 | Store | 用户体验 | 个性化对话 | 2-3 天 |
| 🟡 P2 | Trimming | 成本优化 | 长对话场景 | 0.5 天 |
| 🟢 P3 | Parallel | 性能提升 | 多模型对比 | 1 天 |
| ⚪ P4 | Subgraphs | 代码复用 | 复杂工作流 | 2 天 |
五、推荐实施路径
阶段 1:增强 Agent 能力(P0)
- 实现基础 Tools(搜索、查询)
- 添加 Conditional Edges 实现工具调用路由
- 测试并优化工具调用体验
阶段 2:提升安全性和用户体验(P1-P2)
- 实现 Interrupts 用于敏感操作
- 集成 Store 实现跨会话记忆
- 添加 Trimming 处理长对话
阶段 3: 性能优化和模块化(P3-P4)
- 评估是否需要 Parallel 执行
- 对复杂流程考虑 Subgraphs 模块化