Skip to content

LangGraph 核心流程与扩展指南

本文档介绍项目中 /api/chat/stream/api/generate-image 两个核心 API 的工作原理,以及 LangGraph 的架构和扩展方法。


一、/api/chat/stream - 聊天对话 API

1.1 功能概述

用户发送问题,AI 以流式方式(一字一句)返回回答,类似 ChatGPT 的体验。

1.2 请求参数定义

源文件: schemas/chat.py

python
class ChatRequest(BaseModel):
    """聊天请求"""
    prompt: str = Field(..., min_length=1, max_length=100000)
    provider: str = "openai"
    model: Optional[str] = None
    conversation_id: Optional[str] = None
    system_prompt: Optional[str] = None
    images: Optional[List[str]] = Field(
        default=None,
        description="图片列表,支持多种格式:base64、COS key、URL"
    )
参数类型必填默认值说明
promptstring-用户输入,1-100000 字符
providerstring"openai"提供商标识
modelstringnull模型 ID(如 openai/gpt-4o
conversation_idstringnull会话 ID(新对话时不传)
system_promptstringnull自定义系统提示词
imagesList[string]null图片列表(支持 base64/COS key/URL)

1.3 内部流程

详细步骤:

  1. 认证:通过 Session 中间件检查登录状态,获取用户信息
  2. 处理图片:如果上传了图片,会先上传到腾讯云 COS,生成访问链接
  3. 调用 AI:通过 OpenRouter(AI 模型聚合平台)调用 GPT、Gemini 等模型
  4. 流式返回:使用 SSE (Server-Sent Events) 技术实现 token 级别流式输出

1.4 API 路由实现

源文件: api/chat.py

python
@router.post("/chat/stream")
async def chat_stream(
    data: ChatRequest,
    request: Request,
    db: Session = Depends(get_db)
):
    """流式聊天接口 (SSE)"""
    session = get_session(request)
    user_id = session.get("user", {}).get("user_id")

    # 获取用户信息用于 LangSmith 追踪
    user_session = session.get("user", {})
    user_info = {
        'user_id': user_session.get('user_id'),
        'nickname': user_session.get('name'),
        'email': user_session.get('email'),
    } if user_session else None

    # ... 会话验证逻辑 ...

    async def generate():
        """SSE 生成器 - 包装同步迭代器"""
        full_content = ""

        try:
            # 1. 处理图片(支持 base64、COS key、URL 混合格式)
            if data.images:
                llm_image_urls, db_image_keys = process_mixed_images(
                    data.images, user_id
                )

            # 2. 存储用户消息
            if data.conversation_id:
                user_msg = Message(
                    conversation_id=data.conversation_id,
                    role="user",
                    content=message_content,
                    provider=data.provider,
                    model_name=data.model,
                )
                db.add(user_msg)
                db.commit()

            # 3. 获取聊天服务并流式调用
            chat_service = get_chat_service()

            for chunk in chat_service.chat_stream(
                thread_id=thread_id or "anonymous",
                prompt=data.prompt,
                model=data.model,
                system_prompt=data.system_prompt,
                images=llm_image_urls,
                user_info=user_info
            ):
                full_content += chunk
                yield f"data: {json.dumps({'content': chunk})}\n\n"

            # 4. 存储 AI 回复
            if data.conversation_id:
                ai_msg = Message(
                    conversation_id=data.conversation_id,
                    role="assistant",
                    content=full_content,
                    provider=data.provider,
                    model_name=data.model,
                )
                db.add(ai_msg)
                db.commit()

            yield f"data: {json.dumps({'done': True})}\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        }
    )

1.5 LangGraph 调用实现

源文件: services/langgraph_chat.py

