Skip to content

Video Agent 重构说明(2026-03-27)

将视频生成系统从线性管道重构为 LLM 驱动的 ReAct Agent,实现多轮对话、后台轮询、WebSocket 实时推送。

架构概览

前端 SSE 请求                     后端 VideoStreamService
    │                                    │
    │  POST /api/video/stream            │
    ├───────────────────────────────────►│
    │                                    │  ┌──────────────────────┐
    │  SSE: text-delta (LLM 推理文本)    │  │    VideoAgent (ReAct) │
    │◄───────────────────────────────────┤  │                      │
    │                                    │  │  reason → should_act │
    │  SSE: finish (video_submitted)     │  │    ↓           ↓     │
    │◄───────────────────────────────────┤  │  tool_node  respond  │
    │                                    │  │    ↓           ↑     │
    │  (SSE 结束,前端展示 pending 卡片)   │  │  extract_results    │
    │                                    │  └──────────┬───────────┘
    │                                    │             │
    │                                    │  asyncio.create_task
    │                                    │             │
    │                                    │  ┌──────────▼───────────┐
    │                                    │  │  Background Poller   │
    │                                    │  │  (指数退避轮询 Kkidc) │
    │                                    │  └──────────┬───────────┘
    │                                    │             │
    │                                    │  成功: 下载视频 → COS 转存
    │                                    │  更新 graph state + DB
    │                                    │             │
    │  WebSocket 推送                    │  ┌──────────▼───────────┐
    │  video_completed / video_failed    │  │  ConnectionManager   │
    │◄───────────────────────────────────┤  │  /ws/video/{threadId}│
    │                                    │  └──────────────────────┘
    │  前端精准替换对应卡片               │

核心设计决策

决策选择原因
ProviderKkidc(快快)替代原 Volcengine,统一 API 体系
架构ReAct AgentLLM 分析意图、优化 prompt,真正具备 Agent 能力
会话上下文共享 thread_id + checkpointerVideo Agent 可读取 Chat Agent 的历史消息
实时推送WebSocket(主)+ 批量查询(降级)WebSocket 实时性好,批量查询用于页面刷新恢复
任务匹配custom_task_id + user_message_id双主键确保多任务并发时精准匹配
视频持久化COS 转存生成完成后立即下载转存,不依赖平台临时 URL

LangGraph 工作流

START → reason → should_act ─Yes→ tool_node → extract_results →
            │               │                                       ↓
            └───────No──────┘                                    respond → END

节点职责

节点文件职责
reasonagent/video/nodes/reason.pyLLM 分析用户意图,优化 prompt,决定是否调用 Tool
tool_nodeLangGraph 内置 ToolNode执行 submit_video_task Tool,提交视频到 Kkidc
extract_resultsagent/video/nodes/extract_results.py从 Tool 缓存提取 VideoTask 写入 state
respondagent/video/nodes/respond.pyLLM 根据提交结果组织自然语言回复

