Skip to main content

概述

kira-video-worker 是一个后台 Worker 服务,负责处理视频生成任务。它从 NATS JetStream 消费任务,调用视频生成 Provider(Seedance / Grok / Veo),将生成的视频上传到 Supabase Storage,并通过 Centrifugo 实时通知前端。

部署信息

配置
平台Fly.io
区域sjc (San Jose)
内存8GB
CPU4 vCPU (shared)
运行时Bun (oven/bun:1)
依赖ffmpeg (缩略图提取)
HTTP 服务无 (纯后台 Worker)
优雅关闭超时600 秒 (10 分钟)
心跳BetterStack (每 60 秒)

视频 Provider

Provider 列表

Provider模型API 基址支持比例成本 ($/秒)
Seedanceep-20260106141447-dxlclark.ap-southeast.bytepluses.com1:1, 3:4, 4:3, 16:9, 9:16, 21:9$0.05
Grokgrok-imagine-videoapi.x.ai/v116:9, 4:3, 1:1, 9:16, 3:4, 3:2, 2:3$0.05
Veoveo-3.1-fast-generate-previewgenerativelanguage.googleapis.com/v1beta16:9, 9:16$0.15

Provider 选择逻辑

  • 默认使用 GrokSeedance(由 kira-be 根据 NSFW 检测结果路由)
  • NSFW 内容自动路由到 Seedance(更安全的选择)
  • Veo 为高质量选项,积分消耗为其他 Provider 的 3 倍

Provider 实现模式

所有 Provider 继承 BaseProvider 抽象类,实现 createTask()pollTask() 方法:
abstract class BaseProvider {
  abstract createTask(task: VideoTask): Promise<string>;
  abstract pollTask(taskId: string): Promise<GenerationResult>;

  // 完整流程:创建任务 → 轮询直到完成
  async generate(task: VideoTask): Promise<GenerationResult>;
}
Provider轮询间隔最大等待时间轮询模式
Seedance2 秒10 分钟状态字段轮询
Grok2 秒10 分钟HTTP 202/200 延迟响应
Veo10 秒10 分钟Long-running operation (done: true 即完成)

Veo 时长映射

Veo 的时长映射不同于其他 Provider:
Worker 时长Veo 实际时长
"5"6 秒
"10"8 秒

处理管道

完整流程

1

接收任务

NATS Consumer 从 tasks.video.* subject 接收 VideoTask,立即 ack 消息取得所有权。每用户串行处理(不同用户并发),最大并发 100。
2

检查 Thread

验证 thread 是否仍存在,避免对已删除的 thread 浪费计算资源。
3

计费检查 & 扣费

检查用户 plan 是否为 Pro/Max,检查 credit + addon_credits 是否充足,通过 PostgreSQL RPC deduct_credits 原子扣除积分(幂等,基于 taskId 防重复扣费)。
4

状态更新 → processing

更新 thread_version.videos JSONB 中的视频状态为 processing,通过 Centrifugo 通知前端。
5

解析图片比例

如果 ratio 为 auto 且有源图片,使用 Sharp 检测图片尺寸并映射到最近的标准比例。
6

调用 Provider

调用对应 Provider 的 generate() 方法,创建任务并轮询直到完成。
7

后处理

下载 Provider 返回的视频 → 生成 UUID → 上传到 Supabase Storage → 使用 ffmpeg 提取首帧缩略图 → 使用 ffprobe 提取视频尺寸 → 生成 blurhash → 上传缩略图。
8

Analytics 埋点

发送 tool_usageai_cost 事件到 PostHog。
9

完成通知

更新 JSONB 状态为 completed,生成签名 URL,通过 Centrifugo 推送完整视频数据给前端。
10

内容审核

Fire-and-forget:对缩略图执行 OpenAI omni-moderation 审核,NSFW 则标记 thread flag。

失败处理

  • 扣费前失败:Consumer 内置重试机制(最多 2 次,间隔 5 秒),不消耗积分
  • 扣费后失败:通过 refund_credits RPC 退还积分,更新状态为 failed
  • NATS 层面max_deliver: 3ack_wait: 5 分钟

优雅关闭

收到 SIGTERM 后:
  1. 停止拉取新消息
  2. 等待所有活跃任务完成(最长 10 分钟)
  3. 将内存队列中的等待任务重新发布回 NATS
  4. Flush PostHog 事件
  5. 关闭 NATS 连接

NATS 配置