python
def chat_stream(
    self,
    thread_id: str,
    prompt: str,
    model: Optional[str] = None,
    system_prompt: Optional[str] = None,
    images: Optional[List[str]] = None,
    temperature: float = 0.7,
    user_info: Optional[Dict[str, Any]] = None
) -> Iterator[str]:
    """流式对话调用"""

    # 1. 构建消息内容(支持文本+图片)
    content = self._build_content(prompt, images)

    # 2. 获取 MySQL Checkpointer
    with get_checkpointer() as checkpointer:
        llm = self._get_llm(model, temperature)

        # 3. 构建 Prompt 模板
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", system_prompt or self.config.default_system_prompt),
            MessagesPlaceholder("messages"),
        ])

        def call_model(state: MessagesState):
            chain = prompt_template | llm
            response = chain.invoke(state["messages"])
            return {"messages": [response]}

        # 4. 构建 LangGraph StateGraph
        workflow = StateGraph(MessagesState)
        workflow.add_node("agent", call_model)
        workflow.add_edge(START, "agent")
        app = workflow.compile(checkpointer=checkpointer)

        config = {"configurable": {"thread_id": thread_id}}

        # 5. 流式输出(stream_mode="messages")
        for chunk in app.stream(
            {"messages": [HumanMessage(content=content)]},
            config=config,
            stream_mode="messages"
        ):
            if isinstance(chunk, tuple) and len(chunk) == 2:
                message_chunk, metadata = chunk
                if hasattr(message_chunk, 'content') and message_chunk.content:
                    yield message_chunk.content

1.6 数据流架构

1.7 数据持久化

存储位置用途
Message存储用户问题和 AI 回答
checkpointLangGraph 自动存储对话状态,支持下次继续对话

1.8 响应格式

响应类型:text/event-stream (SSE)

javascript
// 正常内容块
data: {"content": "你"}
data: {"content": "好"}

// 完成标记
data: {"done": true}

// 错误情况
data: {"error": "错误信息"}

二、/api/generate-image - 图片生成 API

2.1 功能概述

用户描述想要什么图,AI 就帮你画出来。支持纯文生图和图生图两种模式。

2.2 请求参数定义

源文件: schemas/image.py

python
class ImageGenerateRequest(BaseModel):
    """图片生成请求"""
    prompt: str = Field(..., min_length=1, max_length=10000)
    conversation_id: Optional[str] = None
    model: Optional[str] = None
    size: Optional[str] = "1024x1024"
    n: Optional[int] = Field(1, ge=1, le=4)
    reference_images: Optional[List[str]] = Field(
        default=None,
        description="参考图片列表(用于图生图),支持多种格式:base64、COS key、URL"
    )
参数类型必填说明
promptstring图片描述提示词 (1-10000 字符)
conversation_idstring会话 ID,用于关联对话历史
modelstring生成模型,默认 google/gemini-3.1-flash-image-preview
sizestring图片尺寸,默认 1024x1024
nint生成图片数量,默认 1,范围 1-4
reference_imagesList[string]参考图片列表(用于图生图)

2.3 内部流程

详细步骤:

  1. 认证:检查登录状态
  2. 处理参考图:如果有参考图片,会统一处理格式
  3. 让 AI 画:通过 OpenRouter 调用 Gemini 等图片生成模型
  4. 存图返回:AI 返回 base64 图片,上传到腾讯云 COS,返回临时访问链接(2 小时有效)

2.4 API 路由实现

源文件: api/image.py

