Skip to content

LangGraph Subgraphs 集成方案

一、模块概述

属性说明
模块名称Subgraphs(子图)
优先级⚪ P4(低)
预估工时2 天
依赖项langgraph.graph

为什么需要

将复杂工作流拆分为可复用的子模块:

  • 模块化:将大型复杂图拆分为小型可维护的子图
  • 复用性:子图可以在多个父图中重复使用
  • 封装性:每个子图有独立的状态空间和逻辑
  • 可测试性:子图可以独立测试

二、架构设计

2.1 子图嵌套结构

┌─────────────────────────────────────────────────────────────────────┐
│                         父图 (Parent Graph)                          │
│  ┌───────────────────────────────────────────────────────────────┐ │
│  │                       State: ParentState                        │ │
│  │   - user_query: str                                             │ │
│  │   - search_results: list                                        │ │
│  │   - analysis_result: str                                        │ │
│  │   - final_output: str                                           │ │
│  └───────────────────────────────────────────────────────────────┘ │
│                                                                     │
│   START ──► [路由判断] ──► [搜索子图] ──► [分析子图] ──► END       │
│                                │              │              │          │
│                                │              │              │          │
│                                ▼              ▼              ▼          │
│                         ┌─────────┐    ┌─────────┐    ┌─────────┐    │
│                         │ 父节点  │    │ 父节点  │    │ 父节点  │    │
│                         │(调用子图)│    │(调用子图)│    │(调用子图)│    │
│                         └────┬────┘    └────┬────┘    └────┬────┘    │
│                              │              │              │           │
│   ┌──────────────────────────┼──────────────┼──────────────┼───────┐ │
│   │                          │              │              │       │   │
│   │   ┌──────────────────────▼──────────────▼──────────────▼─────┐ │   │
│   │   │                    子图 (Subgraph)                           │ │   │
│   │   │  ┌───────────────────────────────────────────────────────┐  │ │   │
│   │   │  │                 State: ChildState                         │  │ │   │
│   │   │  │   - query: str                                           │  │ │   │
│   │   │  │   - results: list                                        │  │ │   │
│   │   │  └───────────────────────────────────────────────────────┘  │ │   │
│   │   │                                                          │ │   │
│   │   │   START ──► [处理] ──► [转换] ──► END                       │ │   │
│   │   │                                                        │ │   │
│   │   └────────────────────────────────────────────────────────────┘ │   │
│   │                                                                    │ │
│   └────────────────────────────────────────────────────────────────────┘ │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

2.2 父子图状态转换

python
# 父图调用子图时的状态转换
def call_search_subgraph(state: ParentState) -> ParentState:
    # 1. 从父状态提取子图输入
    child_input = {
        "query": state["user_query"],
        "results": []
    }

    # 2. 调用子图
    child_result = search_subgraph.invoke(child_input)

    # 3. 将子图输出转换为父状态
    return {
        "search_results": child_result["results"]
    }

三、代码实现

3.1 子图定义

创建文件: services/subgraphs/__init__.py

python
"""LangGraph 子图模块

定义可复用的子图组件。
"""
from typing import TypedDict, List
from langgraph.graph import START, StateGraph, END
from langchain_openai import ChatOpenAI
import logging

logger = logging.getLogger(__name__)


# ============ 搜索子图 ============

class SearchState(TypedDict):
    """搜索子图状态"""
    query: str
    results: List[str]


def search_node(state: SearchState) -> SearchState:
    """执行搜索"""
    # TODO: 集成实际搜索 API
    # 示例:返回模拟结果
    mock_results = [
        f"搜索结果 1: 关于 {state['query']} 的信息",
        f"搜索结果 2: {state['query']} 相关内容",
    ]
    return {"results": mock_results}


def process_results_node(state: SearchState) -> SearchState:
    """处理搜索结果"""
    # 对结果进行格式化处理
    processed = [f"[已处理] {r}" for r in state["results"]]
    return {"results": processed}


# 构建搜索子图
search_builder = StateGraph(SearchState)
search_builder.add_node("search", search_node)
search_builder.add_node("process", process_results_node)
search_builder.add_edge(START, "search")
search_builder.add_edge("search", "process")
search_builder.add_edge("process", END)

search_subgraph = search_builder.compile()


# ============ 分析子图 ============

class AnalysisState(TypedDict):
    """分析子图状态"""
    content: str
    analysis_type: str  # sentiment, summary, keywords
    result: str


def analyze_node(state: AnalysisState) -> AnalysisState:
    """执行分析"""
    import os
    llm = ChatOpenAI(
        model="openai/gpt-4o-mini",
        api_key=os.getenv("OPENROUTER_API_KEY"),
        base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
    )

    prompts = {
        "sentiment": f"分析以下内容的情感倾向(正面/负面/中性):\n\n{state['content']}",
        "summary": f"总结以下内容:\n\n{state['content']}",
        "keywords": f"提取以下内容的关键词:\n\n{state['content']}"
    }

    prompt = prompts.get(state["analysis_type"], prompts["summary"])
    result = llm.invoke(prompt)

    return {"result": result.content}