配置项
StreamTASKS
Subjecttasks.video.*(层级格式 tasks.video.<taskId>
Consumervideo-worker
Ack PolicyExplicit
Deliver PolicyAll
Max Deliver3
Ack Wait5 分钟
Retentionworkqueue
Max Messages100,000
Max Age7 天

并发控制

每用户串行处理 + 不同用户并发
├── 最大并发: 100 个任务
├── 用户忙时: 新任务加入内存队列
├── 容量满时: 新任务加入内存队列
└── 队列排空: 处理完后自动消费队列中的下一个任务

积分计算

时长普通 Provider (Seedance/Grok)Veo (3x)
5 秒60 积分180 积分
10 秒120 积分360 积分
扣费通过 PostgreSQL RPC deduct_credits 原子执行,支持 creditaddon_credits 双池扣除,基于 taskId 实现幂等防重复扣费。

Analytics 埋点

tool_usage 事件

{
  "event": "tool_usage",
  "properties": {
    "tool_name": "generateVideo",
    "task_id": "xxx",
    "thread_id": "xxx",
    "provider": "seedance | grok | veo",
    "success": true,
    "duration_ms": 45000,
    "credits_consumed": 60
  }
}

ai_cost 事件

{
  "event": "ai_cost",
  "properties": {
    "type": "tool_video",
    "model": "seedance-video | grok-video | veo-video",
    "tool_name": "generateVideo",
    "thread_id": "xxx",
    "duration_ms": 45000,
    "video_duration_seconds": 5,
    "cost_per_second_usd": 0.05,
    "total_cost_usd": 0.25
  }
}

Centrifugo 通知

Channel 格式

{userId}/{threadId}#{userId}

Payload 格式

所有通知统一使用 type: "video_status",通过 video.status 区分状态:
{
  "channel": "{userId}/{threadId}#{userId}",
  "data": {
    "type": "video_status",
    "taskId": "xxx",
    "video": {
      "taskId": "xxx",
      "status": "completed",
      "videoId": "{userId}/{threadId}/video_{uuid}.mp4",
      "thumbId": "{userId}/{threadId}/vc_{uuid}.jpg",
      "width": 1280,
      "height": 720,
      "url": "https://...signed-url...",
      "thumbUrl": "https://...signed-url..."
    },
    "timestamp": "2026-01-15T10:30:00.000Z"
  }
}

状态类型

status说明
processingWorker 已接收任务,开始生成
completed生成完成,包含 url/thumbUrl
failed生成失败,包含 errorMessage
insufficient_credits积分不足,触发前端计费弹窗
insufficient_plan套餐不支持视频

存储结构

所有文件存储在 Supabase Storage 的 agent_message bucket:
类型路径格式说明
源图片{userId}/{threadId}/gen_{uuid}由 kira-be 上传
视频{userId}/{threadId}/video_{uuid}.{ext}Worker 上传
缩略图{userId}/{threadId}/vc_{uuid}.jpgWorker 上传,含 blurhash metadata

数据存储

视频状态存储在 thread_version.videos JSONB 字段中(无独立 videos 表),通过 JSONB containment 查询 [{"taskId": "..."}] 定位记录。

环境变量

# 数据库
SUPABASE_URL                 # Supabase 项目 URL
SUPABASE_KEY                 # Supabase service role key

# NATS
NATS_URL                     # NATS 服务器地址 (默认: kira-nats.internal:4222)
NATS_AUTH_TOKEN              # NATS 认证 Token

# Provider API Keys
BYTEPLUS_API_KEY             # BytePlus Seedance API Key
XAI_API_KEY                  # xAI Grok API Key
GOOGLE_GENERATIVE_AI_API_KEY # Google Veo API Key

# 通知
CENTRIFUGO_URL               # Centrifugo API URL (默认: http://kira-centrifugo.internal:8000)
CENTRIFUGO_API_KEY           # Centrifugo API Key

# Redis
REDIS_URL                    # Redis URL (签名 URL 缓存)

# Analytics
POSTHOG_API_KEY              # PostHog API Key
POSTHOG_HOST                 # PostHog Host (默认: https://us.i.posthog.com)

# 内容审核
OPENAI_API_KEY               # OpenAI API Key (缩略图审核)

# 监控
HEARTBEAT_URL                # BetterStack 心跳 URL (可选)

文件结构

kira-video-worker/
├── fly.toml                    # Fly.io 部署配置
├── Dockerfile                  # 构建配置 (Bun + ffmpeg)
├── package.json
└── src/
    ├── index.ts                # 主入口,启动 Consumer + 优雅关闭
    ├── consumer.ts             # NATS JetStream Consumer (并发控制)
    ├── processor.ts            # 任务编排 (计费 → 生成 → 后处理 → 通知)
    ├── post-processor.ts       # 视频下载、缩略图提取、上传
    ├── storage.ts              # Supabase Storage + JSONB 操作
    ├── billing.ts              # 积分检查、扣除、退还
    ├── notify.ts               # Centrifugo WebSocket 推送
    ├── analytics.ts            # PostHog 埋点
    ├── moderation.ts           # 缩略图内容审核 + Thread Flag
    ├── url.ts                  # 签名 URL 生成(Redis 缓存)
    ├── types.ts                # 类型定义
    └── providers/
        ├── base.ts             # 抽象基类 (创建+轮询模式)
        ├── index.ts            # Provider 工厂 (单例)
        ├── seedance.ts         # BytePlus Seedance 实现
        ├── grok.ts             # xAI Grok Imagine 实现
        └── veo.ts              # Google Veo 实现
视频 Worker 为纯后台服务,不暴露 HTTP 端口。部署后通过 BetterStack 心跳监控存活状态。