python
@router.post("/generate-image")
async def generate_image(
    data: ImageGenerateRequest,
    db: Session = Depends(get_db),
    session: dict = Depends(require_login)
):
    """图片生成接口(支持多种模型,支持图生图)"""
    user_id = session.get("user", {}).get("user_id")
    prompt = data.prompt
    model = data.model or "google/gemini-3.1-flash-image-preview"
    reference_images = data.reference_images

    # 构建用户信息用于 LangSmith 追踪
    user_info = {
        'user_id': user_id,
        'nickname': session.get('user', {}).get('name'),
        'email': session.get('user', {}).get('email'),
    }

    try:
        # 1. 获取会话(如果有)
        conv = None
        if data.conversation_id:
            conv = db.query(Conversation).filter_by(
                id=data.conversation_id, user_id=user_id
            ).first()

        # 2. 处理参考图片(支持 base64、COS key、URL 混合格式)
        llm_ref_urls = []
        db_ref_keys = []

        if reference_images:
            llm_ref_urls, db_ref_keys = process_mixed_images(
                reference_images, user_id
            )

        # 3. 存储用户消息
        if conv:
            user_msg = Message(
                conversation_id=data.conversation_id,
                role="user",
                content=json.dumps({"text": prompt, "images": db_ref_keys}),
                provider="gemini",
                model_name=model,
            )
            db.add(user_msg)

        # 4. 调用生图 API
        thread_id = get_thread_id(data.conversation_id) if data.conversation_id else None
        base64_urls = model_integration.generate_image(
            prompt=prompt,
            model=model,
            user_info=user_info,
            reference_images=llm_ref_urls if llm_ref_urls else None,
            thread_id=thread_id
        )

        # 5. 保存图片到 COS
        local_urls = []
        for data_url in base64_urls:
            if data_url.startswith("data:"):
                local_url = save_image_from_base64(user_id, data_url)
                local_urls.append(local_url)

        # 6. 存储 AI 回复
        if conv:
            ai_msg = Message(
                conversation_id=data.conversation_id,
                role="assistant",
                content=json.dumps({
                    "type": "image_generation",
                    "prompt": prompt,
                    "model": model,
                    "images": local_urls
                }),
                provider="gemini",
                model_name=model,
            )
            db.add(ai_msg)
            conv.mode = "image"
            db.commit()

        # 7. 返回签名 URL
        signed_urls = [get_signed_url_from_key(url) for url in local_urls]

        return {"success": True, "image_urls": signed_urls}

    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail={"success": False, "error": str(e)})

2.5 图片生成服务实现

源文件: services/model_integration.py

python
def generate_image(
    self,
    prompt: str,
    model: str,
    user_info: Optional[Dict[str, Any]] = None,
    reference_images: Optional[List[str]] = None,
    thread_id: Optional[str] = None
) -> List[str]:
    """生成图片(支持图生图)"""

    # 根据模型前缀选择调用方式
    if model.startswith("google/"):
        return self._generate_image_gemini(prompt, model, reference_images)
    elif model.startswith("bytedance-seed/"):
        return self._generate_image_bytedance(prompt, model)
    else:
        raise ValueError(f"不支持的图片生成模型: {model}")


def _generate_image_gemini(
    self,
    prompt: str,
    model: str,
    reference_images: Optional[List[str]] = None
) -> List[str]:
    """使用 Gemini 模型生成图片(支持图生图)"""

    # 构建消息内容
    content = []

    # 添加参考图片(图生图)
    if reference_images:
        for img_url in reference_images:
            content.append({
                "type": "image_url",
                "image_url": {"url": img_url}
            })

    # 添加文本提示词
    content.append({"type": "text", "text": prompt})

    # 调用 OpenRouter API
    response = self.openai_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": content}],
        extra_body={"modalities": ["image", "text"]}
    )

    # 提取生成的图片(base64 格式)
    base64_urls = []
    for choice in response.choices:
        if hasattr(choice.message, 'content') and choice.message.content:
            # Gemini 返回的是 base64 data URL
            base64_urls.append(choice.message.content)

    return base64_urls

2.6 图片生成数据流

2.7 响应格式

json
{
  "success": true,
  "image_urls": [
    "https://cos-xxx.cos.ap-shanghai.myqcloud.com/uploads/images/123/abc.png?sign=xxx"
  ]
}

注意: 返回的签名 URL 有效期为 2 小时,过期后需要重新获取。


三、LangGraph 架构解析

3.0 整体架构图

3.1 核心文件

文件作用
services/langgraph_chat.py核心入口 - Graph 构建、节点定义、流式输出
services/checkpointer.py持久化 - MySQL Checkpointer
api/chat.pyAPI 入口 - 调用 chat_stream()