# 构建分析子图
analysis_builder = StateGraph(AnalysisState)
analysis_builder.add_node("analyze", analyze_node)
analysis_builder.add_edge(START, "analyze")
analysis_builder.add_edge("analyze", END)

analysis_subgraph = analysis_builder.compile()


# ============ 翻译子图 ============

class TranslationState(TypedDict):
    """翻译子图状态"""
    text: str
    target_language: str
    translated: str


def translate_node(state: TranslationState) -> TranslationState:
    """执行翻译"""
    import os
    llm = ChatOpenAI(
        model="openai/gpt-4o-mini",
        api_key=os.getenv("OPENROUTER_API_KEY"),
        base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
    )

    prompt = f"将以下内容翻译成{state['target_language']}\n\n{state['text']}"
    result = llm.invoke(prompt)

    return {"translated": result.content}


# 构建翻译子图
translation_builder = StateGraph(TranslationState)
translation_builder.add_node("translate", translate_node)
translation_builder.add_edge(START, "translate")
translation_builder.add_edge("translate", END)

translation_subgraph = translation_builder.compile()


# 导出所有子图
__all__ = [
    "search_subgraph",
    "analysis_subgraph",
    "translation_subgraph",
    "SearchState",
    "AnalysisState",
    "TranslationState",
]

3.2 父图调用子图

创建文件: services/parent_graph.py

python
"""父图 - 组合多个子图实现复杂工作流

展示如何在父图中调用和组合子图。
"""
from typing import TypedDict, List, Optional, Literal
from langgraph.graph import START, StateGraph, END
from langgraph.types import Command

from services.subgraphs import (
    search_subgraph,
    analysis_subgraph,
    translation_subgraph,
)
import logging

logger = logging.getLogger(__name__)


class ParentState(TypedDict):
    """父图状态"""
    user_query: str
    intent: Optional[str]  # search, analyze, translate
    search_results: Optional[List[str]]
    analysis_result: Optional[str]
    translation_result: Optional[str]
    final_output: str


def classify_intent(state: ParentState) -> Command[Literal["search", "analyze", "translate", "respond"]]:
    """分类用户意图"""
    query = state["user_query"].lower()

    if any(kw in query for kw in ["搜索", "查找", "找", "search"]):
        return Command(goto="search", update={"intent": "search"})
    elif any(kw in query for kw in ["分析", "情感", "总结", "关键词", "analyze"]):
        return Command(goto="analyze", update={"intent": "analyze"})
    elif any(kw in query for kw in ["翻译", "translate"]):
        return Command(goto="translate", update={"intent": "translate"})
    else:
        return Command(goto="respond", update={"intent": "chat"})


def call_search(state: ParentState) -> ParentState:
    """调用搜索子图"""
    logger.info(f"调用搜索子图: {state['user_query']}")

    # 调用子图
    result = search_subgraph.invoke({
        "query": state["user_query"],
        "results": []
    })

    return {"search_results": result["results"]}


def call_analysis(state: ParentState) -> ParentState:
    """调用分析子图"""
    logger.info(f"调用分析子图: {state['user_query']}")

    # 确定分析类型
    query = state["user_query"].lower()
    if "情感" in query:
        analysis_type = "sentiment"
    elif "总结" in query:
        analysis_type = "summary"
    elif "关键词" in query:
        analysis_type = "keywords"
    else:
        analysis_type = "summary"

    # 调用子图
    result = analysis_subgraph.invoke({
        "content": state.get("search_results", [state["user_query"]])[-1] if state.get("search_results") else state["user_query"],
        "analysis_type": analysis_type,
        "result": ""
    })

    return {"analysis_result": result["result"]}


def call_translation(state: ParentState) -> ParentState:
    """调用翻译子图"""
    logger.info(f"调用翻译子图: {state['user_query']}")

    # 确定目标语言
    query = state["user_query"]
    if "英文" in query or "英语" in query:
        target_language = "英语"
    elif "日文" in query or "日语" in query:
        target_language = "日语"
    elif "中文" in query:
        target_language = "中文"
    else:
        target_language = "英语"

    # 调用子图
    result = translation_subgraph.invoke({
        "text": state["user_query"],
        "target_language": target_language,
        "translated": ""
    })

    return {"translation_result": result["translated"]}


