Appearance
Video Agent 重构说明(2026-03-27)
将视频生成系统从线性管道重构为 LLM 驱动的 ReAct Agent,实现多轮对话、后台轮询、WebSocket 实时推送。
架构概览
前端 SSE 请求 后端 VideoStreamService
│ │
│ POST /api/video/stream │
├───────────────────────────────────►│
│ │ ┌──────────────────────┐
│ SSE: text-delta (LLM 推理文本) │ │ VideoAgent (ReAct) │
│◄───────────────────────────────────┤ │ │
│ │ │ reason → should_act │
│ SSE: finish (video_submitted) │ │ ↓ ↓ │
│◄───────────────────────────────────┤ │ tool_node respond │
│ │ │ ↓ ↑ │
│ (SSE 结束,前端展示 pending 卡片) │ │ extract_results │
│ │ └──────────┬───────────┘
│ │ │
│ │ asyncio.create_task
│ │ │
│ │ ┌──────────▼───────────┐
│ │ │ Background Poller │
│ │ │ (指数退避轮询 Kkidc) │
│ │ └──────────┬───────────┘
│ │ │
│ │ 成功: 下载视频 → COS 转存
│ │ 更新 graph state + DB
│ │ │
│ WebSocket 推送 │ ┌──────────▼───────────┐
│ video_completed / video_failed │ │ ConnectionManager │
│◄───────────────────────────────────┤ │ /ws/video/{threadId}│
│ │ └──────────────────────┘
│ 前端精准替换对应卡片 │核心设计决策
| 决策 | 选择 | 原因 |
|---|---|---|
| Provider | Kkidc(快快) | 替代原 Volcengine,统一 API 体系 |
| 架构 | ReAct Agent | LLM 分析意图、优化 prompt,真正具备 Agent 能力 |
| 会话上下文 | 共享 thread_id + checkpointer | Video Agent 可读取 Chat Agent 的历史消息 |
| 实时推送 | WebSocket(主)+ 批量查询(降级) | WebSocket 实时性好,批量查询用于页面刷新恢复 |
| 任务匹配 | custom_task_id + user_message_id | 双主键确保多任务并发时精准匹配 |
| 视频持久化 | COS 转存 | 生成完成后立即下载转存,不依赖平台临时 URL |
LangGraph 工作流
START → reason → should_act ─Yes→ tool_node → extract_results →
│ │ ↓
└───────No──────┘ respond → END节点职责
| 节点 | 文件 | 职责 |
|---|---|---|
reason | agent/video/nodes/reason.py | LLM 分析用户意图,优化 prompt,决定是否调用 Tool |
tool_node | LangGraph 内置 ToolNode | 执行 submit_video_task Tool,提交视频到 Kkidc |
extract_results | agent/video/nodes/extract_results.py | 从 Tool 缓存提取 VideoTask 写入 state |
respond | agent/video/nodes/respond.py | LLM 根据提交结果组织自然语言回复 |
State 定义(agent/video/state.py)
VideoAgentState 继承 ChatState,复用 messages 字段实现会话上下文共享:
python
class VideoTask(TypedDict):
task_id: str # Kkidc 平台返回的原始 ID
custom_task_id: str # 后端 UUID,前端主键
user_message_id: str # 前端消息 ID,用于精准匹配
prompt: str
model: str
status: str # queued / processing / succeeded / failed / timeout
video_url: Optional[str]
cos_key: Optional[str]
created_at: str
last_checked: Optional[str]
error: Optional[str]
prompt_summary: str # 简短摘要,用于 WebSocket 推送
class VideoAgentState(ChatState):
video_model: str
reference_images: Optional[List[str]]
aspect_ratio: Optional[str]
duration: int
generate_audio: bool
user_message_id: Optional[str]
video_tasks: Annotated[List[VideoTask], "append"]Tool 定义(agent/video/tools.py)
submit_video_task— 提交视频生成任务,调用 Kkidc API,返回 task_id- 支持文生视频、图生视频(reference_images / first_frame_image / last_frame_image)
- 通过模块级全局变量在 Tool 和 extract_results 节点之间传递结果
check_video_status— 查询任务状态(预留,目前返回提示信息)
后台轮询(agent/video/background.py)
start_video_poller(task, thread_id, ...)
│
├── 轮询 Kkidc(指数退避:15s → 22s → 33s → ... → 60s max)
│
├── succeeded → 下载视频 → COS 转存
│ → aupdate_state(status=succeeded)
│ → 更新 ProviderTask DB
│ → WebSocket broadcast(video_completed)
│
├── failed → aupdate_state(status=failed)
│ → 更新 ProviderTask DB
│ → WebSocket broadcast(video_failed)
│
└── timeout(600s)→ aupdate_state(status=timeout)
→ WebSocket broadcast(video_failed)WebSocket 协议
连接端点
ws://{host}/ws/video/{thread_id}前端心跳
每 30 秒发送 ping,后端回复 pong。
推送消息格式
json
// 视频完成
{
"type": "video_completed",
"custom_task_id": "uuid",
"user_message_id": "msg_xxx",
"video_url": "https://cos.../video.mp4",
"prompt_summary": "日落海滩..."
}
// 视频失败
{
"type": "video_failed",
"custom_task_id": "uuid",
"user_message_id": "msg_xxx",
"error": "生成超时"
}前端通过 custom_task_id 和 user_message_id 精准匹配对应的视频卡片并更新。
SSE 协议(AI SDK UI Protocol)
POST /api/video/stream 返回 text/event-stream:
| 事件 | 说明 |
|---|---|
start | 流开始 |
text-start | 文本开始 |
text-delta | LLM 推理文本增量(分析意图、提交确认等) |
text-end | 文本结束 |
finish | 流结束,data 包含 data-videoGeneration 类型数据 |
finish 事件的 data 部分
json
[{
"type": "data-videoGeneration",
"data": {
"prompt": "...",
"model": "kuaikuai-2-pro",
"taskId": "custom_task_id",
"customTaskId": "custom_task_id",
"userMessageId": "msg_xxx",
"status": "pending",
"videoUrl": null,
"aspectRatio": "16:9"
}
}]API 端点
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/video/stream | SSE 流式视频生成(主接口) |
| WS | /ws/video/{thread_id} | WebSocket 实时推送 |
| GET | /api/video/models | 获取支持的视频模型列表 |
| GET | /api/video/rate-limit-status | 获取当前用户限流状态 |
| POST | /api/video/status/batch | 批量查询任务状态(降级方案) |
前端架构
新增文件
| 文件 | 职责 |
|---|---|
features/workbench/lib/video-websocket.ts | WebSocket 管理器单例,支持自动重连、心跳 |
features/workbench/hooks/use-video-websocket.ts | WebSocket hook,连接/消息处理/更新 UI |
features/workbench/hooks/use-resume-pending-video-tasks.ts | 页面加载时批量查询 pending 任务(一次性,不再轮询) |
改动文件
| 文件 | 变更 |
|---|---|
features/workbench/lib/video-generation.ts | 从 POST 改为 SSE fetch,解析 AI SDK 事件流 |
features/workbench/lib/video-polling.ts | 移除轮询循环,仅保留 batchQueryVideoTaskStatus |
features/workbench/hooks/use-chat-session.ts | 集成 useVideoWebSocket hook |
features/workbench/hooks/use-submission-handler.ts | 适配新的 SSE 提交流程 |
lib/chat/types.ts | 新增 VideoWsMessage 类型 |
lib/chat/adapter.ts | 解析 custom_task_id、user_message_id |
本次变更摘要
新建文件(后端)
| 文件 | 说明 |
|---|---|
agent/video/background.py | 后台轮询器(asyncio.create_task + 指数退避) |
agent/video/websocket_manager.py | WebSocket 连接管理(按 thread_id 广播) |
agent/video/prompts.py | Video Agent 系统提示词 |
agent/video/nodes/reason.py | reason 节点(LLM 分析意图 + bind_tools) |
agent/video/nodes/extract_results.py | 结果提取节点(Tool 缓存 → state) |
agent/video/nodes/respond.py | respond 节点(LLM 组织最终回复) |
services/video_stream_service.py | 视频流式服务(SSE + 消息存储 + 启动轮询) |
重写文件(后端)
| 文件 | 变更 |
|---|---|
agent/video/state.py | 新 VideoAgentState,继承 ChatState |
agent/video/tools.py | ReAct Tool 封装(submit_video_task) |
agent/video/graph.py | ReAct StateGraph(reason → tool → extract → respond) |
agent/video/agent.py | 流式 video_stream(),产出 progress/video_submitted/complete 事件 |
agent/video/config.py | 精简为视频专用配置 + 限流配置 |
api/video.py | SSE 端点 + WebSocket 端点 |
schemas/video.py | VideoStreamRequest 等新 schema |
services/providers/kkidc/__init__.py | 新增 submit_video_task / query_video_task 方法 |
删除文件
| 文件 | 原因 |
|---|---|
agent/video/nodes/preprocess.py | 旧线性管道,被 reason 节点替代 |
agent/video/nodes/submit.py | 旧线性管道,被 tool_node 替代 |
services/video_service.py | 功能迁移到 background.py + video_stream_service.py |
前端新建
| 文件 | 说明 |
|---|---|
features/workbench/lib/video-websocket.ts | WebSocket 管理器(自动重连 + 心跳) |
features/workbench/hooks/use-video-websocket.ts | WebSocket hook |
配置项
| 环境变量 | 默认值 | 说明 |
|---|---|---|
VIDEO_REASONING_MODEL | claude-sonnet-4-6 | Video Agent 推理用的 LLM |
VIDEO_REASONING_PROVIDER | kkidc | 推理模型所属 provider |
VIDEO_POLL_TIMEOUT | 600 | 后台轮询超时(秒) |
VIDEO_POLL_INTERVAL | 15 | 初始轮询间隔(秒) |
VIDEO_POLL_MAX_INTERVAL | 60 | 最大轮询间隔(秒) |
VIDEO_POLL_BACKOFF_FACTOR | 1.5 | 退避因子 |
KKIDC_SEEDANCE_API_KEY | — | Kkidc 视频生成专用 API Key |
SEEDANCE_2_0_RATE_LIMITER_ENABLED | true | 限流开关 |
SEEDANCE_2_0_RATE_LIMITER_MAX_GLOBAL_CONCURRENT | 30 | 全站最大并发 |
SEEDANCE_2_0_RATE_LIMITER_MAX_USER_CONCURRENT | 5 | 单用户最大并发 |