LangGraph Memory Architecture in Depth

LangGraph provides a complete memory mechanism for preserving context across multi-turn conversations and across sessions. This document details LangGraph's memory architecture and how to use it.

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                  LangGraph Memory Architecture                  │
├────────────────────────┬────────────────────────────────────────┤
│   Short-term Memory    │         Long-term Memory               │
├────────────────────────┼────────────────────────────────────────┤
│    • Checkpointer      │    • Store (BaseStore)                 │
│    • State             │    • Semantic Search                   │
│    • Trimming          │                                        │
│    • Summarization     │                                        │
└────────────────────────┴────────────────────────────────────────┘

1. Short-term Memory

Short-term memory preserves context within a single session (thread) and consists of the following components:

1.1 Checkpointer (State Persistence)

The checkpointer saves the graph's execution state, enabling pause, resume, and time travel (rewinding to a historical state).

Supported checkpointer types

| Type          | Persistence | Distributed | Speed | Use case                    |
|---------------|-------------|-------------|-------|-----------------------------|
| InMemorySaver | ❌          | ❌          | ⭐⭐⭐⭐⭐ | Development & testing       |
| SqliteSaver   | ✅ file     | ❌          | ⭐⭐⭐⭐  | Single machine, small scale |
| PostgresSaver | ✅ database | ✅          | ⭐⭐⭐   | Production                  |
| RedisSaver    | ✅ optional | ✅          | ⭐⭐⭐⭐  | High-concurrency production |

Usage examples

InMemorySaver (in-memory)

python
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"messages": [{"role": "user", "content": "hi"}]}, config)

SqliteSaver (SQLite)

python
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver(sqlite3.connect("checkpoint.db"))
graph = builder.compile(checkpointer=checkpointer)

PostgresSaver (PostgreSQL)

python
from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://user:pass@localhost:5432/db"
checkpointer = PostgresSaver.from_conn_string(DB_URI)
checkpointer.setup()  # run once to create the tables
graph = builder.compile(checkpointer=checkpointer)

Async version:

python
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

RedisSaver (Redis)

python
from langgraph.checkpoint.redis import RedisSaver

DB_URI = "redis://localhost:6379"
checkpointer = RedisSaver.from_conn_string(DB_URI)
checkpointer.setup()
graph = builder.compile(checkpointer=checkpointer)

Async version:

python
from langgraph.checkpoint.redis.aio import AsyncRedisSaver

async with AsyncRedisSaver.from_conn_string(DB_URI) as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

1.2 Trimming

When a conversation exceeds the model's context window, older messages can be trimmed away:

python
from langchain_core.messages import trim_messages, count_tokens_approximately

messages = trim_messages(
    state["messages"],
    strategy="last",           # keep the most recent messages
    token_counter=count_tokens_approximately,
    max_tokens=128,            # token budget
    start_on="human",          # start on a human message
    end_on=("human", "tool"),  # end on a human or tool message
)

Parameters:

  • strategy: trimming strategy; "last" keeps the most recent messages
  • token_counter: the token counting function
  • max_tokens: maximum number of tokens to keep
  • start_on: the message type the trimmed list must start on
  • end_on: the message type(s) the trimmed list must end on

1.3 Summarization

When a conversation grows too long, older messages can be replaced with a summary that preserves the key information:

python
from langchain_core.messages import HumanMessage, RemoveMessage

def summarize_conversation(state: State):
    summary = state.get("summary", "")

    # Build a different prompt depending on whether a summary already exists
    if summary:
        summary_message = (
            f"This is the summary of the conversation so far: {summary}\n\n"
            "Extend the summary based on the new messages above:"
        )
    else:
        summary_message = "Create a summary of the conversation above:"

    # Invoke the model to produce the summary
    messages = state["messages"] + [HumanMessage(content=summary_message)]
    response = model.invoke(messages)

    # Keep only the last 2 messages; with an add_messages reducer, older
    # messages must be deleted explicitly via RemoveMessage
    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {
        "summary": response.content,
        "messages": delete_messages,
    }

2. Long-term Memory

Long-term memory stores user data, preferences, and history across sessions and threads.

2.1 Store (Cross-session Storage)

The Store provides data persistence across threads and can hold user-level information.

Supported store types

| Type          | Persistence | Distributed | Use case                    |
|---------------|-------------|-------------|-----------------------------|
| InMemoryStore | ❌          | ❌          | Development & testing       |
| PostgresStore | ✅          | ✅          | Production                  |
| RedisStore    | ✅          | ✅          | High-concurrency production |

Usage examples

Basic usage:

python
from langgraph.store.memory import InMemoryStore
from langgraph.runtime import Runtime
import uuid

store = InMemoryStore()

async def call_model(state: MessagesState, runtime: Runtime[Context]):
    namespace = (runtime.context.user_id, "memories")

    # Search for relevant memories
    memories = await runtime.store.asearch(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )

    # Store a new memory
    await runtime.store.aput(
        namespace,
        str(uuid.uuid4()),
        {"data": "User prefers dark mode"}
    )

    response = ...  # invoke the model here, using the retrieved memories
    return {"messages": [response]}

# Pass the store when compiling the graph
builder = StateGraph(MessagesState, context_schema=Context)
builder.add_node(call_model)
graph = builder.compile(store=store)

PostgresStore:

python
from langgraph.store.postgres import PostgresStore

DB_URI = "postgresql://user:pass@localhost:5432/db"
store = PostgresStore.from_conn_string(DB_URI)
store.setup()  # run once to initialize

graph = builder.compile(checkpointer=checkpointer, store=store)

RedisStore:

python
from langgraph.store.redis import RedisStore

DB_URI = "redis://localhost:6379"
store = RedisStore.from_conn_string(DB_URI)
store.setup()