def respond_directly(state: ParentState) -> ParentState:
    """直接响应"""
    import os
    from langchain_openai import ChatOpenAI

    llm = ChatOpenAI(
        model="openai/gpt-4o-mini",
        api_key=os.getenv("OPENROUTER_API_KEY"),
        base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
    )

    response = llm.invoke(state["user_query"])
    return {"final_output": response.content}


def format_output(state: ParentState) -> ParentState:
    """格式化最终输出"""
    output_parts = []

    if state.get("search_results"):
        output_parts.append("## 搜索结果")
        for r in state["search_results"]:
            output_parts.append(f"- {r}")

    if state.get("analysis_result"):
        output_parts.append("\n## 分析结果")
        output_parts.append(state["analysis_result"])

    if state.get("translation_result"):
        output_parts.append("\n## 翻译结果")
        output_parts.append(state["translation_result"])

    if state.get("final_output"):
        output_parts.append(state["final_output"])

    return {"final_output": "\n".join(output_parts)}


# 构建父图
parent_builder = StateGraph(ParentState)
parent_builder.add_node("classify", classify_intent)
parent_builder.add_node("search", call_search)
parent_builder.add_node("analyze", call_analysis)
parent_builder.add_node("translate", call_translation)
parent_builder.add_node("respond", respond_directly)
parent_builder.add_node("format", format_output)

# 边
parent_builder.add_edge(START, "classify")
# 条件边(通过 Command 实现)
parent_builder.add_conditional_edges(
    "classify",
    lambda state: state.get("intent", "respond"),
    {
        "search": "search",
        "analyze": "analyze",
        "translate": "translate",
        "chat": "respond"
    }
)
# 子图节点都指向格式化节点
parent_builder.add_edge("search", "format")
parent_builder.add_edge("analyze", "format")
parent_builder.add_edge("translate", "format")
parent_builder.add_edge("respond", "format")
parent_builder.add_edge("format", END)

# 编译父图
parent_graph = parent_builder.compile()


# ============ 服务类 ============

class SubgraphEnabledAgent:
    """支持子图的 Agent"""

    def __init__(self):
        self.graph = parent_graph

    def process(self, query: str) -> dict:
        """
        处理用户查询

        自动识别意图并调用相应的子图。

        Args:
            query: 用户查询

        Returns:
            包含处理结果的字典
        """
        result = self.graph.invoke({
            "user_query": query,
            "intent": None,
            "search_results": None,
            "analysis_result": None,
            "translation_result": None,
            "final_output": ""
        })

        return {
            "intent": result.get("intent"),
            "search_results": result.get("search_results"),
            "analysis_result": result.get("analysis_result"),
            "translation_result": result.get("translation_result"),
            "output": result.get("final_output", "")
        }


# 全局实例
_agent: Optional[SubgraphEnabledAgent] = None


def get_subgraph_agent() -> SubgraphEnabledAgent:
    """获取子图 Agent 单例"""
    global _agent
    if _agent is None:
        _agent = SubgraphEnabledAgent()
    return _agent

四、更复杂的子图嵌套

4.1 多层嵌套示例

python
"""多层嵌套子图示例

展示 父图 -> 子图 -> 孙子图 的三层嵌套结构。
"""
from typing import TypedDict, List
from langgraph.graph import START, StateGraph, END

# ============ 孙子图(最底层)============

class GrandchildState(TypedDict):
    text: str
    processed: str

def grandchild_process(state: GrandchildState) -> GrandchildState:
    return {"processed": state["text"].upper()}

grandchild_builder = StateGraph(GrandchildState)
grandchild_builder.add_node("process", grandchild_process)
grandchild_builder.add_edge(START, "process")
grandchild_builder.add_edge("process", END)
grandchild_graph = grandchild_builder.compile()


# ============ 子图(中间层)============

class ChildState(TypedDict):
    items: List[str]
    results: List[str]

def child_process(state: ChildState) -> ChildState:
    results = []
    for item in state["items"]:
        # 调用孙子图
        result = grandchild_graph.invoke({"text": item, "processed": ""})
        results.append(result["processed"])
    return {"results": results}

child_builder = StateGraph(ChildState)
child_builder.add_node("process", child_process)
child_builder.add_edge(START, "process")
child_builder.add_edge("process", END)
child_graph = child_builder.compile()


# ============ 父图(最顶层)============

class ParentState(TypedDict):
    data: List[str]
    final_results: List[str]

def parent_process(state: ParentState) -> ParentState:
    # 调用子图
    result = child_graph.invoke({"items": state["data"], "results": []})
    return {"final_results": result["results"]}

parent_builder = StateGraph(ParentState)
parent_builder.add_node("process", parent_process)
parent_builder.add_edge(START, "process")
parent_builder.add_edge("process", END)
parent_graph = parent_builder.compile()


