Appearance
LangGraph Parallel 集成方案
一、模块概述
| 属性 | 说明 |
|---|---|
| 模块名称 | Parallel(并行执行) |
| 优先级 | 🟢 P3(中) |
| 预估工时 | 1 天 |
| 依赖项 | langgraph.graph |
为什么需要
某些场景需要同时执行多个独立任务:
- 多模型对比:同时调用多个模型,对比输出结果
- 多任务处理:同时生成文本、图片、摘要等
- 多角度分析:从不同角度分析同一问题
- 性能提升:并行执行可以显著减少总耗时
二、架构设计
2.1 并行执行流程
┌───────────────────────────────────────────────────────────────┐
│ Parallel 并行执行流程 │
├───────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ START │ │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Fan-out (分支) │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Task A │ │ Task B │ │ Task C │ │ │
│ │ │ (并行) │ │ (并行) │ │ (并行) │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │ │
│ └────────┼─────────────┼─────────────┼─────────────────┘ │
│ │ │ │ │
│ └─────────────┼─────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Fan-in (汇聚) │ │
│ │ ┌──────────┐ │ │
│ │ │Aggregator│ │ │
│ │ │ (聚合) │ │ │
│ │ └────┬─────┘ │ │
│ └───────────────────────┼──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ END │ │
│ └──────────┘ │
│ │
└───────────────────────────────────────────────────────────────┘2.2 状态设计
python
from typing import TypedDict, Annotated
from operator import add
class ParallelState(TypedDict):
"""并行执行状态"""
input: str # 输入内容
results: Annotated[list, add] # 并行结果(使用 add reducer 合并)
final_output: str # 最终聚合输出关键点: 使用 Annotated[list, add] 让并行节点的结果自动合并,而不是覆盖。
三、代码实现
3.1 并行 Agent 服务
创建文件: services/parallel_agent.py
python
"""并行执行 Agent 服务
支持同时执行多个独立任务并聚合结果。
"""
import os
import logging
from typing import TypedDict, Annotated, Optional, List, Dict, Any
from operator import add
from dataclasses import dataclass, field
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph, END
load_dotenv(override=True)
logger = logging.getLogger(__name__)
@dataclass
class ParallelConfig:
"""并行执行配置"""
api_key: str = field(default_factory=lambda: os.getenv("OPENROUTER_API_KEY", ""))
base_url: str = field(default_factory=lambda: os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1"))
class ParallelState(TypedDict):
"""并行执行状态"""
input: str
results: Annotated[list, add] # 使用 add reducer 合并结果
final_output: str
class ParallelAgent:
"""并行执行 Agent"""
def __init__(self, config: Optional[ParallelConfig] = None):
self.config = config or ParallelConfig()
def _get_llm(self, model: str, temperature: float = 0.7) -> ChatOpenAI:
"""获取 LLM 实例"""
return ChatOpenAI(
model=model,
api_key=self.config.api_key,
base_url=self.config.base_url,
temperature=temperature
)
def create_multi_model_comparison(self, models: List[str]):
"""
创建多模型对比工作流
同时调用多个模型,对比输出结果。
Args:
models: 模型列表,如 ["openai/gpt-4o", "anthropic/claude-3.5-sonnet"]
"""
def create_model_node(model_name: str):
"""创建模型节点工厂"""
def node(state: ParallelState) -> ParallelState:
llm = self._get_llm(model_name)
response = llm.invoke(state["input"])
return {
"results": [{
"model": model_name,
"content": response.content
}]
}
return node
def aggregator(state: ParallelState) -> ParallelState:
"""聚合结果"""
output = "# 多模型对比结果\n\n"
for result in state["results"]:
model_name = result.get("model", "unknown")
content = result.get("content", "")
output += f"## {model_name}\n\n{content}\n\n---\n\n"
return {"final_output": output}
# 构建工作流
builder = StateGraph(ParallelState)
# 添加聚合节点
builder.add_node("aggregator", aggregator)
# 为每个模型添加节点
for i, model in enumerate(models):
node_name = f"model_{i}"
builder.add_node(node_name, create_model_node(model))
# 并行边:START 同时指向所有模型节点
builder.add_edge(START, node_name)
# 所有模型节点指向聚合节点
builder.add_edge(node_name, "aggregator")
builder.add_edge("aggregator", END)
return builder.compile()
def create_multi_task_workflow(self, tasks: List[Dict[str, str]]):
"""
创建多任务并行工作流
同时执行多个不同类型的任务。
Args:
tasks: 任务列表,每个任务包含 {"name": "任务名", "prompt": "提示模板"}
"""
def create_task_node(task: Dict[str, str]):
"""创建任务节点工厂"""
def node(state: ParallelState) -> ParallelState:
llm = self._get_llm("openai/gpt-4o-mini") # 任务节点使用小模型
prompt = task["prompt"].format(input=state["input"])
response = llm.invoke(prompt)
return {
"results": [{
"task": task["name"],
"content": response.content
}]
}
return node
def aggregator(state: ParallelState) -> ParallelState:
"""聚合结果"""
output = ""
for result in state["results"]:
task_name = result.get("task", "unknown")
content = result.get("content", "")
output += f"### {task_name}\n\n{content}\n\n"
return {"final_output": output}
# 构建工作流
builder = StateGraph(ParallelState)
builder.add_node("aggregator", aggregator)
for task in tasks:
node_name = f"task_{task['name'].replace(' ', '_')}"
builder.add_node(node_name, create_task_node(task))
builder.add_edge(START, node_name)
builder.add_edge(node_name, "aggregator")
builder.add_edge("aggregator", END)
return builder.compile()
def create_multi_perspective_analysis(self):
"""
创建多角度分析工作流
从不同角度分析同一问题。
"""
perspectives = [
{"name": "技术角度", "prompt": "从技术角度分析:{input}"},
{"name": "商业角度", "prompt": "从商业角度分析:{input}"},
{"name": "用户角度", "prompt": "从用户体验角度分析:{input}"},
]
return self.create_multi_task_workflow(perspectives)
def compare_models(
self,
prompt: str,
models: List[str]
) -> Dict[str, Any]:
"""
对比多个模型的输出
Args:
prompt: 输入提示
models: 要对比的模型列表
Returns:
包含各模型输出和聚合结果的字典
"""
graph = self.create_multi_model_comparison(models)
result = graph.invoke({"input": prompt, "results": [], "final_output": ""})
return {
"results": result["results"],
"comparison": result["final_output"]
}
def analyze_from_multiple_perspectives(self, topic: str) -> Dict[str, Any]:
"""
多角度分析
Args:
topic: 分析主题
Returns:
包含各角度分析结果的字典
"""
graph = self.create_multi_perspective_analysis()
result = graph.invoke({"input": topic, "results": [], "final_output": ""})
return {
"perspectives": result["results"],
"analysis": result["final_output"]
}
# 全局实例
_parallel_agent: Optional[ParallelAgent] = None
def get_parallel_agent() -> ParallelAgent:
"""获取并行 Agent 单例"""
global _parallel_agent
if _parallel_agent is None:
_parallel_agent = ParallelAgent()
return _parallel_agent3.2 API 端点
创建 api/parallel.py:
python
"""并行执行 API"""
import logging
from typing import List, Optional
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from services.parallel_agent import get_parallel_agent
router = APIRouter(prefix="/api/parallel", tags=["parallel"])
logger = logging.getLogger(__name__)
class CompareRequest(BaseModel):
"""模型对比请求"""
prompt: str
models: List[str] # 如 ["openai/gpt-4o", "anthropic/claude-3.5-sonnet"]
class AnalyzeRequest(BaseModel):
"""多角度分析请求"""
topic: str
@router.post("/compare")
async def compare_models(data: CompareRequest):
"""
对比多个模型的输出
同时调用多个模型,返回对比结果。
"""
agent = get_parallel_agent()
try:
result = agent.compare_models(data.prompt, data.models)
return {
"success": True,
"results": result["results"],
"comparison": result["comparison"]
}
except Exception as e:
logger.error(f"模型对比失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
@router.post("/analyze")
async def multi_perspective_analyze(data: AnalyzeRequest):
"""
多角度分析
从技术、商业、用户等角度分析同一问题。
"""
agent = get_parallel_agent()
try:
result = agent.analyze_from_multiple_perspectives(data.topic)
return {
"success": True,
"perspectives": result["perspectives"],
"analysis": result["analysis"]
}
except Exception as e:
logger.error(f"多角度分析失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}四、前端集成
4.1 模型对比组件
添加到 static/js/parallel.js:
javascript
class ModelComparison {
constructor() {
this.availableModels = [
{ id: 'openai/gpt-4o', name: 'GPT-4o' },
{ id: 'anthropic/claude-3.5-sonnet', name: 'Claude 3.5 Sonnet' },
{ id: 'google/gemini-pro', name: 'Gemini Pro' },
];
}
async compare(prompt, selectedModels) {
try {
const response = await fetch('/api/parallel/compare', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
prompt: prompt,
models: selectedModels
})
});
const data = await response.json();
if (data.success) {
return this.renderComparison(data.results);
} else {
throw new Error(data.error);
}
} catch (error) {
console.error('模型对比失败:', error);
throw error;
}
}
renderComparison(results) {
// 创建对比展示
const container = document.createElement('div');
container.className = 'model-comparison';
results.forEach(result => {
const card = document.createElement('div');
card.className = 'comparison-card';
card.innerHTML = `
<div class="model-name">${result.model}</div>
<div class="model-content">${result.content}</div>
`;
container.appendChild(card);
});
return container;
}
renderUI() {
// 渲染模型选择 UI
const container = document.createElement('div');
container.className = 'comparison-selector';
this.availableModels.forEach(model => {
const label = document.createElement('label');
label.innerHTML = `
<input type="checkbox" value="${model.id}" />
${model.name}
`;
container.appendChild(label);
});
return container;
}
}
class MultiPerspectiveAnalysis {
async analyze(topic) {
try {
const response = await fetch('/api/parallel/analyze', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ topic })
});
const data = await response.json();
if (data.success) {
return this.renderAnalysis(data.perspectives);
} else {
throw new Error(data.error);
}
} catch (error) {
console.error('多角度分析失败:', error);
throw error;
}
}
renderAnalysis(perspectives) {
const container = document.createElement('div');
container.className = 'perspective-analysis';
perspectives.forEach(p => {
const section = document.createElement('div');
section.className = 'perspective-section';
section.innerHTML = `
<h4 class="perspective-title">${p.task}</h4>
<div class="perspective-content">${p.content}</div>
`;
container.appendChild(section);
});
return container;
}
}
// 全局实例
const modelComparison = new ModelComparison();
const multiPerspective = new MultiPerspectiveAnalysis();4.2 CSS 样式
添加到 static/css/index.css:
css
/* 模型对比样式 */
.model-comparison {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 20px;
margin: 20px 0;
}
.comparison-card {
border: 1px solid #e0e0e0;
border-radius: 12px;
overflow: hidden;
}
.comparison-card .model-name {
background: #4a90d9;
color: white;
padding: 12px;
font-weight: bold;
}
.comparison-card .model-content {
padding: 15px;
font-size: 14px;
line-height: 1.6;
max-height: 400px;
overflow-y: auto;
}
/* 多角度分析样式 */
.perspective-analysis {
margin: 20px 0;
}
.perspective-section {
background: #f8f9fa;
border-radius: 8px;
margin-bottom: 15px;
overflow: hidden;
}
.perspective-title {
background: #e8f5e9;
color: #2e7d32;
padding: 12px;
margin: 0;
}
.perspective-content {
padding: 15px;
font-size: 14px;
line-height: 1.6;
}
/* 对比选择器样式 */
.comparison-selector {
display: flex;
gap: 15px;
padding: 15px;
background: #f5f5f5;
border-radius: 8px;
margin-bottom: 15px;
}
.comparison-selector label {
display: flex;
align-items: center;
gap: 8px;
cursor: pointer;
}五、使用场景
5.1 多模型对比
javascript
// 用户选择多个模型进行对比
const selectedModels = ['openai/gpt-4o', 'anthropic/claude-3.5-sonnet'];
const comparison = await modelComparison.compare(
"解释什么是机器学习",
selectedModels
);
document.getElementById('output').appendChild(comparison);5.2 多角度分析
javascript
// 从多个角度分析问题
const analysis = await multiPerspective.analyze(
"人工智能在医疗领域的应用前景"
);
document.getElementById('output').appendChild(analysis);5.3 自定义多任务
python
# 在后端创建自定义并行任务
agent = get_parallel_agent()
tasks = [
{"name": "摘要", "prompt": "请总结以下内容的要点:{input}"},
{"name": "关键词", "prompt": "提取以下内容的关键词:{input}"},
{"name": "翻译", "prompt": "将以下内容翻译成英文:{input}"},
]
graph = agent.create_multi_task_workflow(tasks)
result = graph.invoke({"input": "长文本内容...", "results": [], "final_output": ""})六、测试计划
6.1 单元测试
python
# tests/test_parallel.py
import pytest
from services.parallel_agent import ParallelAgent
def test_create_multi_model_comparison():
"""测试多模型对比工作流创建"""
agent = ParallelAgent()
models = ["openai/gpt-4o", "anthropic/claude-3.5-sonnet"]
graph = agent.create_multi_model_comparison(models)
assert graph is not None
def test_compare_models():
"""测试模型对比功能"""
agent = ParallelAgent()
models = ["openai/gpt-4o-mini"] # 使用小模型测试
result = agent.compare_models("你好", models)
assert "results" in result
assert len(result["results"]) == 1
def test_create_multi_perspective_analysis():
"""测试多角度分析工作流"""
agent = ParallelAgent()
graph = agent.create_multi_perspective_analysis()
assert graph is not None七、实施步骤
步骤 1: 创建并行 Agent(0.5 天)
- 创建
services/parallel_agent.py - 实现多模型对比工作流
- 实现多任务并行工作流
- 编写单元测试
步骤 2: API 集成(0.25 天)
- 创建
api/parallel.py - 注册路由到 main.py
步骤 3: 前端集成(0.25 天)
- 创建
static/js/parallel.js - 添加 CSS 样式
- 集成到聊天界面