3.2 核心类:LangGraphChatService

python
class LangGraphChatService:
    def chat()           # 同步对话(返回完整响应)
    def chat_stream()    # 流式对话(一字一字返回)← 主要使用
    def get_state()      # 获取会话状态
    def update_state()   # 更新会话状态
    def clear_state()    # 清除会话状态

3.3 当前 Graph 结构

python
# 在 chat_stream() 方法中(第 396-399 行)
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)   # 只有一个节点
workflow.add_edge(START, "agent")         # START → agent
app = workflow.compile(checkpointer=checkpointer)

流程图:

这是一个最简单的单节点 Graph:开始 → 调用 LLM → 结束

3.4 持久化机制

使用 langgraph-checkpoint-mysql 包实现会话持久化:

python
# services/checkpointer.py
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver

@contextmanager
def get_checkpointer():
    conn_string = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
    with PyMySQLSaver.from_conn_string(conn_string) as checkpointer:
        checkpointer.setup()  # 创建必要的表(幂等)
        yield checkpointer

工作原理:

  • 每次对话后自动保存状态到 checkpoint
  • 下次对话时通过 thread_id 自动恢复历史消息

四、如何集成 Agent 能力

4.1 添加工具 (Tools)

步骤 1:定义工具

python
from langchain_core.tools import tool

@tool
def search_web(query: str) -> str:
    """搜索网页获取信息"""
    # 你的搜索逻辑
    return "搜索结果..."

@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气"""
    return f"{city} 晴天 25°C"

tools = [search_web, get_weather]

步骤 2:绑定工具到 LLM

python
llm = ChatOpenAI(model="openai/gpt-4o", ...)
llm_with_tools = llm.bind_tools(tools)

4.2 改造成带工具调用的 Agent

修改 chat_stream() 方法中的 workflow:

python
from langgraph.prebuilt import ToolNode

def call_model(state: MessagesState):
    """LLM 决定是否调用工具"""
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: MessagesState) -> str:
    """判断是否需要调用工具"""
    last_message = state["messages"][-1]
    if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
        return "tools"
    return END

# 构建带工具的 workflow
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", ToolNode(tools))
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, ["tools", END])
workflow.add_edge("tools", "agent")  # 工具结果返回给 agent

改造后的流程图:

4.3 扩展记忆/内存 (Memory)

当前已有: MySQL Checkpointer 自动持久化会话历史

如需更复杂的记忆:

python
# 简单内存(不持久化,适合临时存储)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()

# 跨会话的长期记忆存储
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()  # 可存储用户画像、长期记忆等

五、支持的模型

5.1 对话模型(部分)

模型 ID名称提供商
openai/gpt-4oGPT-4oOpenAI
google/gemini-3.1-proGemini 3.1 ProGoogle
x-ai/grok-4Grok 4xAI
qwen/qwen-3.5-9bQwen 3.5 9B阿里云

5.2 图片生成模型

模型 ID名称支持图生图
google/gemini-2.5-flash-imageNano Banana
google/gemini-3-pro-image-previewNano Banana Pro
bytedance-seed/seedream-4.5Seedream 4.5

完整模型配置见:config/models.py


六、关键技术栈

技术用途
FastAPIWeb 框架
LangGraph状态管理 + 流式输出
LangChainLLM 抽象层
OpenRouter多模型统一入口
MySQL + PyMySQL数据持久化
腾讯云 COS图片存储
SSE (Server-Sent Events)流式响应
LangSmith可选的追踪/调试平台

七、扩展建议

  1. 先在 langgraph_chat.py 中定义工具(可在文件顶部或单独 tools/ 目录)
  2. 修改 _build_graph()chat_stream() 中的 workflow
  3. 添加条件边(conditional_edges)处理工具调用
  4. 测试流式输出是否正常
  5. 在 LangSmith 中追踪调试

八、参考资料