# 使用示例
if __name__ == "__main__":
    result = parent_graph.invoke({
        "data": ["hello", "world", "langgraph"],
        "final_results": []
    })
    print(result["final_results"])
    # 输出: ['HELLO', 'WORLD', 'LANGGRAPH']

五、API 集成

5.1 路由定义

创建文件: api/subgraph.py

python
"""子图相关 API 路由"""
import logging
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from typing import Optional, List

from services.parent_graph import get_subgraph_agent

router = APIRouter(prefix="/api", tags=["subgraph"])
logger = logging.getLogger(__name__)


class SubgraphRequest(BaseModel):
    """子图请求"""
    query: str
    options: Optional[dict] = None


class SubgraphResponse(BaseModel):
    """子图响应"""
    success: bool
    intent: Optional[str] = None
    search_results: Optional[List[str]] = None
    analysis_result: Optional[str] = None
    translation_result: Optional[str] = None
    output: str
    error: Optional[str] = None


@router.post("/subgraph/process", response_model=SubgraphResponse)
async def process_with_subgraph(request: SubgraphRequest):
    """
    使用子图处理请求

    自动识别意图并调用相应的子图(搜索/分析/翻译)。
    """
    try:
        agent = get_subgraph_agent()
        result = agent.process(request.query)

        return SubgraphResponse(
            success=True,
            intent=result.get("intent"),
            search_results=result.get("search_results"),
            analysis_result=result.get("analysis_result"),
            translation_result=result.get("translation_result"),
            output=result.get("output", "")
        )
    except Exception as e:
        logger.error(f"子图处理失败: {e}", exc_info=True)
        return SubgraphResponse(
            success=False,
            output="",
            error=str(e)
        )

六、测试计划

6.1 单元测试

python
# tests/test_subgraphs.py
import pytest
from services.subgraphs import (
    search_subgraph,
    analysis_subgraph,
    translation_subgraph,
)


def test_search_subgraph():
    """测试搜索子图"""
    result = search_subgraph.invoke({
        "query": "测试查询",
        "results": []
    })

    assert "results" in result
    assert len(result["results"]) > 0


def test_analysis_subgraph_summary():
    """测试分析子图 - 摘要"""
    result = analysis_subgraph.invoke({
        "content": "这是一段测试文本。",
        "analysis_type": "summary",
        "result": ""
    })

    assert "result" in result
    assert len(result["result"]) > 0


def test_translation_subgraph():
    """测试翻译子图"""
    result = translation_subgraph.invoke({
        "text": "Hello",
        "target_language": "中文",
        "translated": ""
    })

    assert "translated" in result


# tests/test_parent_graph.py
def test_parent_graph_search_intent():
    """测试父图 - 搜索意图"""
    from services.parent_graph import get_subgraph_agent

    agent = get_subgraph_agent()
    result = agent.process("搜索人工智能")

    assert result["intent"] == "search"
    assert result.get("search_results") is not None


def test_parent_graph_translation_intent():
    """测试父图 - 翻译意图"""
    from services.parent_graph import get_subgraph_agent

    agent = get_subgraph_agent()
    result = agent.process("翻译成英文:你好世界")

    assert result["intent"] == "translate"
    assert result.get("translation_result") is not None

七、实施步骤

步骤 1: 创建子图模块(0.5 天)

  • 创建 services/subgraphs/__init__.py
  • 实现搜索子图
  • 实现分析子图
  • 实现翻译子图
  • 编写单元测试

步骤 2: 创建父图(0.5 天)

  • 创建 services/parent_graph.py
  • 实现意图分类
  • 实现子图调用节点
  • 编写集成测试

步骤 3: API 集成(0.25 天)

  • 创建 api/subgraph.py
  • 注册路由到 main.py

步骤 4: 测试和优化(0.25 天)

  • 端到端测试
  • 性能测试
  • 文档更新

八、最佳实践

8.1 子图设计原则

  1. 单一职责:每个子图只做一件事
  2. 独立状态:子图有独立的状态定义
  3. 清晰接口:明确定义输入输出字段
  4. 可独立测试:子图可以单独测试

8.2 父子图通信

python
# 推荐:显式状态转换
def call_child(state: ParentState) -> ParentState:
    # 1. 提取子图输入
    child_input = {"query": state["parent_field"]}

    # 2. 调用子图
    child_result = child_graph.invoke(child_input)

    # 3. 转换为父状态
    return {"parent_result": child_result["child_field"]}

8.3 状态隔离

python
# 父图和子图的状态是完全隔离的
# 不应该直接访问对方的字段

# ❌ 错误:直接访问子图字段
def bad_node(state: ParentState):
    # 子图状态字段在父图中不可见
    return state["child_field"]  # 错误!

# ✅ 正确:通过返回值传递
def good_node(state: ParentState):
    result = child_graph.invoke({"input": state["input"]})
    return {"output": result["output"]}

九、相关文档