Appearance
LangGraph Interrupts 集成方案
一、模块概述
| 属性 | 说明 |
|---|---|
| 模块名称 | Interrupts(中断/人机协作) |
| 优先级 | 🟠 P1(中高) |
| 预估工时 | 1-2 天 |
| 依赖项 | langgraph.types, Checkpointer |
为什么需要
某些敏感操作需要人工确认后才能执行,提高系统安全性和可控性:
- 敏感操作审批(发送邮件、删除数据、执行支付)
- 内容审核(AI 生成内容需要确认后发布)
- 信息确认(关键信息需要用户确认)
二、架构设计
2.1 人机协作流程
┌──────────────────────────────────────────────────────────────────┐
│ Interrupts 人机协作流程 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 用户请求 ──► Agent 处理 ──► 检测敏感操作 │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ interrupt │ │
│ │ (暂停执行) │ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 返回前端等待确认 │ │
│ │ __interrupts__ = [{action, details, message}] │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────┴──────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 用户批准 │ │ 用户拒绝 │ │
│ │ resume=True │ │ resume=False │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 执行操作 │ │ 取消操作 │ │
│ └──────────────┘ └──────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘2.2 状态设计
python
from typing import TypedDict, Optional, List, Any
from langgraph.graph import MessagesState
class HumanApprovalState(MessagesState):
"""支持人机审批的状态"""
# 继承 messages 字段
pending_action: Optional[dict] # 待审批的操作
approved: Optional[bool] # 审批结果
action_result: Optional[str] # 操作执行结果三、代码实现
3.1 中断工具定义
创建文件: services/tools/approval.py
python
"""需要人工审批的工具
这些工具在执行敏感操作前会中断,等待用户确认。
"""
from typing import Optional
from langchain.tools import tool
import logging
logger = logging.getLogger(__name__)
@tool
def send_email(to: str, subject: str, body: str) -> dict:
"""
发送邮件。
这是一个需要人工审批的操作。在执行前会暂停,等待用户确认。
Args:
to: 收件人邮箱地址
subject: 邮件主题
body: 邮件正文
Returns:
包含操作信息的字典(实际发送在审批后执行)
"""
# 返回操作信息,实际执行在审批后
return {
"action": "send_email",
"requires_approval": True,
"details": {
"to": to,
"subject": subject,
"body": body[:100] + "..." if len(body) > 100 else body
}
}
@tool
def delete_conversation(conversation_id: str, confirm: bool = False) -> dict:
"""
删除对话记录。
这是一个需要人工审批的敏感操作。
Args:
conversation_id: 要删除的对话 ID
confirm: 确认删除(需要用户确认后设为 True)
Returns:
包含操作信息的字典
"""
return {
"action": "delete_conversation",
"requires_approval": True,
"details": {
"conversation_id": conversation_id
}
}
@tool
def update_user_info(field: str, new_value: str) -> dict:
"""
更新用户信息。
修改用户个人信息需要用户确认。
Args:
field: 要修改的字段名(如 nickname, email)
new_value: 新的值
Returns:
包含操作信息的字典
"""
return {
"action": "update_user_info",
"requires_approval": True,
"details": {
"field": field,
"new_value": new_value
}
}
# 需要审批的工具列表
APPROVAL_REQUIRED_TOOLS = [send_email, delete_conversation, update_user_info]3.2 带中断的 Agent 服务
创建文件: services/langgraph_approval.py
python
"""带人机审批的 LangGraph Agent
使用 interrupt 实现敏感操作的人工确认。
"""
import os
import logging
from typing import Optional, Dict, Any, Literal
from dataclasses import dataclass, field
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph, MessagesState, END
from langgraph.types import interrupt, Command
from langgraph.prebuilt import ToolNode
from services.tools import ALL_TOOLS
from services.tools.approval import APPROVAL_REQUIRED_TOOLS
load_dotenv(override=True)
logger = logging.getLogger(__name__)
@dataclass
class ApprovalConfig:
"""审批配置"""
api_key: str = field(default_factory=lambda: os.getenv("OPENROUTER_API_KEY", ""))
base_url: str = field(default_factory=lambda: os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1"))
default_model: str = field(default_factory=lambda: os.getenv("OPENROUTER_MODEL", "openai/gpt-4o"))
class ApprovalEnabledAgent:
"""带人机审批的 Agent"""
# 需要审批的操作类型
APPROVAL_ACTIONS = {"send_email", "delete_conversation", "update_user_info"}
def __init__(self, config: Optional[ApprovalConfig] = None):
self.config = config or ApprovalConfig()
self.all_tools = ALL_TOOLS + APPROVAL_REQUIRED_TOOLS
def _get_llm(self, model: Optional[str] = None, temperature: float = 0.7) -> ChatOpenAI:
"""获取绑定了工具的 LLM"""
return ChatOpenAI(
model=model or self.config.default_model,
api_key=self.config.api_key,
base_url=self.config.base_url,
temperature=temperature
).bind_tools(self.all_tools)
def _should_continue(self, state: MessagesState) -> Literal["tools", "approval", END]:
"""判断下一步操作"""
last_message = state["messages"][-1]
if not last_message.tool_calls:
return END
# 检查是否有需要审批的工具
for tool_call in last_message.tool_calls:
if tool_call["name"] in self.APPROVAL_ACTIONS:
return "approval"
return "tools"
def _approval_node(self, state: MessagesState) -> Command[Literal["tools", "agent"]]:
"""审批节点 - 中断等待用户确认"""
last_message = state["messages"][-1]
tool_calls = last_message.tool_calls
# 收集所有需要审批的操作
approval_requests = []
for tool_call in tool_calls:
if tool_call["name"] in self.APPROVAL_ACTIONS:
approval_requests.append({
"tool_call_id": tool_call["id"],
"action": tool_call["name"],
"args": tool_call["args"]
})
# 中断等待用户确认
decision = interrupt({
"type": "approval_required",
"message": "以下操作需要您的确认",
"requests": approval_requests
})
# decision 可能是:
# - True: 批准所有
# - False: 拒绝所有
# - dict: {"tool_call_id": bool} 针对每个操作单独审批
if decision is True:
# 批准所有,继续执行工具
return Command(goto="tools")
elif decision is False:
# 拒绝所有,返回拒绝消息
return Command(
goto="agent",
update={"messages": [AIMessage(content="用户取消了操作。")]}
)
elif isinstance(decision, dict):
# 部分批准 - 这里简化处理,实际可能需要更复杂的逻辑
approved_ids = [k for k, v in decision.items() if v]
if approved_ids:
return Command(goto="tools")
return Command(
goto="agent",
update={"messages": [AIMessage(content="用户取消了操作。")]}
)
return Command(goto="tools")
def _build_graph(self, checkpointer, model: Optional[str] = None, system_prompt: Optional[str] = None):
"""构建带审批的工作流"""
llm = self._get_llm(model)
tool_node = ToolNode(self.all_tools)
def call_model(state: MessagesState):
messages = [SystemMessage(content=system_prompt or "你是一个有用的助手。")] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_node("approval", self._approval_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
self._should_continue,
{"tools": "tools", "approval": "approval", END: END}
)
workflow.add_edge("tools", "agent")
# 审批节点的条件跳转在节点内部通过 Command 实现
return workflow.compile(checkpointer=checkpointer)
def chat(self, thread_id: str, prompt: str, **kwargs) -> Dict[str, Any]:
"""对话(可能需要审批)"""
from services.checkpointer import get_checkpointer
with get_checkpointer() as checkpointer:
app = self._build_graph(checkpointer)
config = {"configurable": {"thread_id": thread_id}}
result = app.invoke(
{"messages": [HumanMessage(content=prompt)]},
config
)
# 检查是否需要审批
if "__interrupts__" in result:
return {
"needs_approval": True,
"interrupts": result["__interrupts__"],
"thread_id": thread_id
}
return {
"content": result["messages"][-1].content,
"needs_approval": False
}
def resume(self, thread_id: str, decision: bool) -> Dict[str, Any]:
"""恢复中断的执行"""
from services.checkpointer import get_checkpointer
with get_checkpointer() as checkpointer:
app = self._build_graph(checkpointer)
config = {"configurable": {"thread_id": thread_id}}
result = app.invoke(
Command(resume=decision),
config
)
return {"content": result["messages"][-1].content}
# 全局实例
_approval_agent: Optional[ApprovalEnabledAgent] = None
def get_approval_agent() -> ApprovalEnabledAgent:
global _approval_agent
if _approval_agent is None:
_approval_agent = ApprovalEnabledAgent()
return _approval_agent四、API 集成
4.1 新增 API 端点
修改 api/chat.py:
python
from services.langgraph_approval import get_approval_agent
@router.post("/chat/approval")
async def chat_with_approval(
data: ChatRequest,
request: Request,
db: Session = Depends(get_db)
):
"""可能需要审批的聊天"""
session = get_session(request)
agent = get_approval_agent()
thread_id = data.conversation_id or str(uuid.uuid4())
result = agent.chat(
thread_id=thread_id,
prompt=data.prompt,
model=data.model,
system_prompt=data.system_prompt
)
if result.get("needs_approval"):
# 需要审批,返回中断信息
return {
"success": True,
"needs_approval": True,
"thread_id": thread_id,
"interrupts": result["interrupts"]
}
return {
"success": True,
"needs_approval": False,
"content": result["content"]
}
@router.post("/chat/approval/resume")
async def resume_approval(
thread_id: str = Form(...),
approved: bool = Form(...),
request: Request = None
):
"""恢复中断的执行"""
agent = get_approval_agent()
result = agent.resume(thread_id=thread_id, decision=approved)
return {
"success": True,
"content": result["content"]
}五、前端集成
5.1 审批对话框组件
创建 static/js/components/approval-dialog.js:
javascript
/**
* 审批对话框组件
*/
class ApprovalDialog {
constructor() {
this.dialog = null;
}
/**
* 显示审批对话框
* @param {Object} interrupts - 中断信息
* @param {string} threadId - 线程 ID
* @returns {Promise<boolean>} - 用户决定
*/
show(interrupts, threadId) {
return new Promise((resolve) => {
// 创建对话框
this.dialog = document.createElement('div');
this.dialog.className = 'approval-dialog-overlay';
this.dialog.innerHTML = `
<div class="approval-dialog">
<div class="approval-header">
<h3>⚠️ 操作确认</h3>
</div>
<div class="approval-body">
${this._renderRequests(interrupts)}
</div>
<div class="approval-footer">
<button class="btn-approve" id="approve-btn">✓ 批准</button>
<button class="btn-reject" id="reject-btn">✗ 拒绝</button>
</div>
</div>
`;
document.body.appendChild(this.dialog);
// 绑定事件
document.getElementById('approve-btn').onclick = async () => {
await this._submitDecision(threadId, true);
resolve(true);
this.close();
};
document.getElementById('reject-btn').onclick = async () => {
await this._submitDecision(threadId, false);
resolve(false);
this.close();
};
});
}
_renderRequests(interrupts) {
return interrupts.map(intr => {
const req = intr.value.requests[0];
return `
<div class="approval-item">
<div class="approval-action">${this._getActionLabel(req.action)}</div>
<div class="approval-details">
${this._renderDetails(req.action, req.args)}
</div>
</div>
`;
}).join('');
}
_getActionLabel(action) {
const labels = {
'send_email': '📧 发送邮件',
'delete_conversation': '🗑️ 删除对话',
'update_user_info': '✏️ 更新信息'
};
return labels[action] || action;
}
_renderDetails(action, args) {
switch (action) {
case 'send_email':
return `
<p><strong>收件人:</strong> ${args.to}</p>
<p><strong>主题:</strong> ${args.subject}</p>
<p><strong>内容预览:</strong> ${args.body}</p>
`;
case 'delete_conversation':
return `<p><strong>对话 ID:</strong> ${args.conversation_id}</p>`;
case 'update_user_info':
return `
<p><strong>修改字段:</strong> ${args.field}</p>
<p><strong>新值:</strong> ${args.new_value}</p>
`;
default:
return `<pre>${JSON.stringify(args, null, 2)}</pre>`;
}
}
async _submitDecision(threadId, approved) {
try {
const response = await fetch('/api/chat/approval/resume', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({thread_id: threadId, approved: approved})
});
const result = await response.json();
if (result.success) {
// 继续显示 AI 响应
appendMessage('assistant', result.content);
}
} catch (error) {
console.error('提交审批决定失败:', error);
}
}
close() {
if (this.dialog) {
this.dialog.remove();
this.dialog = null;
}
}
}
// 全局实例
const approvalDialog = new ApprovalDialog();5.2 CSS 样式
添加到 static/css/index.css:
css
/* 审批对话框样式 */
.approval-dialog-overlay {
position: fixed;
top: 0;
left: 0;
right: 0;
bottom: 0;
background: rgba(0, 0, 0, 0.5);
display: flex;
align-items: center;
justify-content: center;
z-index: 1000;
}
.approval-dialog {
background: white;
border-radius: 12px;
max-width: 500px;
width: 90%;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.15);
}
.approval-header {
padding: 20px;
border-bottom: 1px solid #eee;
}
.approval-header h3 {
margin: 0;
color: #ff9800;
}
.approval-body {
padding: 20px;
max-height: 300px;
overflow-y: auto;
}
.approval-item {
background: #f5f5f5;
padding: 15px;
border-radius: 8px;
margin-bottom: 10px;
}
.approval-action {
font-weight: bold;
margin-bottom: 10px;
}
.approval-details p {
margin: 5px 0;
font-size: 14px;
}
.approval-footer {
padding: 20px;
border-top: 1px solid #eee;
display: flex;
gap: 10px;
justify-content: flex-end;
}
.btn-approve, .btn-reject {
padding: 10px 20px;
border: none;
border-radius: 6px;
cursor: pointer;
font-size: 14px;
transition: all 0.2s;
}
.btn-approve {
background: #4caf50;
color: white;
}
.btn-approve:hover {
background: #43a047;
}
.btn-reject {
background: #f44336;
color: white;
}
.btn-reject:hover {
background: #e53935;
}六、测试计划
6.1 单元测试
python
# tests/test_approval.py
import pytest
from services.langgraph_approval import ApprovalEnabledAgent
def test_should_continue_no_tool_calls():
"""测试无工具调用时直接结束"""
agent = ApprovalEnabledAgent()
from langchain_core.messages import AIMessage
state = {"messages": [AIMessage(content="你好")]}
result = agent._should_continue(state)
assert result == END
def test_should_continue_normal_tool():
"""测试普通工具调用"""
agent = ApprovalEnabledAgent()
from langchain_core.messages import AIMessage
msg = AIMessage(content="")
msg.tool_calls = [{"name": "search_web", "args": {"query": "test"}}]
state = {"messages": [msg]}
result = agent._should_continue(state)
assert result == "tools"
def test_should_continue_approval_tool():
"""测试需要审批的工具调用"""
agent = ApprovalEnabledAgent()
from langchain_core.messages import AIMessage
msg = AIMessage(content="")
msg.tool_calls = [{"name": "send_email", "args": {"to": "test@example.com"}}]
state = {"messages": [msg]}
result = agent._should_continue(state)
assert result == "approval"七、实施步骤
步骤 1: 创建审批工具(0.5 天)
- 创建
services/tools/approval.py - 定义需要审批的工具
- 编写单元测试
步骤 2: 创建审批 Agent(0.5 天)
- 创建
services/langgraph_approval.py - 实现
_approval_node中断节点 - 实现
resume方法
步骤 3: API 集成(0.25 天)
- 添加
/api/chat/approval端点 - 添加
/api/chat/approval/resume端点
步骤 4: 前端集成(0.5 天)
- 创建审批对话框组件
- 添加 CSS 样式
- 集成到聊天流程
步骤 5: 测试(0.25 天)
- 端到端测试
- 修复问题