State 定义(agent/video/state.py

VideoAgentState 继承 ChatState,复用 messages 字段实现会话上下文共享:

python
class VideoTask(TypedDict):
    task_id: str           # Kkidc 平台返回的原始 ID
    custom_task_id: str    # 后端 UUID,前端主键
    user_message_id: str   # 前端消息 ID,用于精准匹配
    prompt: str
    model: str
    status: str            # queued / processing / succeeded / failed / timeout
    video_url: Optional[str]
    cos_key: Optional[str]
    created_at: str
    last_checked: Optional[str]
    error: Optional[str]
    prompt_summary: str    # 简短摘要,用于 WebSocket 推送

class VideoAgentState(ChatState):
    video_model: str
    reference_images: Optional[List[str]]
    aspect_ratio: Optional[str]
    duration: int
    generate_audio: bool
    user_message_id: Optional[str]
    video_tasks: Annotated[List[VideoTask], "append"]

Tool 定义(agent/video/tools.py

  • submit_video_task — 提交视频生成任务,调用 Kkidc API,返回 task_id
    • 支持文生视频、图生视频(reference_images / first_frame_image / last_frame_image)
    • 通过模块级全局变量在 Tool 和 extract_results 节点之间传递结果
  • check_video_status — 查询任务状态(预留,目前返回提示信息)

后台轮询(agent/video/background.py

start_video_poller(task, thread_id, ...)

    ├── 轮询 Kkidc(指数退避:15s → 22s → 33s → ... → 60s max)

    ├── succeeded → 下载视频 → COS 转存
    │   → aupdate_state(status=succeeded)
    │   → 更新 ProviderTask DB
    │   → WebSocket broadcast(video_completed)

    ├── failed → aupdate_state(status=failed)
    │   → 更新 ProviderTask DB
    │   → WebSocket broadcast(video_failed)

    └── timeout(600s)→ aupdate_state(status=timeout)
        → WebSocket broadcast(video_failed)

WebSocket 协议

连接端点

ws://{host}/ws/video/{thread_id}

前端心跳

每 30 秒发送 ping,后端回复 pong

推送消息格式

json
// 视频完成
{
  "type": "video_completed",
  "custom_task_id": "uuid",
  "user_message_id": "msg_xxx",
  "video_url": "https://cos.../video.mp4",
  "prompt_summary": "日落海滩..."
}

// 视频失败
{
  "type": "video_failed",
  "custom_task_id": "uuid",
  "user_message_id": "msg_xxx",
  "error": "生成超时"
}

前端通过 custom_task_iduser_message_id 精准匹配对应的视频卡片并更新。


SSE 协议(AI SDK UI Protocol)

POST /api/video/stream 返回 text/event-stream

事件说明
start流开始
text-start文本开始
text-deltaLLM 推理文本增量(分析意图、提交确认等)
text-end文本结束
finish流结束,data 包含 data-videoGeneration 类型数据

finish 事件的 data 部分

json
[{
  "type": "data-videoGeneration",
  "data": {
    "prompt": "...",
    "model": "kuaikuai-2-pro",
    "taskId": "custom_task_id",
    "customTaskId": "custom_task_id",
    "userMessageId": "msg_xxx",
    "status": "pending",
    "videoUrl": null,
    "aspectRatio": "16:9"
  }
}]

API 端点

方法路径说明
POST/api/video/streamSSE 流式视频生成(主接口)
WS/ws/video/{thread_id}WebSocket 实时推送
GET/api/video/models获取支持的视频模型列表
GET/api/video/rate-limit-status获取当前用户限流状态
POST/api/video/status/batch批量查询任务状态(降级方案)

前端架构

新增文件

文件职责
features/workbench/lib/video-websocket.tsWebSocket 管理器单例,支持自动重连、心跳
features/workbench/hooks/use-video-websocket.tsWebSocket hook,连接/消息处理/更新 UI
features/workbench/hooks/use-resume-pending-video-tasks.ts页面加载时批量查询 pending 任务(一次性,不再轮询)

改动文件

文件变更
features/workbench/lib/video-generation.ts从 POST 改为 SSE fetch,解析 AI SDK 事件流
features/workbench/lib/video-polling.ts移除轮询循环,仅保留 batchQueryVideoTaskStatus
features/workbench/hooks/use-chat-session.ts集成 useVideoWebSocket hook
features/workbench/hooks/use-submission-handler.ts适配新的 SSE 提交流程
lib/chat/types.ts新增 VideoWsMessage 类型
lib/chat/adapter.ts解析 custom_task_iduser_message_id

本次变更摘要

新建文件(后端)

文件说明
agent/video/background.py后台轮询器(asyncio.create_task + 指数退避)
agent/video/websocket_manager.pyWebSocket 连接管理(按 thread_id 广播)
agent/video/prompts.pyVideo Agent 系统提示词
agent/video/nodes/reason.pyreason 节点(LLM 分析意图 + bind_tools)
agent/video/nodes/extract_results.py结果提取节点(Tool 缓存 → state)
agent/video/nodes/respond.pyrespond 节点(LLM 组织最终回复)
services/video_stream_service.py视频流式服务(SSE + 消息存储 + 启动轮询)

重写文件(后端)

文件变更
agent/video/state.py新 VideoAgentState,继承 ChatState
agent/video/tools.pyReAct Tool 封装(submit_video_task)
agent/video/graph.pyReAct StateGraph(reason → tool → extract → respond)
agent/video/agent.py流式 video_stream(),产出 progress/video_submitted/complete 事件
agent/video/config.py精简为视频专用配置 + 限流配置
api/video.pySSE 端点 + WebSocket 端点
schemas/video.pyVideoStreamRequest 等新 schema
services/providers/kkidc/__init__.py新增 submit_video_task / query_video_task 方法

删除文件

文件原因
agent/video/nodes/preprocess.py旧线性管道,被 reason 节点替代
agent/video/nodes/submit.py旧线性管道,被 tool_node 替代
services/video_service.py功能迁移到 background.py + video_stream_service.py

前端新建

文件说明
features/workbench/lib/video-websocket.tsWebSocket 管理器(自动重连 + 心跳)
features/workbench/hooks/use-video-websocket.tsWebSocket hook

配置项

环境变量默认值说明
VIDEO_REASONING_MODELclaude-sonnet-4-6Video Agent 推理用的 LLM
VIDEO_REASONING_PROVIDERkkidc推理模型所属 provider
VIDEO_POLL_TIMEOUT600后台轮询超时(秒)
VIDEO_POLL_INTERVAL15初始轮询间隔(秒)
VIDEO_POLL_MAX_INTERVAL60最大轮询间隔(秒)
VIDEO_POLL_BACKOFF_FACTOR1.5退避因子
KKIDC_SEEDANCE_API_KEYKkidc 视频生成专用 API Key
SEEDANCE_2_0_RATE_LIMITER_ENABLEDtrue限流开关
SEEDANCE_2_0_RATE_LIMITER_MAX_GLOBAL_CONCURRENT30全站最大并发
SEEDANCE_2_0_RATE_LIMITER_MAX_USER_CONCURRENT5单用户最大并发