Skip to content

场景 6: 复杂工作流编排

模块:Subgraphs(子图) | 优先级:⚪ P4(低) | 业务价值:模块化复杂流程,提高代码复用性

一、业务背景

1.1 当前挑战

随着功能增加,Agent 逻辑变得越来越复杂:

问题:

  • 单个 Graph 节点过多,难以维护
  • 相似逻辑无法复用
  • 测试困难
  • 修改一处可能影响其他功能

1.2 子图解决方案


二、子图设计

2.1 子图拆分原则

2.2 子图划分方案

基于项目实际需求,设计以下子图:

| 子图名称 | 功能 | 状态字段 | 复用场景 |
|---|---|---|---|
| SearchSubgraph | 联网搜索 | query, max_results, results, summary | 问答、研究、推荐 |
| AnalysisSubgraph | 内容分析 | content, analysis_type, result, confidence | 情感分析、摘要、关键词 |
| TranslationSubgraph | 多语言翻译 | text, target_lang, translated | 文档翻译、实时翻译 |
| ImageProcessSubgraph | 图片处理 | image_url, operation, result | 裁剪、OCR、风格提取 |
| CodeReviewSubgraph | 代码审查 | code, language, issues | 开发辅助 |

2.3 子图通信机制


三、代码实现

3.1 子图模块

创建文件: services/subgraphs/search_subgraph.py

python
"""搜索子图

可复用的联网搜索功能。
"""
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, START, END
import logging

logger = logging.getLogger(__name__)


class SearchState(TypedDict):
    """State schema shared by the nodes of the search subgraph."""
    query: str                       # search query text
    max_results: int                 # cap on returned hits (search_node uses 3 if the key is absent)
    results: List[dict]              # hit dicts with "title" / "url" / "snippet" keys, as built by search_node
    summary: Optional[str]           # short digest of the hits, written by summarize_node


def search_node(state: SearchState) -> SearchState:
    """Produce search hits for ``state["query"]``.

    Stub implementation: no search API is called yet. Two placeholder hits
    are built around the query and then truncated to ``max_results``
    (3 when that key is absent from the state).

    Returns:
        Partial state update of the form ``{"results": [...]}``.
    """
    # TODO: integrate a real search API; the hits below are placeholders.
    query = state['query']
    first_hit = {
        "title": f"搜索结果 1: {query}",
        "url": "https://example.com/1",
        "snippet": "这是搜索结果的摘要内容..."
    }
    second_hit = {
        "title": f"搜索结果 2: {query}",
        "url": "https://example.com/2",
        "snippet": "另一个搜索结果的摘要..."
    }
    limit = state.get("max_results", 3)
    return {"results": [first_hit, second_hit][:limit]}


def summarize_node(state: SearchState) -> SearchState:
    """Condense ``state["results"]`` into a one-line summary.

    The summary states how many hits were found and joins the snippets of
    the first two with `` | ``; an empty result list yields a fixed
    "nothing found" message.

    Returns:
        Partial state update of the form ``{"summary": str}``.
    """
    hits = state["results"]
    if hits:
        # Every hit must carry a "snippet"; only the first two are shown.
        snippets = [hit["snippet"] for hit in hits]
        preview = " | ".join(snippets[:2])
        return {"summary": f"找到 {len(hits)} 条结果:" + preview}
    return {"summary": "未找到相关结果"}


# Assemble the search subgraph as a linear pipeline:
# START -> search -> summarize -> END.
def _make_search_builder() -> StateGraph:
    """Return the uncompiled StateGraph wiring search_node then summarize_node."""
    graph = StateGraph(SearchState)
    graph.add_node("search", search_node)
    graph.add_node("summarize", summarize_node)
    graph.add_edge(START, "search")
    graph.add_edge("search", "summarize")
    graph.add_edge("summarize", END)
    return graph


search_builder = _make_search_builder()

search_subgraph = search_builder.compile()

创建文件: services/subgraphs/analysis_subgraph.py

python
"""分析子图

可复用的内容分析功能。
"""
from typing import TypedDict, Optional, Literal
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
import os
import logging

logger = logging.getLogger(__name__)


class AnalysisState(TypedDict):
    """State schema shared by the nodes of the analysis subgraph."""
    content: str                                    # text to analyse
    analysis_type: Literal["sentiment", "summary", "keywords"]  # selects the prompt used by analyze_node
    result: Optional[str]                           # raw LLM reply text
    confidence: Optional[float]                     # confidence score reported by analyze_node


# Confidence reported when no usable score can be read from the model reply.
_DEFAULT_CONFIDENCE = 0.85


def _build_analysis_prompt(content: str, analysis_type: str) -> str:
    """Return the LLM prompt for *analysis_type*; unknown types use the summary prompt."""
    prompts = {
        "sentiment": f"""分析以下内容的情感倾向(正面/负面/中性),并给出置信度。

内容:{content}

请以 JSON 格式返回:
{{"sentiment": "正面/负面/中性", "confidence": 0.0-1.0, "reason": "原因"}}
""",
        "summary": f"""请总结以下内容的核心要点(3-5 条):

{content}
""",
        "keywords": f"""从以下内容中提取关键词(5-10 个):

{content}

请按重要性排序返回。
""",
    }
    return prompts.get(analysis_type, prompts["summary"])


def _extract_confidence(reply: object, default: float = _DEFAULT_CONFIDENCE) -> float:
    """Return the ``confidence`` field of the JSON object embedded in *reply*.

    Models often wrap the requested JSON in prose or Markdown fences, so the
    outermost ``{...}`` span is cut out before parsing. Falls back to
    *default* when *reply* is not a string, holds no parseable JSON object,
    or the value is not a number in [0, 1].
    """
    import json
    import re

    if not isinstance(reply, str):
        return default
    match = re.search(r"\{.*\}", reply, re.DOTALL)
    if match is None:
        return default
    try:
        payload = json.loads(match.group(0))
    except json.JSONDecodeError:
        return default
    if not isinstance(payload, dict):
        return default
    value = payload.get("confidence")
    # bool is an int subclass; reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return default
    return float(value) if 0.0 <= value <= 1.0 else default


def analyze_node(state: AnalysisState) -> AnalysisState:
    """Run one LLM analysis pass over ``state["content"]``.

    The model, API key and base URL come from the ``OPENROUTER_MODEL``,
    ``OPENROUTER_API_KEY`` and ``OPENROUTER_BASE_URL`` environment variables.
    ``state["analysis_type"]`` selects the prompt (unknown values fall back
    to the summary prompt).

    Returns:
        Partial state update with ``result`` (the raw model reply) and
        ``confidence``. For "sentiment" the confidence is parsed from the
        JSON the prompt asks for; otherwise, or if parsing fails, it is the
        fixed default 0.85.
    """
    llm = ChatOpenAI(
        model=os.getenv("OPENROUTER_MODEL", "openai/gpt-4o-mini"),
        api_key=os.getenv("OPENROUTER_API_KEY"),
        base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
    )

    analysis_type = state["analysis_type"]
    prompt = _build_analysis_prompt(state["content"], analysis_type)
    response = llm.invoke(prompt)

    confidence = _DEFAULT_CONFIDENCE
    if analysis_type == "sentiment":
        # Only the sentiment prompt asks the model for a confidence value.
        confidence = _extract_confidence(response.content)

    return {
        "result": response.content,
        "confidence": confidence,
    }


# Assemble the analysis subgraph: a single node, START -> analyze -> END.
def _make_analysis_builder() -> StateGraph:
    """Return the uncompiled StateGraph that runs analyze_node once."""
    graph = StateGraph(AnalysisState)
    graph.add_node("analyze", analyze_node)
    graph.add_edge(START, "analyze")
    graph.add_edge("analyze", END)
    return graph


analysis_builder = _make_analysis_builder()

analysis_subgraph = analysis_builder.compile()

3.2 父图编排

创建文件: services/orchestrator.py

python
"""工作流编排器

使用子图组合复杂工作流。
"""
from typing import TypedDict, Optional, List, Dict, Any, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
import logging

from services.subgraphs.search_subgraph import search_subgraph
from services.subgraphs.analysis_subgraph import analysis_subgraph

logger = logging.getLogger(__name__)


class OrchestratorState(TypedDict):
    """State schema of the parent (orchestrator) graph."""
    user_input: str                          # raw user request text
    intent: Optional[str]                    # routing label set by classify_intent ("search" / "analyze" / "chat")
    search_results: Optional[List[dict]]     # hits copied out of the search subgraph
    analysis_result: Optional[str]           # text produced by the analysis subgraph
    final_output: str                        # rendered answer returned to the caller


class WorkflowOrchestrator:
    """Parent workflow that routes each request to a reusable subgraph.

    Graph shape::

        START -> classify --Command(goto)--> search | analyze | respond
              -> format -> END

    ``classify_intent`` chooses the next node by returning a ``Command``.
    The class keeps no per-instance attributes, so one instance can build
    any number of graphs.
    """

    # Keyword lists for the heuristic intent classifier. They are checked in
    # this order, so search keywords win over analysis keywords.
    _SEARCH_KEYWORDS = ("搜索", "查找", "找", "search")
    _ANALYZE_KEYWORDS = ("分析", "情感", "总结", "关键词", "analyze")

    def classify_intent(
        self,
        state: OrchestratorState
    ) -> Command[Literal["search", "analyze", "respond"]]:
        """Classify the user's intent and jump to the matching node.

        Uses simple keyword matching on the lower-cased input (an LLM
        classifier could be swapped in). Writes ``intent`` as "search",
        "analyze" or "chat"; "chat" is routed to the ``respond`` node.
        """
        text = state["user_input"].lower()

        if any(kw in text for kw in self._SEARCH_KEYWORDS):
            return Command(goto="search", update={"intent": "search"})
        if any(kw in text for kw in self._ANALYZE_KEYWORDS):
            return Command(goto="analyze", update={"intent": "analyze"})
        return Command(goto="respond", update={"intent": "chat"})

    def call_search_subgraph(self, state: OrchestratorState) -> OrchestratorState:
        """Run the search subgraph on the raw user input.

        Returns:
            Partial update with ``search_results`` (up to 5 hits requested).
            The subgraph's ``summary`` is not kept in the parent state.
        """
        logger.info("调用搜索子图: %s", state["user_input"])

        result = search_subgraph.invoke({
            "query": state["user_input"],
            "max_results": 5,
            "results": [],
            "summary": None,
        })

        return {"search_results": result["results"]}

    def call_analysis_subgraph(self, state: OrchestratorState) -> OrchestratorState:
        """Run the analysis subgraph and store its text result.

        The analysis type is inferred from keywords in the user input:
        "情感" -> sentiment, "总结"/"摘要" -> summary, "关键词" -> keywords,
        otherwise summary. If search results are already present in the
        state, their snippets (or titles) are analysed instead of the raw input.

        Returns:
            Partial update with ``analysis_result``.
        """
        logger.info("调用分析子图: %s", state["user_input"])

        text = state["user_input"]

        if "情感" in text:
            analysis_type = "sentiment"
        elif "总结" in text or "摘要" in text:
            analysis_type = "summary"
        elif "关键词" in text:
            analysis_type = "keywords"
        else:
            analysis_type = "summary"

        content = text
        if state.get("search_results"):
            content = "\n".join(
                r.get("snippet", r.get("title", ""))
                for r in state["search_results"]
            )

        result = analysis_subgraph.invoke({
            "content": content,
            "analysis_type": analysis_type,
            "result": None,
            "confidence": None,
        })

        return {"analysis_result": result["result"]}

    def respond_directly(self, state: OrchestratorState) -> OrchestratorState:
        """Answer the input with a plain LLM call (no subgraph).

        Model settings come from the ``OPENROUTER_*`` environment variables.

        Returns:
            Partial update with ``final_output`` set to the model reply.
        """
        from langchain_openai import ChatOpenAI
        import os

        llm = ChatOpenAI(
            model=os.getenv("OPENROUTER_MODEL", "openai/gpt-4o-mini"),
            api_key=os.getenv("OPENROUTER_API_KEY"),
            base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
        )

        response = llm.invoke(state["user_input"])
        return {"final_output": response.content}

    def format_output(self, state: OrchestratorState) -> OrchestratorState:
        """Render whatever the previous node produced as Markdown.

        Search hits become a link list, an analysis result gets its own
        section, and an existing ``final_output`` (direct reply) is appended.
        Returns "无结果" when there is nothing to show.
        """
        parts = []

        if state.get("search_results"):
            parts.append("## 🔍 搜索结果\n")
            for r in state["search_results"]:
                parts.append(f"- [{r.get('title', 'Untitled')}]({r.get('url', '#')})")
                parts.append(f"  {r.get('snippet', '')}\n")

        if state.get("analysis_result"):
            parts.append("\n## 📊 分析结果\n")
            parts.append(state["analysis_result"])

        if state.get("final_output"):
            parts.append(state["final_output"])

        return {"final_output": "\n".join(parts) if parts else "无结果"}

    def build_graph(self, checkpointer):
        """Build and compile the orchestration graph.

        Args:
            checkpointer: LangGraph checkpointer passed through to ``compile``.

        Returns:
            The compiled graph.
        """
        builder = StateGraph(OrchestratorState)

        builder.add_node("classify", self.classify_intent)
        builder.add_node("search", self.call_search_subgraph)
        builder.add_node("analyze", self.call_analysis_subgraph)
        builder.add_node("respond", self.respond_directly)
        builder.add_node("format", self.format_output)

        builder.add_edge(START, "classify")
        # No edges leave "classify": classify_intent routes with
        # Command(goto=...), and its Command[Literal[...]] return annotation
        # declares the possible destinations. Adding a conditional edge here
        # as well would duplicate that routing.
        builder.add_edge("search", "format")
        builder.add_edge("analyze", "format")
        builder.add_edge("respond", "format")
        builder.add_edge("format", END)

        return builder.compile(checkpointer=checkpointer)


# Lazily created, process-wide orchestrator instance.
_orchestrator: Optional[WorkflowOrchestrator] = None


def get_orchestrator() -> WorkflowOrchestrator:
    """Return the shared WorkflowOrchestrator, creating it on first call."""
    global _orchestrator
    if _orchestrator is not None:
        return _orchestrator
    _orchestrator = WorkflowOrchestrator()
    return _orchestrator

四、API 集成

python
# api/orchestrator.py

from fastapi import APIRouter
from pydantic import BaseModel
from typing import Optional
from services.orchestrator import get_orchestrator
from services.checkpointer import get_checkpointer

# Router mounted under /api; routes declared on it carry the "orchestrator" tag.
router = APIRouter(prefix="/api", tags=["orchestrator"])


class OrchestratorRequest(BaseModel):
    """Request body for ``POST /api/orchestrator/process``."""

    input: str  # raw user text to classify and process
    thread_id: Optional[str] = None  # checkpoint thread to use; optional, the endpoint picks one when omitted


@router.post("/orchestrator/process")
async def process_with_orchestrator(request: OrchestratorRequest):
    """Process a request with the intent-routing orchestrator.

    The orchestrator classifies the user's intent and calls the matching
    subgraph.

    When ``thread_id`` is omitted, a fresh id is generated per request. This
    prevents anonymous callers from sharing, and being able to read, a single
    checkpoint thread. Running the graph is synchronous (LLM calls, checkpoint
    I/O), so it runs in FastAPI's threadpool instead of blocking the event
    loop. The checkpointer is opened and used inside that same worker thread.

    Returns:
        ``{"success": True, "intent": ..., "output": ..., "thread_id": ...}``.
        The extra ``thread_id`` lets clients continue the same conversation.
    """
    import uuid

    from fastapi.concurrency import run_in_threadpool

    thread_id = request.thread_id or f"anonymous-{uuid.uuid4()}"

    def _run_graph() -> dict:
        # Blocking work: build the graph and invoke it with its checkpointer.
        orchestrator = get_orchestrator()
        with get_checkpointer() as checkpointer:
            graph = orchestrator.build_graph(checkpointer)
            config = {"configurable": {"thread_id": thread_id}}
            return graph.invoke({
                "user_input": request.input,
                "intent": None,
                "search_results": None,
                "analysis_result": None,
                "final_output": ""
            }, config)

    result = await run_in_threadpool(_run_graph)

    return {
        "success": True,
        "intent": result.get("intent"),
        "output": result.get("final_output", ""),
        "thread_id": thread_id,
    }

五、架构图

5.1 完整架构

5.2 状态流转


六、子图复用示例

6.1 在不同场景复用

python
# Scenario 1: Q&A bot
class QABot:
    """Sketch: a Q&A bot graph that reuses the search subgraph.

    Illustrative only. ``QAState`` and ``self.call_search_subgraph`` are not
    defined in this snippet; presumably the latter wraps ``search_subgraph``
    like ``WorkflowOrchestrator.call_search_subgraph`` does (assumption, confirm).
    """

    def build(self):
        """Start building the bot's graph; remaining nodes are elided and nothing is returned yet."""
        builder = StateGraph(QAState)
        # Reuse the search subgraph
        builder.add_node("search", self.call_search_subgraph)
        # ... other nodes


# Scenario 2: research assistant
class ResearchAssistant:
    """Sketch: a research-assistant graph that reuses both subgraphs.

    Illustrative only. ``ResearchState``, ``self.call_search_subgraph`` and
    ``self.call_analysis_subgraph`` are not defined in this snippet;
    presumably they mirror the same-named ``WorkflowOrchestrator`` methods
    (assumption, confirm).
    """

    def build(self):
        """Start building the graph; remaining nodes are elided and nothing is returned yet."""
        builder = StateGraph(ResearchState)
        # Reuse the search subgraph
        builder.add_node("search", self.call_search_subgraph)
        # Reuse the analysis subgraph
        builder.add_node("analyze", self.call_analysis_subgraph)
        # ... other nodes


# Scenario 3: content-creation tool
class ContentCreator:
    """Sketch: a content-creation graph that reuses the analysis subgraph.

    Illustrative only: ``ContentState`` is not defined in this snippet, and
    the remaining nodes are elided.
    """

    def build(self):
        """Start building the graph with a keyword-extraction node."""
        builder = StateGraph(ContentState)
        # Reuse the analysis subgraph for keyword extraction. Every
        # AnalysisState key is supplied explicitly.
        # NOTE(review): the lambda returns the subgraph's whole output state
        # (content / analysis_type / result / confidence); ContentState
        # presumably needs matching keys, or `result` should be mapped onto a
        # ContentState field. Confirm.
        builder.add_node("keywords", lambda s: analysis_subgraph.invoke({
            "content": s["topic"],
            "analysis_type": "keywords",
            "result": None,
            "confidence": None,
        }))
        # ... other nodes
        # ... 其他节点

七、测试策略

7.1 子图独立测试

python
# tests/test_subgraphs.py

def test_search_subgraph():
    """Run the search subgraph in isolation and check both of its outputs."""
    from services.subgraphs.search_subgraph import search_subgraph

    result = search_subgraph.invoke({
        "query": "Python 异步编程",
        "max_results": 3,
        "results": [],
        "summary": None
    })

    # At least one hit, and never more than max_results.
    assert 0 < len(result["results"]) <= 3
    assert result["summary"] is not None


def test_analysis_subgraph_sentiment():
    """Run the sentiment analysis subgraph in isolation.

    Makes a real LLM call, so it needs ``OPENROUTER_API_KEY`` and network
    access.
    """
    from services.subgraphs.analysis_subgraph import analysis_subgraph

    result = analysis_subgraph.invoke({
        "content": "这个产品非常好用,我很喜欢!",
        "analysis_type": "sentiment",
        "result": None,
        "confidence": None
    })

    assert result["result"] is not None
    assert "正面" in result["result"] or "positive" in result["result"].lower()
    assert result["confidence"] is not None
    assert 0.0 <= result["confidence"] <= 1.0

7.2 父图集成测试

python
# tests/test_orchestrator.py

def test_orchestrator_search_intent():
    """End-to-end check: a "搜索" request is routed to the search subgraph."""
    # Use an in-memory checkpointer for the test.
    from langgraph.checkpoint.memory import MemorySaver
    from services.orchestrator import get_orchestrator

    orchestrator = get_orchestrator()
    graph = orchestrator.build_graph(MemorySaver())

    # A graph compiled with a checkpointer requires a thread_id in its config.
    config = {"configurable": {"thread_id": "test-orchestrator-search"}}

    result = graph.invoke({
        "user_input": "搜索 Python 教程",
        "intent": None,
        "search_results": None,
        "analysis_result": None,
        "final_output": ""
    }, config)

    assert result["intent"] == "search"
    assert result.get("search_results") is not None

八、实施计划

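| 步骤 | 任务 | 预估时间 |
|---|---|---|
| 1 | 创建 services/subgraphs/ 目录结构 | 0.5h |
| 2 | 实现搜索子图 | 2h |
| 3 | 实现分析子图 | 1.5h |
| 4 | 实现翻译子图 | 1h |
| 5 | 实现编排器 | 2h |
| 6 | API 集成 | 1h |
| 7 | 测试 | 2h |
| **总计** | | **10h(约 1.5 天)** |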