Appearance
场景 6: 复杂工作流编排
模块:Subgraphs(子图)优先级:⚪ P4(低)业务价值:模块化复杂流程,提高代码复用性
一、业务背景
1.1 当前挑战
随着功能增加,Agent 逻辑变得越来越复杂:
问题:
- 单个 Graph 节点过多,难以维护
- 相似逻辑无法复用
- 测试困难
- 修改一处可能影响其他功能
1.2 子图解决方案
二、子图设计
2.1 子图拆分原则
2.2 子图划分方案
基于项目实际需求,设计以下子图:
| 子图名称 | 功能 | 状态字段 | 复用场景 |
|---|---|---|---|
| SearchSubgraph | 联网搜索 | query, results | 问答、研究、推荐 |
| AnalysisSubgraph | 内容分析 | content, type, result | 情感分析、摘要、关键词 |
| TranslationSubgraph | 多语言翻译 | text, target_lang, translated | 文档翻译、实时翻译 |
| ImageProcessSubgraph | 图片处理 | image_url, operation, result | 裁剪、OCR、风格提取 |
| CodeReviewSubgraph | 代码审查 | code, language, issues | 开发辅助 |
2.3 子图通信机制
三、代码实现
3.1 子图模块
创建文件: services/subgraphs/search_subgraph.py
python
"""搜索子图
可复用的联网搜索功能。
"""
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, START, END
import logging
logger = logging.getLogger(__name__)
class SearchState(TypedDict):
"""搜索子图状态"""
query: str # 搜索查询
max_results: int # 最大结果数
results: List[dict] # 搜索结果
summary: Optional[str] # 结果摘要
def search_node(state: SearchState) -> SearchState:
"""执行搜索"""
# TODO: 集成实际搜索 API
# 示例返回
mock_results = [
{
"title": f"搜索结果 1: {state['query']}",
"url": "https://example.com/1",
"snippet": "这是搜索结果的摘要内容..."
},
{
"title": f"搜索结果 2: {state['query']}",
"url": "https://example.com/2",
"snippet": "另一个搜索结果的摘要..."
}
]
return {"results": mock_results[:state.get("max_results", 3)]}
def summarize_node(state: SearchState) -> SearchState:
"""生成摘要"""
if not state["results"]:
return {"summary": "未找到相关结果"}
snippets = [r["snippet"] for r in state["results"]]
summary = f"找到 {len(state['results'])} 条结果:" + " | ".join(snippets[:2])
return {"summary": summary}
# 构建子图
search_builder = StateGraph(SearchState)
search_builder.add_node("search", search_node)
search_builder.add_node("summarize", summarize_node)
search_builder.add_edge(START, "search")
search_builder.add_edge("search", "summarize")
search_builder.add_edge("summarize", END)
search_subgraph = search_builder.compile()创建文件: services/subgraphs/analysis_subgraph.py
python
"""分析子图
可复用的内容分析功能。
"""
from typing import TypedDict, Optional, Literal
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
import os
import logging
logger = logging.getLogger(__name__)
class AnalysisState(TypedDict):
"""分析子图状态"""
content: str # 待分析内容
analysis_type: Literal["sentiment", "summary", "keywords"] # 分析类型
result: Optional[str] # 分析结果
confidence: Optional[float] # 置信度
def analyze_node(state: AnalysisState) -> AnalysisState:
"""执行分析"""
llm = ChatOpenAI(
model=os.getenv("OPENROUTER_MODEL", "openai/gpt-4o-mini"),
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
)
prompts = {
"sentiment": f"""分析以下内容的情感倾向(正面/负面/中性),并给出置信度。
内容:{state['content']}
请以 JSON 格式返回:
{{"sentiment": "正面/负面/中性", "confidence": 0.0-1.0, "reason": "原因"}}
""",
"summary": f"""请总结以下内容的核心要点(3-5 条):
{state['content']}
""",
"keywords": f"""从以下内容中提取关键词(5-10 个):
{state['content']}
请按重要性排序返回。
"""
}
prompt = prompts.get(state["analysis_type"], prompts["summary"])
response = llm.invoke(prompt)
return {
"result": response.content,
"confidence": 0.85 # 简化,实际应从响应中提取
}
# 构建子图
analysis_builder = StateGraph(AnalysisState)
analysis_builder.add_node("analyze", analyze_node)
analysis_builder.add_edge(START, "analyze")
analysis_builder.add_edge("analyze", END)
analysis_subgraph = analysis_builder.compile()3.2 父图编排
创建文件: services/orchestrator.py
python
"""工作流编排器
使用子图组合复杂工作流。
"""
from typing import TypedDict, Optional, List, Dict, Any, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
import logging
from services.subgraphs.search_subgraph import search_subgraph
from services.subgraphs.analysis_subgraph import analysis_subgraph
logger = logging.getLogger(__name__)
class OrchestratorState(TypedDict):
"""编排器状态"""
user_input: str
intent: Optional[str]
search_results: Optional[List[dict]]
analysis_result: Optional[str]
final_output: str
class WorkflowOrchestrator:
"""工作流编排器"""
def classify_intent(
self,
state: OrchestratorState
) -> Command[Literal["search", "analyze", "respond"]]:
"""分类用户意图"""
text = state["user_input"].lower()
# 简单关键词匹配(实际可用 LLM 分类)
if any(kw in text for kw in ["搜索", "查找", "找", "search"]):
return Command(goto="search", update={"intent": "search"})
elif any(kw in text for kw in ["分析", "情感", "总结", "关键词", "analyze"]):
return Command(goto="analyze", update={"intent": "analyze"})
else:
return Command(goto="respond", update={"intent": "chat"})
def call_search_subgraph(self, state: OrchestratorState) -> OrchestratorState:
"""调用搜索子图"""
logger.info(f"调用搜索子图: {state['user_input']}")
result = search_subgraph.invoke({
"query": state["user_input"],
"max_results": 5,
"results": [],
"summary": None
})
return {"search_results": result["results"]}
def call_analysis_subgraph(self, state: OrchestratorState) -> OrchestratorState:
"""调用分析子图"""
logger.info(f"调用分析子图: {state['user_input']}")
text = state["user_input"]
# 确定分析类型
if "情感" in text:
analysis_type = "sentiment"
elif "总结" in text or "摘要" in text:
analysis_type = "summary"
elif "关键词" in text:
analysis_type = "keywords"
else:
analysis_type = "summary"
# 使用搜索结果或用户输入作为分析内容
content = text
if state.get("search_results"):
content = "\n".join([
r.get("snippet", r.get("title", ""))
for r in state["search_results"]
])
result = analysis_subgraph.invoke({
"content": content,
"analysis_type": analysis_type,
"result": None,
"confidence": None
})
return {"analysis_result": result["result"]}
def respond_directly(self, state: OrchestratorState) -> OrchestratorState:
"""直接响应(无子图)"""
from langchain_openai import ChatOpenAI
import os
llm = ChatOpenAI(
model=os.getenv("OPENROUTER_MODEL", "openai/gpt-4o-mini"),
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
)
response = llm.invoke(state["user_input"])
return {"final_output": response.content}
def format_output(self, state: OrchestratorState) -> OrchestratorState:
"""格式化最终输出"""
parts = []
if state.get("search_results"):
parts.append("## 🔍 搜索结果\n")
for r in state["search_results"]:
parts.append(f"- [{r.get('title', 'Untitled')}]({r.get('url', '#')})")
parts.append(f" {r.get('snippet', '')}\n")
if state.get("analysis_result"):
parts.append("\n## 📊 分析结果\n")
parts.append(state["analysis_result"])
if state.get("final_output"):
parts.append(state["final_output"])
return {"final_output": "\n".join(parts) if parts else "无结果"}
def build_graph(self, checkpointer):
"""构建编排图"""
builder = StateGraph(OrchestratorState)
# 添加节点
builder.add_node("classify", self.classify_intent)
builder.add_node("search", self.call_search_subgraph)
builder.add_node("analyze", self.call_analysis_subgraph)
builder.add_node("respond", self.respond_directly)
builder.add_node("format", self.format_output)
# 添加边
builder.add_edge(START, "classify")
builder.add_conditional_edges(
"classify",
lambda s: s.get("intent", "respond"),
{
"search": "search",
"analyze": "analyze",
"chat": "respond"
}
)
builder.add_edge("search", "format")
builder.add_edge("analyze", "format")
builder.add_edge("respond", "format")
builder.add_edge("format", END)
return builder.compile(checkpointer=checkpointer)
# 全局实例
_orchestrator: Optional[WorkflowOrchestrator] = None
def get_orchestrator() -> WorkflowOrchestrator:
"""获取编排器单例"""
global _orchestrator
if _orchestrator is None:
_orchestrator = WorkflowOrchestrator()
return _orchestrator四、API 集成
python
# api/orchestrator.py
from fastapi import APIRouter
from pydantic import BaseModel
from typing import Optional
from services.orchestrator import get_orchestrator
from services.checkpointer import get_checkpointer
router = APIRouter(prefix="/api", tags=["orchestrator"])
class OrchestratorRequest(BaseModel):
input: str
thread_id: Optional[str] = None
@router.post("/orchestrator/process")
async def process_with_orchestrator(request: OrchestratorRequest):
"""
使用编排器处理请求
自动分类意图并调用相应的子图。
"""
orchestrator = get_orchestrator()
with get_checkpointer() as checkpointer:
graph = orchestrator.build_graph(checkpointer)
thread_id = request.thread_id or "anonymous"
config = {"configurable": {"thread_id": thread_id}}
result = graph.invoke({
"user_input": request.input,
"intent": None,
"search_results": None,
"analysis_result": None,
"final_output": ""
}, config)
return {
"success": True,
"intent": result.get("intent"),
"output": result.get("final_output", "")
}五、架构图
5.1 完整架构
5.2 状态流转
六、子图复用示例
6.1 在不同场景复用
python
# 场景 1: 问答机器人
class QABot:
def build(self):
builder = StateGraph(QAState)
# 复用搜索子图
builder.add_node("search", self.call_search_subgraph)
# ... 其他节点
# 场景 2: 研究助手
class ResearchAssistant:
def build(self):
builder = StateGraph(ResearchState)
# 复用搜索子图
builder.add_node("search", self.call_search_subgraph)
# 复用分析子图
builder.add_node("analyze", self.call_analysis_subgraph)
# ... 其他节点
# 场景 3: 内容创作工具
class ContentCreator:
def build(self):
builder = StateGraph(ContentState)
# 复用分析子图(关键词提取)
builder.add_node("keywords", lambda s: analysis_subgraph.invoke({
"content": s["topic"],
"analysis_type": "keywords",
...
}))
# ... 其他节点七、测试策略
7.1 子图独立测试
python
# tests/test_subgraphs.py
def test_search_subgraph():
"""独立测试搜索子图"""
result = search_subgraph.invoke({
"query": "Python 异步编程",
"max_results": 3,
"results": [],
"summary": None
})
assert len(result["results"]) > 0
assert result["summary"] is not None
def test_analysis_subgraph_sentiment():
"""独立测试情感分析子图"""
result = analysis_subgraph.invoke({
"content": "这个产品非常好用,我很喜欢!",
"analysis_type": "sentiment",
"result": None,
"confidence": None
})
assert result["result"] is not None
assert "正面" in result["result"] or "positive" in result["result"].lower()7.2 父图集成测试
python
# tests/test_orchestrator.py
def test_orchestrator_search_intent():
"""测试编排器 - 搜索意图"""
orchestrator = get_orchestrator()
# 使用内存 checkpointer 测试
from langgraph.checkpoint.memory import MemorySaver
graph = orchestrator.build_graph(MemorySaver())
result = graph.invoke({
"user_input": "搜索 Python 教程",
"intent": None,
"search_results": None,
"analysis_result": None,
"final_output": ""
})
assert result["intent"] == "search"
assert result.get("search_results") is not None八、实施计划
| 步骤 | 任务 | 预估时间 |
|---|---|---|
| 1 | 创建 services/subgraphs/ 目录结构 | 0.5h |
| 2 | 实现搜索子图 | 2h |
| 3 | 实现分析子图 | 1.5h |
| 4 | 实现翻译子图 | 1h |
| 5 | 实现编排器 | 2h |
| 6 | API 集成 | 1h |
| 7 | 测试 | 2h |
| 总计 | 10h (1.5天) |