Appearance
LangGraph Subgraphs 集成方案
一、模块概述
| 属性 | 说明 |
|---|---|
| 模块名称 | Subgraphs(子图) |
| 优先级 | ⚪ P4(低) |
| 预估工时 | 2 天 |
| 依赖项 | langgraph.graph |
为什么需要
将复杂工作流拆分为可复用的子模块:
- 模块化:将大型复杂图拆分为小型可维护的子图
- 复用性:子图可以在多个父图中重复使用
- 封装性:每个子图有独立的状态空间和逻辑
- 可测试性:子图可以独立测试
二、架构设计
2.1 子图嵌套结构
┌─────────────────────────────────────────────────────────────────────┐
│ 父图 (Parent Graph) │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ State: ParentState │ │
│ │ - user_query: str │ │
│ │ - search_results: list │ │
│ │ - analysis_result: str │ │
│ │ - final_output: str │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ START ──► [路由判断] ──► [搜索子图] ──► [分析子图] ──► END │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 父节点 │ │ 父节点 │ │ 父节点 │ │
│ │(调用子图)│ │(调用子图)│ │(调用子图)│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌──────────────────────────┼──────────────┼──────────────┼───────┐ │
│ │ │ │ │ │ │
│ │ ┌──────────────────────▼──────────────▼──────────────▼─────┐ │ │
│ │ │ 子图 (Subgraph) │ │ │
│ │ │ ┌───────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ State: ChildState │ │ │ │
│ │ │ │ - query: str │ │ │ │
│ │ │ │ - results: list │ │ │ │
│ │ │ └───────────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ │ START ──► [处理] ──► [转换] ──► END │ │ │
│ │ │ │ │ │
│ │ └────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘2.2 父子图状态转换
python
# 父图调用子图时的状态转换
def call_search_subgraph(state: ParentState) -> ParentState:
# 1. 从父状态提取子图输入
child_input = {
"query": state["user_query"],
"results": []
}
# 2. 调用子图
child_result = search_subgraph.invoke(child_input)
# 3. 将子图输出转换为父状态
return {
"search_results": child_result["results"]
}三、代码实现
3.1 子图定义
创建文件: services/subgraphs/__init__.py
python
"""LangGraph 子图模块
定义可复用的子图组件。
"""
from typing import TypedDict, List
from langgraph.graph import START, StateGraph, END
from langchain_openai import ChatOpenAI
import logging
logger = logging.getLogger(__name__)
# ============ 搜索子图 ============
class SearchState(TypedDict):
"""搜索子图状态"""
query: str
results: List[str]
def search_node(state: SearchState) -> SearchState:
"""执行搜索"""
# TODO: 集成实际搜索 API
# 示例:返回模拟结果
mock_results = [
f"搜索结果 1: 关于 {state['query']} 的信息",
f"搜索结果 2: {state['query']} 相关内容",
]
return {"results": mock_results}
def process_results_node(state: SearchState) -> SearchState:
"""处理搜索结果"""
# 对结果进行格式化处理
processed = [f"[已处理] {r}" for r in state["results"]]
return {"results": processed}
# 构建搜索子图
search_builder = StateGraph(SearchState)
search_builder.add_node("search", search_node)
search_builder.add_node("process", process_results_node)
search_builder.add_edge(START, "search")
search_builder.add_edge("search", "process")
search_builder.add_edge("process", END)
search_subgraph = search_builder.compile()
# ============ 分析子图 ============
class AnalysisState(TypedDict):
"""分析子图状态"""
content: str
analysis_type: str # sentiment, summary, keywords
result: str
def analyze_node(state: AnalysisState) -> AnalysisState:
"""执行分析"""
import os
llm = ChatOpenAI(
model="openai/gpt-4o-mini",
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
)
prompts = {
"sentiment": f"分析以下内容的情感倾向(正面/负面/中性):\n\n{state['content']}",
"summary": f"总结以下内容:\n\n{state['content']}",
"keywords": f"提取以下内容的关键词:\n\n{state['content']}"
}
prompt = prompts.get(state["analysis_type"], prompts["summary"])
result = llm.invoke(prompt)
return {"result": result.content}
# 构建分析子图
analysis_builder = StateGraph(AnalysisState)
analysis_builder.add_node("analyze", analyze_node)
analysis_builder.add_edge(START, "analyze")
analysis_builder.add_edge("analyze", END)
analysis_subgraph = analysis_builder.compile()
# ============ 翻译子图 ============
class TranslationState(TypedDict):
"""翻译子图状态"""
text: str
target_language: str
translated: str
def translate_node(state: TranslationState) -> TranslationState:
"""执行翻译"""
import os
llm = ChatOpenAI(
model="openai/gpt-4o-mini",
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
)
prompt = f"将以下内容翻译成{state['target_language']}:\n\n{state['text']}"
result = llm.invoke(prompt)
return {"translated": result.content}
# 构建翻译子图
translation_builder = StateGraph(TranslationState)
translation_builder.add_node("translate", translate_node)
translation_builder.add_edge(START, "translate")
translation_builder.add_edge("translate", END)
translation_subgraph = translation_builder.compile()
# 导出所有子图
__all__ = [
"search_subgraph",
"analysis_subgraph",
"translation_subgraph",
"SearchState",
"AnalysisState",
"TranslationState",
]3.2 父图调用子图
创建文件: services/parent_graph.py
python
"""父图 - 组合多个子图实现复杂工作流
展示如何在父图中调用和组合子图。
"""
from typing import TypedDict, List, Optional, Literal
from langgraph.graph import START, StateGraph, END
from langgraph.types import Command
from services.subgraphs import (
search_subgraph,
analysis_subgraph,
translation_subgraph,
)
import logging
logger = logging.getLogger(__name__)
class ParentState(TypedDict):
"""父图状态"""
user_query: str
intent: Optional[str] # search, analyze, translate
search_results: Optional[List[str]]
analysis_result: Optional[str]
translation_result: Optional[str]
final_output: str
def classify_intent(state: ParentState) -> Command[Literal["search", "analyze", "translate", "respond"]]:
"""分类用户意图"""
query = state["user_query"].lower()
if any(kw in query for kw in ["搜索", "查找", "找", "search"]):
return Command(goto="search", update={"intent": "search"})
elif any(kw in query for kw in ["分析", "情感", "总结", "关键词", "analyze"]):
return Command(goto="analyze", update={"intent": "analyze"})
elif any(kw in query for kw in ["翻译", "translate"]):
return Command(goto="translate", update={"intent": "translate"})
else:
return Command(goto="respond", update={"intent": "chat"})
def call_search(state: ParentState) -> ParentState:
"""调用搜索子图"""
logger.info(f"调用搜索子图: {state['user_query']}")
# 调用子图
result = search_subgraph.invoke({
"query": state["user_query"],
"results": []
})
return {"search_results": result["results"]}
def call_analysis(state: ParentState) -> ParentState:
"""调用分析子图"""
logger.info(f"调用分析子图: {state['user_query']}")
# 确定分析类型
query = state["user_query"].lower()
if "情感" in query:
analysis_type = "sentiment"
elif "总结" in query:
analysis_type = "summary"
elif "关键词" in query:
analysis_type = "keywords"
else:
analysis_type = "summary"
# 调用子图
result = analysis_subgraph.invoke({
"content": state.get("search_results", [state["user_query"]])[-1] if state.get("search_results") else state["user_query"],
"analysis_type": analysis_type,
"result": ""
})
return {"analysis_result": result["result"]}
def call_translation(state: ParentState) -> ParentState:
"""调用翻译子图"""
logger.info(f"调用翻译子图: {state['user_query']}")
# 确定目标语言
query = state["user_query"]
if "英文" in query or "英语" in query:
target_language = "英语"
elif "日文" in query or "日语" in query:
target_language = "日语"
elif "中文" in query:
target_language = "中文"
else:
target_language = "英语"
# 调用子图
result = translation_subgraph.invoke({
"text": state["user_query"],
"target_language": target_language,
"translated": ""
})
return {"translation_result": result["translated"]}
def respond_directly(state: ParentState) -> ParentState:
"""直接响应"""
import os
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model="openai/gpt-4o-mini",
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
)
response = llm.invoke(state["user_query"])
return {"final_output": response.content}
def format_output(state: ParentState) -> ParentState:
"""格式化最终输出"""
output_parts = []
if state.get("search_results"):
output_parts.append("## 搜索结果")
for r in state["search_results"]:
output_parts.append(f"- {r}")
if state.get("analysis_result"):
output_parts.append("\n## 分析结果")
output_parts.append(state["analysis_result"])
if state.get("translation_result"):
output_parts.append("\n## 翻译结果")
output_parts.append(state["translation_result"])
if state.get("final_output"):
output_parts.append(state["final_output"])
return {"final_output": "\n".join(output_parts)}
# 构建父图
parent_builder = StateGraph(ParentState)
parent_builder.add_node("classify", classify_intent)
parent_builder.add_node("search", call_search)
parent_builder.add_node("analyze", call_analysis)
parent_builder.add_node("translate", call_translation)
parent_builder.add_node("respond", respond_directly)
parent_builder.add_node("format", format_output)
# 边
parent_builder.add_edge(START, "classify")
# 条件边(通过 Command 实现)
parent_builder.add_conditional_edges(
"classify",
lambda state: state.get("intent", "respond"),
{
"search": "search",
"analyze": "analyze",
"translate": "translate",
"chat": "respond"
}
)
# 子图节点都指向格式化节点
parent_builder.add_edge("search", "format")
parent_builder.add_edge("analyze", "format")
parent_builder.add_edge("translate", "format")
parent_builder.add_edge("respond", "format")
parent_builder.add_edge("format", END)
# 编译父图
parent_graph = parent_builder.compile()
# ============ 服务类 ============
class SubgraphEnabledAgent:
"""支持子图的 Agent"""
def __init__(self):
self.graph = parent_graph
def process(self, query: str) -> dict:
"""
处理用户查询
自动识别意图并调用相应的子图。
Args:
query: 用户查询
Returns:
包含处理结果的字典
"""
result = self.graph.invoke({
"user_query": query,
"intent": None,
"search_results": None,
"analysis_result": None,
"translation_result": None,
"final_output": ""
})
return {
"intent": result.get("intent"),
"search_results": result.get("search_results"),
"analysis_result": result.get("analysis_result"),
"translation_result": result.get("translation_result"),
"output": result.get("final_output", "")
}
# 全局实例
_agent: Optional[SubgraphEnabledAgent] = None
def get_subgraph_agent() -> SubgraphEnabledAgent:
"""获取子图 Agent 单例"""
global _agent
if _agent is None:
_agent = SubgraphEnabledAgent()
return _agent四、更复杂的子图嵌套
4.1 多层嵌套示例
python
"""多层嵌套子图示例
展示 父图 -> 子图 -> 孙子图 的三层嵌套结构。
"""
from typing import TypedDict, List
from langgraph.graph import START, StateGraph, END
# ============ 孙子图(最底层)============
class GrandchildState(TypedDict):
text: str
processed: str
def grandchild_process(state: GrandchildState) -> GrandchildState:
return {"processed": state["text"].upper()}
grandchild_builder = StateGraph(GrandchildState)
grandchild_builder.add_node("process", grandchild_process)
grandchild_builder.add_edge(START, "process")
grandchild_builder.add_edge("process", END)
grandchild_graph = grandchild_builder.compile()
# ============ 子图(中间层)============
class ChildState(TypedDict):
items: List[str]
results: List[str]
def child_process(state: ChildState) -> ChildState:
results = []
for item in state["items"]:
# 调用孙子图
result = grandchild_graph.invoke({"text": item, "processed": ""})
results.append(result["processed"])
return {"results": results}
child_builder = StateGraph(ChildState)
child_builder.add_node("process", child_process)
child_builder.add_edge(START, "process")
child_builder.add_edge("process", END)
child_graph = child_builder.compile()
# ============ 父图(最顶层)============
class ParentState(TypedDict):
data: List[str]
final_results: List[str]
def parent_process(state: ParentState) -> ParentState:
# 调用子图
result = child_graph.invoke({"items": state["data"], "results": []})
return {"final_results": result["results"]}
parent_builder = StateGraph(ParentState)
parent_builder.add_node("process", parent_process)
parent_builder.add_edge(START, "process")
parent_builder.add_edge("process", END)
parent_graph = parent_builder.compile()
# 使用示例
if __name__ == "__main__":
result = parent_graph.invoke({
"data": ["hello", "world", "langgraph"],
"final_results": []
})
print(result["final_results"])
# 输出: ['HELLO', 'WORLD', 'LANGGRAPH']五、API 集成
5.1 路由定义
创建文件: api/subgraph.py
python
"""子图相关 API 路由"""
import logging
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from typing import Optional, List
from services.parent_graph import get_subgraph_agent
router = APIRouter(prefix="/api", tags=["subgraph"])
logger = logging.getLogger(__name__)
class SubgraphRequest(BaseModel):
"""子图请求"""
query: str
options: Optional[dict] = None
class SubgraphResponse(BaseModel):
"""子图响应"""
success: bool
intent: Optional[str] = None
search_results: Optional[List[str]] = None
analysis_result: Optional[str] = None
translation_result: Optional[str] = None
output: str
error: Optional[str] = None
@router.post("/subgraph/process", response_model=SubgraphResponse)
async def process_with_subgraph(request: SubgraphRequest):
"""
使用子图处理请求
自动识别意图并调用相应的子图(搜索/分析/翻译)。
"""
try:
agent = get_subgraph_agent()
result = agent.process(request.query)
return SubgraphResponse(
success=True,
intent=result.get("intent"),
search_results=result.get("search_results"),
analysis_result=result.get("analysis_result"),
translation_result=result.get("translation_result"),
output=result.get("output", "")
)
except Exception as e:
logger.error(f"子图处理失败: {e}", exc_info=True)
return SubgraphResponse(
success=False,
output="",
error=str(e)
)六、测试计划
6.1 单元测试
python
# tests/test_subgraphs.py
import pytest
from services.subgraphs import (
search_subgraph,
analysis_subgraph,
translation_subgraph,
)
def test_search_subgraph():
"""测试搜索子图"""
result = search_subgraph.invoke({
"query": "测试查询",
"results": []
})
assert "results" in result
assert len(result["results"]) > 0
def test_analysis_subgraph_summary():
"""测试分析子图 - 摘要"""
result = analysis_subgraph.invoke({
"content": "这是一段测试文本。",
"analysis_type": "summary",
"result": ""
})
assert "result" in result
assert len(result["result"]) > 0
def test_translation_subgraph():
"""测试翻译子图"""
result = translation_subgraph.invoke({
"text": "Hello",
"target_language": "中文",
"translated": ""
})
assert "translated" in result
# tests/test_parent_graph.py
def test_parent_graph_search_intent():
"""测试父图 - 搜索意图"""
from services.parent_graph import get_subgraph_agent
agent = get_subgraph_agent()
result = agent.process("搜索人工智能")
assert result["intent"] == "search"
assert result.get("search_results") is not None
def test_parent_graph_translation_intent():
"""测试父图 - 翻译意图"""
from services.parent_graph import get_subgraph_agent
agent = get_subgraph_agent()
result = agent.process("翻译成英文:你好世界")
assert result["intent"] == "translate"
assert result.get("translation_result") is not None七、实施步骤
步骤 1: 创建子图模块(0.5 天)
- 创建
services/subgraphs/__init__.py - 实现搜索子图
- 实现分析子图
- 实现翻译子图
- 编写单元测试
步骤 2: 创建父图(0.5 天)
- 创建
services/parent_graph.py - 实现意图分类
- 实现子图调用节点
- 编写集成测试
步骤 3: API 集成(0.25 天)
- 创建
api/subgraph.py - 注册路由到
main.py
步骤 4: 测试和优化(0.25 天)
- 端到端测试
- 性能测试
- 文档更新
八、最佳实践
8.1 子图设计原则
- 单一职责:每个子图只做一件事
- 独立状态:子图有独立的状态定义
- 清晰接口:明确定义输入输出字段
- 可独立测试:子图可以单独测试
8.2 父子图通信
python
# 推荐:显式状态转换
def call_child(state: ParentState) -> ParentState:
# 1. 提取子图输入
child_input = {"query": state["parent_field"]}
# 2. 调用子图
child_result = child_graph.invoke(child_input)
# 3. 转换为父状态
return {"parent_result": child_result["child_field"]}8.3 状态隔离
python
# 父图和子图的状态是完全隔离的
# 不应该直接访问对方的字段
# ❌ 错误:直接访问子图字段
def bad_node(state: ParentState):
# 子图状态字段在父图中不可见
return state["child_field"] # 错误!
# ✅ 正确:通过返回值传递
def good_node(state: ParentState):
result = child_graph.invoke({"input": state["input"]})
return {"output": result["output"]}