Appearance
LangGraph 核心流程与扩展指南
本文档介绍项目中 /api/chat/stream 和 /api/generate-image 两个核心 API 的工作原理,以及 LangGraph 的架构和扩展方法。
一、/api/chat/stream - 聊天对话 API
1.1 功能概述
用户发送问题,AI 以流式方式(一字一句)返回回答,类似 ChatGPT 的体验。
1.2 请求参数定义
源文件: schemas/chat.py
python
class ChatRequest(BaseModel):
"""聊天请求"""
prompt: str = Field(..., min_length=1, max_length=100000)
provider: str = "openai"
model: Optional[str] = None
conversation_id: Optional[str] = None
system_prompt: Optional[str] = None
images: Optional[List[str]] = Field(
default=None,
description="图片列表,支持多种格式:base64、COS key、URL"
)| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
prompt | string | 是 | - | 用户输入,1-100000 字符 |
provider | string | 否 | "openai" | 提供商标识 |
model | string | 否 | null | 模型 ID(如 openai/gpt-4o) |
conversation_id | string | 否 | null | 会话 ID(新对话时不传) |
system_prompt | string | 否 | null | 自定义系统提示词 |
images | List[string] | 否 | null | 图片列表(支持 base64/COS key/URL) |
1.3 内部流程
详细步骤:
- 认证:通过 Session 中间件检查登录状态,获取用户信息
- 处理图片:如果上传了图片,会先上传到腾讯云 COS,生成访问链接
- 调用 AI:通过 OpenRouter(AI 模型聚合平台)调用 GPT、Gemini 等模型
- 流式返回:使用 SSE (Server-Sent Events) 技术实现 token 级别流式输出
1.4 API 路由实现
源文件: api/chat.py
python
@router.post("/chat/stream")
async def chat_stream(
data: ChatRequest,
request: Request,
db: Session = Depends(get_db)
):
"""流式聊天接口 (SSE)"""
session = get_session(request)
user_id = session.get("user", {}).get("user_id")
# 获取用户信息用于 LangSmith 追踪
user_session = session.get("user", {})
user_info = {
'user_id': user_session.get('user_id'),
'nickname': user_session.get('name'),
'email': user_session.get('email'),
} if user_session else None
# ... 会话验证逻辑 ...
async def generate():
"""SSE 生成器 - 包装同步迭代器"""
full_content = ""
try:
# 1. 处理图片(支持 base64、COS key、URL 混合格式)
if data.images:
llm_image_urls, db_image_keys = process_mixed_images(
data.images, user_id
)
# 2. 存储用户消息
if data.conversation_id:
user_msg = Message(
conversation_id=data.conversation_id,
role="user",
content=message_content,
provider=data.provider,
model_name=data.model,
)
db.add(user_msg)
db.commit()
# 3. 获取聊天服务并流式调用
chat_service = get_chat_service()
for chunk in chat_service.chat_stream(
thread_id=thread_id or "anonymous",
prompt=data.prompt,
model=data.model,
system_prompt=data.system_prompt,
images=llm_image_urls,
user_info=user_info
):
full_content += chunk
yield f"data: {json.dumps({'content': chunk})}\n\n"
# 4. 存储 AI 回复
if data.conversation_id:
ai_msg = Message(
conversation_id=data.conversation_id,
role="assistant",
content=full_content,
provider=data.provider,
model_name=data.model,
)
db.add(ai_msg)
db.commit()
yield f"data: {json.dumps({'done': True})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
}
)1.5 LangGraph 调用实现
源文件: services/langgraph_chat.py
python
def chat_stream(
self,
thread_id: str,
prompt: str,
model: Optional[str] = None,
system_prompt: Optional[str] = None,
images: Optional[List[str]] = None,
temperature: float = 0.7,
user_info: Optional[Dict[str, Any]] = None
) -> Iterator[str]:
"""流式对话调用"""
# 1. 构建消息内容(支持文本+图片)
content = self._build_content(prompt, images)
# 2. 获取 MySQL Checkpointer
with get_checkpointer() as checkpointer:
llm = self._get_llm(model, temperature)
# 3. 构建 Prompt 模板
prompt_template = ChatPromptTemplate.from_messages([
("system", system_prompt or self.config.default_system_prompt),
MessagesPlaceholder("messages"),
])
def call_model(state: MessagesState):
chain = prompt_template | llm
response = chain.invoke(state["messages"])
return {"messages": [response]}
# 4. 构建 LangGraph StateGraph
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_edge(START, "agent")
app = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": thread_id}}
# 5. 流式输出(stream_mode="messages")
for chunk in app.stream(
{"messages": [HumanMessage(content=content)]},
config=config,
stream_mode="messages"
):
if isinstance(chunk, tuple) and len(chunk) == 2:
message_chunk, metadata = chunk
if hasattr(message_chunk, 'content') and message_chunk.content:
yield message_chunk.content1.6 数据流架构
1.7 数据持久化
| 存储位置 | 用途 |
|---|---|
Message 表 | 存储用户问题和 AI 回答 |
checkpoint 表 | LangGraph 自动存储对话状态,支持下次继续对话 |
1.8 响应格式
响应类型:text/event-stream (SSE)
javascript
// 正常内容块
data: {"content": "你"}
data: {"content": "好"}
// 完成标记
data: {"done": true}
// 错误情况
data: {"error": "错误信息"}二、/api/generate-image - 图片生成 API
2.1 功能概述
用户描述想要什么图,AI 就帮你画出来。支持纯文生图和图生图两种模式。
2.2 请求参数定义
源文件: schemas/image.py
python
class ImageGenerateRequest(BaseModel):
"""图片生成请求"""
prompt: str = Field(..., min_length=1, max_length=10000)
conversation_id: Optional[str] = None
model: Optional[str] = None
size: Optional[str] = "1024x1024"
n: Optional[int] = Field(1, ge=1, le=4)
reference_images: Optional[List[str]] = Field(
default=None,
description="参考图片列表(用于图生图),支持多种格式:base64、COS key、URL"
)| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
prompt | string | 是 | 图片描述提示词 (1-10000 字符) |
conversation_id | string | 否 | 会话 ID,用于关联对话历史 |
model | string | 否 | 生成模型,默认 google/gemini-3.1-flash-image-preview |
size | string | 否 | 图片尺寸,默认 1024x1024 |
n | int | 否 | 生成图片数量,默认 1,范围 1-4 |
reference_images | List[string] | 否 | 参考图片列表(用于图生图) |
2.3 内部流程
详细步骤:
- 认证:检查登录状态
- 处理参考图:如果有参考图片,会统一处理格式
- 让 AI 画:通过 OpenRouter 调用 Gemini 等图片生成模型
- 存图返回:AI 返回 base64 图片,上传到腾讯云 COS,返回临时访问链接(2 小时有效)
2.4 API 路由实现
源文件: api/image.py
python
@router.post("/generate-image")
async def generate_image(
data: ImageGenerateRequest,
db: Session = Depends(get_db),
session: dict = Depends(require_login)
):
"""图片生成接口(支持多种模型,支持图生图)"""
user_id = session.get("user", {}).get("user_id")
prompt = data.prompt
model = data.model or "google/gemini-3.1-flash-image-preview"
reference_images = data.reference_images
# 构建用户信息用于 LangSmith 追踪
user_info = {
'user_id': user_id,
'nickname': session.get('user', {}).get('name'),
'email': session.get('user', {}).get('email'),
}
try:
# 1. 获取会话(如果有)
conv = None
if data.conversation_id:
conv = db.query(Conversation).filter_by(
id=data.conversation_id, user_id=user_id
).first()
# 2. 处理参考图片(支持 base64、COS key、URL 混合格式)
llm_ref_urls = []
db_ref_keys = []
if reference_images:
llm_ref_urls, db_ref_keys = process_mixed_images(
reference_images, user_id
)
# 3. 存储用户消息
if conv:
user_msg = Message(
conversation_id=data.conversation_id,
role="user",
content=json.dumps({"text": prompt, "images": db_ref_keys}),
provider="gemini",
model_name=model,
)
db.add(user_msg)
# 4. 调用生图 API
thread_id = get_thread_id(data.conversation_id) if data.conversation_id else None
base64_urls = model_integration.generate_image(
prompt=prompt,
model=model,
user_info=user_info,
reference_images=llm_ref_urls if llm_ref_urls else None,
thread_id=thread_id
)
# 5. 保存图片到 COS
local_urls = []
for data_url in base64_urls:
if data_url.startswith("data:"):
local_url = save_image_from_base64(user_id, data_url)
local_urls.append(local_url)
# 6. 存储 AI 回复
if conv:
ai_msg = Message(
conversation_id=data.conversation_id,
role="assistant",
content=json.dumps({
"type": "image_generation",
"prompt": prompt,
"model": model,
"images": local_urls
}),
provider="gemini",
model_name=model,
)
db.add(ai_msg)
conv.mode = "image"
db.commit()
# 7. 返回签名 URL
signed_urls = [get_signed_url_from_key(url) for url in local_urls]
return {"success": True, "image_urls": signed_urls}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail={"success": False, "error": str(e)})2.5 图片生成服务实现
源文件: services/model_integration.py
python
def generate_image(
self,
prompt: str,
model: str,
user_info: Optional[Dict[str, Any]] = None,
reference_images: Optional[List[str]] = None,
thread_id: Optional[str] = None
) -> List[str]:
"""生成图片(支持图生图)"""
# 根据模型前缀选择调用方式
if model.startswith("google/"):
return self._generate_image_gemini(prompt, model, reference_images)
elif model.startswith("bytedance-seed/"):
return self._generate_image_bytedance(prompt, model)
else:
raise ValueError(f"不支持的图片生成模型: {model}")
def _generate_image_gemini(
self,
prompt: str,
model: str,
reference_images: Optional[List[str]] = None
) -> List[str]:
"""使用 Gemini 模型生成图片(支持图生图)"""
# 构建消息内容
content = []
# 添加参考图片(图生图)
if reference_images:
for img_url in reference_images:
content.append({
"type": "image_url",
"image_url": {"url": img_url}
})
# 添加文本提示词
content.append({"type": "text", "text": prompt})
# 调用 OpenRouter API
response = self.openai_client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": content}],
extra_body={"modalities": ["image", "text"]}
)
# 提取生成的图片(base64 格式)
base64_urls = []
for choice in response.choices:
if hasattr(choice.message, 'content') and choice.message.content:
# Gemini 返回的是 base64 data URL
base64_urls.append(choice.message.content)
return base64_urls2.6 图片生成数据流
2.7 响应格式
json
{
"success": true,
"image_urls": [
"https://cos-xxx.cos.ap-shanghai.myqcloud.com/uploads/images/123/abc.png?sign=xxx"
]
}注意: 返回的签名 URL 有效期为 2 小时,过期后需要重新获取。
三、LangGraph 架构解析
3.0 整体架构图
3.1 核心文件
| 文件 | 作用 |
|---|---|
services/langgraph_chat.py | 核心入口 - Graph 构建、节点定义、流式输出 |
services/checkpointer.py | 持久化 - MySQL Checkpointer |
api/chat.py | API 入口 - 调用 chat_stream() |
3.2 核心类:LangGraphChatService
python
class LangGraphChatService:
def chat() # 同步对话(返回完整响应)
def chat_stream() # 流式对话(一字一字返回)← 主要使用
def get_state() # 获取会话状态
def update_state() # 更新会话状态
def clear_state() # 清除会话状态3.3 当前 Graph 结构
python
# 在 chat_stream() 方法中(第 396-399 行)
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model) # 只有一个节点
workflow.add_edge(START, "agent") # START → agent
app = workflow.compile(checkpointer=checkpointer)流程图:
这是一个最简单的单节点 Graph:开始 → 调用 LLM → 结束
3.4 持久化机制
使用 langgraph-checkpoint-mysql 包实现会话持久化:
python
# services/checkpointer.py
from langgraph.checkpoint.mysql.pymysql import PyMySQLSaver
@contextmanager
def get_checkpointer():
conn_string = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
with PyMySQLSaver.from_conn_string(conn_string) as checkpointer:
checkpointer.setup() # 创建必要的表(幂等)
yield checkpointer工作原理:
- 每次对话后自动保存状态到
checkpoint表 - 下次对话时通过
thread_id自动恢复历史消息
四、如何集成 Agent 能力
4.1 添加工具 (Tools)
步骤 1:定义工具
python
from langchain_core.tools import tool
@tool
def search_web(query: str) -> str:
"""搜索网页获取信息"""
# 你的搜索逻辑
return "搜索结果..."
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气"""
return f"{city} 晴天 25°C"
tools = [search_web, get_weather]步骤 2:绑定工具到 LLM
python
llm = ChatOpenAI(model="openai/gpt-4o", ...)
llm_with_tools = llm.bind_tools(tools)4.2 改造成带工具调用的 Agent
修改 chat_stream() 方法中的 workflow:
python
from langgraph.prebuilt import ToolNode
def call_model(state: MessagesState):
"""LLM 决定是否调用工具"""
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def should_continue(state: MessagesState) -> str:
"""判断是否需要调用工具"""
last_message = state["messages"][-1]
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
return "tools"
return END
# 构建带工具的 workflow
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", ToolNode(tools))
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, ["tools", END])
workflow.add_edge("tools", "agent") # 工具结果返回给 agent改造后的流程图:
4.3 扩展记忆/内存 (Memory)
当前已有: MySQL Checkpointer 自动持久化会话历史
如需更复杂的记忆:
python
# 简单内存(不持久化,适合临时存储)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
# 跨会话的长期记忆存储
from langgraph.store.memory import InMemoryStore
store = InMemoryStore() # 可存储用户画像、长期记忆等五、支持的模型
5.1 对话模型(部分)
| 模型 ID | 名称 | 提供商 |
|---|---|---|
openai/gpt-4o | GPT-4o | OpenAI |
google/gemini-3.1-pro | Gemini 3.1 Pro | |
x-ai/grok-4 | Grok 4 | xAI |
qwen/qwen-3.5-9b | Qwen 3.5 9B | 阿里云 |
5.2 图片生成模型
| 模型 ID | 名称 | 支持图生图 |
|---|---|---|
google/gemini-2.5-flash-image | Nano Banana | 是 |
google/gemini-3-pro-image-preview | Nano Banana Pro | 是 |
bytedance-seed/seedream-4.5 | Seedream 4.5 | 否 |
完整模型配置见:config/models.py
六、关键技术栈
| 技术 | 用途 |
|---|---|
| FastAPI | Web 框架 |
| LangGraph | 状态管理 + 流式输出 |
| LangChain | LLM 抽象层 |
| OpenRouter | 多模型统一入口 |
| MySQL + PyMySQL | 数据持久化 |
| 腾讯云 COS | 图片存储 |
| SSE (Server-Sent Events) | 流式响应 |
| LangSmith | 可选的追踪/调试平台 |
七、扩展建议
- 先在
langgraph_chat.py中定义工具(可在文件顶部或单独tools/目录) - 修改
_build_graph()或chat_stream()中的 workflow - 添加条件边(conditional_edges)处理工具调用
- 测试流式输出是否正常
- 在 LangSmith 中追踪调试
八、参考资料
- LangGraph 官方文档
- LangChain 官方文档
- OpenRouter API 文档
- 项目内文档:
docs/LangSmith 集成指南.mddocs/OpenRouter 对话模型 API 文档.mddocs/OpenRouter 生图模型 API 文档.mddocs/LangGraph Checkpoint 表关系.md