graph = builder.compile(checkpointer=checkpointer, store=store)

2.2 Semantic Search

The Store supports semantic search over vector embeddings, enabling retrieval of memories by meaning rather than exact match:

python
from langchain.embeddings import init_embeddings
from langgraph.store.memory import InMemoryStore

# Enable semantic search
embeddings = init_embeddings("openai:text-embedding-3-small")
store = InMemoryStore(
    index={
        "embed": embeddings,
        "dims": 1536,
    }
)

# Store memories
store.put(("user_123", "memories"), "1", {"text": "I like pizza"})
store.put(("user_123", "memories"), "2", {"text": "I am a plumber"})
store.put(("user_123", "memories"), "3", {"text": "I live in Shanghai"})

# Semantic search (results ranked by semantic relevance)
items = store.search(
    ("user_123", "memories"),
    query="I'm hungry",  # will match "I like pizza"
    limit=2
)

for item in items:
    print(item.value["text"])

Full example (chat with semantic search):

python
from langchain.embeddings import init_embeddings
from langchain.chat_models import init_chat_model
from langgraph.store.memory import InMemoryStore
from langgraph.graph import START, MessagesState, StateGraph
from langgraph.runtime import Runtime

model = init_chat_model("gpt-4.1-mini")

# Create a store with semantic search enabled
embeddings = init_embeddings("openai:text-embedding-3-small")
store = InMemoryStore(
    index={
        "embed": embeddings,
        "dims": 1536,
    }
)

async def chat(state: MessagesState, runtime: Runtime):
    # Semantic search based on the latest user message
    items = await runtime.store.asearch(
        ("user_123", "memories"),
        query=state["messages"][-1].content,
        limit=2
    )

    # Build the memory context
    memories = "\n".join(item.value["text"] for item in items)
    system_msg = f"You are a helpful assistant.\n## User memories:\n{memories}"

    response = await model.ainvoke([
        {"role": "system", "content": system_msg},
        *state["messages"],
    ])
    return {"messages": [response]}

builder = StateGraph(MessagesState)
builder.add_node(chat)
builder.add_edge(START, "chat")
graph = builder.compile(store=store)

3. Full Integration Example

The following example uses both a checkpointer (short-term memory) and a store (long-term memory):

python
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore
from langgraph.runtime import Runtime
import uuid

model = init_chat_model("claude-sonnet-4-6")

@dataclass
class Context:
    user_id: str

def call_model(state: MessagesState, runtime: Runtime[Context]):
    user_id = runtime.context.user_id
    namespace = ("memories", user_id)

    # Search the user's long-term memories
    memories = runtime.store.search(
        namespace,
        query=str(state["messages"][-1].content)
    )
    info = "\n".join([d.value["data"] for d in memories])

    # If the user asks to remember something, write it to long-term memory
    last_message = state["messages"][-1]
    if "remember" in last_message.content.lower():
        memory = "Important information about the user..."
        runtime.store.put(namespace, str(uuid.uuid4()), {"data": memory})

    # Build the system prompt with long-term memories included
    system_msg = f"You are a helpful assistant.\nUser info: {info}"
    response = model.invoke([
        {"role": "system", "content": system_msg}
    ] + state["messages"])

    return {"messages": [response]}

DB_URI = "postgresql://postgres:postgres@localhost:5432/langgraph"

with (
    PostgresStore.from_conn_string(DB_URI) as store,
    PostgresSaver.from_conn_string(DB_URI) as checkpointer,
):
    store.setup()
    checkpointer.setup()

    builder = StateGraph(MessagesState, context_schema=Context)
    builder.add_node(call_model)
    builder.add_edge(START, "call_model")

    # Pass both the checkpointer and the store
    graph = builder.compile(checkpointer=checkpointer, store=store)

    # Invoke the graph
    config = {"configurable": {"thread_id": "conversation-1"}}
    for chunk in graph.stream(
        {"messages": [{"role": "user", "content": "Hi, please remember my name is Xiaoming"}]},
        config,
        stream_mode="values",
        context=Context(user_id="user-123"),
    ):
        chunk["messages"][-1].pretty_print()

4. Component Comparison

| Component       | Persistent | Cross-thread | Scope             | Notes                                    |
|-----------------|------------|--------------|-------------------|------------------------------------------|
| Checkpointer    | ✅         | ❌           | Single thread     | Pause/resume, time travel                |
| State           | ❌         | ❌           | Single invocation | Transient state during graph execution   |
| Trimming        | -          | -            | Within a session  | Trims messages in a conversation         |
| Summarization   | -          | -            | Within a session  | Summarizes a conversation                |
| Store           | ✅         | ✅           | Cross-session     | Cross-thread data storage                |
| Semantic Search | -          | -            | Global            | Retrieves memories by semantic relevance |

5. Best Practices

5.1 Development

python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore

checkpointer = InMemorySaver()
store = InMemoryStore()

5.2 Production

python
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore

DB_URI = "postgresql://user:pass@host:5432/db"
checkpointer = PostgresSaver.from_conn_string(DB_URI)
store = PostgresStore.from_conn_string(DB_URI)

5.3 High Concurrency

python
from langgraph.checkpoint.redis import RedisSaver
from langgraph.store.redis import RedisStore

DB_URI = "redis://localhost:6379"
checkpointer = RedisSaver.from_conn_string(DB_URI)
store = RedisStore.from_conn_string(DB_URI)

5.4 Memory Management Strategy

  • Short conversations: no special handling needed
  • Medium-length conversations: trim older messages with Trimming
  • Long conversations: generate summaries with Summarization
  • Cross-session memory: persist important information in the Store

6. Related Documentation