Appearance
LangGraph + MemorySaver
你是一个 LangGraph 专家,现在帮我把一个现有的简单问答应用重构成使用 LangGraph + 持久化检查点的架构。
当前情况:
- 应用是一个简单的多轮对话问答(带会话记忆)
- 会话历史目前手动存储在 MySQL 里,表结构大致是:
- 表名:chat_sessions 或 conversations(你可以假设常见字段)
- 字段示例:session_id / thread_id (varchar), user_id (varchar), message_history (JSON or TEXT 存 [{"role":"user","content":"..."}, ...]), updated_at (timestamp), created_at 等
- 每次用户发消息时,先从 MySQL 读历史 → 塞进 prompt → 调用 LLM → 把新消息追加回历史 → 更新 MySQL
- 使用的是 langchain_openai.ChatOpenAI 或类似 LLM
- Prompt 是一个简单的 system prompt + 历史消息 + 当前用户输入
目标:
- 切换到 LangGraph 写法,使用 MessagesState(或自定义 State)
- 使用 checkpointer 实现自动持久化(不再手动读写 MySQL 的消息历史)
- 因为我已经有 MySQL,不想引入新数据库(PostgreSQL),所以优先使用 MySQL 作为检查点存储后端
- 如果官方没有原生 MySQLSaver,就使用社区的 langgraph-checkpoint-mysql 包(tjni/langgraph-checkpoint-mysql),或者指导我如何简单实现一个兼容的自定义 Checkpointer(但优先用现成包)
- 保持最简结构:一个 prompt node → LLM 调用 → 输出
- 支持多用户(用 thread_id / configurable.session_id 区分会话)
- 提供完整的可运行代码示例,包括:
- 必要的 pip install 命令
- 如何初始化 checkpointer(连接 MySQL,调用 .setup() 创建表)
- StateGraph 的定义(简单一个 agent node)
- compile 时传入 checkpointer
- 如何 invoke / stream(带 config={"configurable": {"thread_id": "xxx"}})
- 迁移旧数据的建议(如何把旧 MySQL 里的历史消息导入到 LangGraph checkpoint 里,作为初始 checkpoint)
- 额外要求:
- 用中文回复用户(system prompt 里写清楚)
- 支持 streaming 输出(如果可能)
- 说明未来如果想加工具、分支、人机交互等,如何扩展
- 告诉我用 LangSmith tracing 的好处和接入方式(可选)
请输出结构化的回答:
- 迁移步骤建议
- 必要的依赖安装
- 完整代码(Python)
- 如何测试多轮对话
- 旧数据迁移的伪代码 / 思路
- 注意事项(比如事务、性能、表结构变化等)
py
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, MessagesState, StateGraph
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_messages([
("system", "你是友好的助手,用中文回复。"),
("placeholder", "{messages}"),
])
# 简单一个 node
def call_model(state: MessagesState):
chain = prompt | llm
return {"messages": [chain.invoke(state["messages"])]}
workflow = StateGraph(MessagesState)
workflow.add_edge(START, "agent")
workflow.add_node("agent", call_model)
# 加 checkpointer 就自动有了会话记忆
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
# 运行时传入 thread_id(相当于 session_id)
config = {"configurable": {"thread_id": "luca_thread_1"}}
# 第一轮
output = app.invoke({"messages": [("human", "你好,我是 Luca")]}, config)
print(output["messages"][-1].content)
# 第二轮自动记住
output2 = app.invoke({"messages": [("human", "我叫什么?")]}, config)