kira-video-worker 是一个后台 Worker 服务,负责处理视频生成任务。它从 NATS JetStream 消费任务,调用视频生成 Provider(Seedance / Grok / Veo),将生成的视频上传到 Supabase Storage,并通过 Centrifugo 实时通知前端。
部署信息
| 配置 | 值 |
|---|
| 平台 | Fly.io |
| 区域 | sjc (San Jose) |
| 内存 | 8GB |
| CPU | 4 vCPU (shared) |
| 运行时 | Bun (oven/bun:1) |
| 依赖 | ffmpeg (缩略图提取) |
| HTTP 服务 | 无 (纯后台 Worker) |
| 优雅关闭超时 | 600 秒 (10 分钟) |
| 心跳 | BetterStack (每 60 秒) |
视频 Provider
Provider 列表
| Provider | 模型 | API 基址 | 支持比例 | 成本 ($/秒) |
|---|
| Seedance | ep-20260106141447-dxlcl | ark.ap-southeast.bytepluses.com | 1:1, 3:4, 4:3, 16:9, 9:16, 21:9 | $0.05 |
| Grok | grok-imagine-video | api.x.ai/v1 | 16:9, 4:3, 1:1, 9:16, 3:4, 3:2, 2:3 | $0.05 |
| Veo | veo-3.1-fast-generate-preview | generativelanguage.googleapis.com/v1beta | 16:9, 9:16 | $0.15 |
Provider 选择逻辑
- 默认使用 Grok 或 Seedance(由 kira-be 根据 NSFW 检测结果路由)
- NSFW 内容自动路由到 Seedance(更安全的选择)
- Veo 为高质量选项,积分消耗为其他 Provider 的 3 倍
Provider 实现模式
所有 Provider 继承 BaseProvider 抽象类,实现 createTask() 和 pollTask() 方法:
abstract class BaseProvider {
abstract createTask(task: VideoTask): Promise<string>;
abstract pollTask(taskId: string): Promise<GenerationResult>;
// 完整流程:创建任务 → 轮询直到完成
async generate(task: VideoTask): Promise<GenerationResult>;
}
| Provider | 轮询间隔 | 最大等待时间 | 轮询模式 |
|---|
| Seedance | 2 秒 | 10 分钟 | 状态字段轮询 |
| Grok | 2 秒 | 10 分钟 | HTTP 202/200 延迟响应 |
| Veo | 10 秒 | 10 分钟 | Long-running operation (done: true 即完成) |
Veo 时长映射
Veo 的时长映射不同于其他 Provider:
| Worker 时长 | Veo 实际时长 |
|---|
"5" | 6 秒 |
"10" | 8 秒 |
处理管道
完整流程
接收任务
NATS Consumer 从 tasks.video.* subject 接收 VideoTask,立即 ack 消息取得所有权。每用户串行处理(不同用户并发),最大并发 100。
检查 Thread
验证 thread 是否仍存在,避免对已删除的 thread 浪费计算资源。
计费检查 & 扣费
检查用户 plan 是否为 Pro/Max,检查 credit + addon_credits 是否充足,通过 PostgreSQL RPC deduct_credits 原子扣除积分(幂等,基于 taskId 防重复扣费)。
状态更新 → processing
更新 thread_version.videos JSONB 中的视频状态为 processing,通过 Centrifugo 通知前端。
解析图片比例
如果 ratio 为 auto 且有源图片,使用 Sharp 检测图片尺寸并映射到最近的标准比例。
调用 Provider
调用对应 Provider 的 generate() 方法,创建任务并轮询直到完成。
后处理
下载 Provider 返回的视频 → 生成 UUID → 上传到 Supabase Storage → 使用 ffmpeg 提取首帧缩略图 → 使用 ffprobe 提取视频尺寸 → 生成 blurhash → 上传缩略图。
Analytics 埋点
发送 tool_usage 和 ai_cost 事件到 PostHog。
完成通知
更新 JSONB 状态为 completed,生成签名 URL,通过 Centrifugo 推送完整视频数据给前端。
内容审核
Fire-and-forget:对缩略图执行 OpenAI omni-moderation 审核,NSFW 则标记 thread flag。
失败处理
- 扣费前失败:Consumer 内置重试机制(最多 2 次,间隔 5 秒),不消耗积分
- 扣费后失败:通过
refund_credits RPC 退还积分,更新状态为 failed
- NATS 层面:
max_deliver: 3,ack_wait: 5 分钟
优雅关闭
收到 SIGTERM 后:
- 停止拉取新消息
- 等待所有活跃任务完成(最长 10 分钟)
- 将内存队列中的等待任务重新发布回 NATS
- Flush PostHog 事件
- 关闭 NATS 连接
NATS 配置
| 配置项 | 值 |
|---|
| Stream | TASKS |
| Subject | tasks.video.*(层级格式 tasks.video.<taskId>) |
| Consumer | video-worker |
| Ack Policy | Explicit |
| Deliver Policy | All |
| Max Deliver | 3 |
| Ack Wait | 5 分钟 |
| Retention | workqueue |
| Max Messages | 100,000 |
| Max Age | 7 天 |
并发控制
每用户串行处理 + 不同用户并发
├── 最大并发: 100 个任务
├── 用户忙时: 新任务加入内存队列
├── 容量满时: 新任务加入内存队列
└── 队列排空: 处理完后自动消费队列中的下一个任务
积分计算
| 时长 | 普通 Provider (Seedance/Grok) | Veo (3x) |
|---|
| 5 秒 | 60 积分 | 180 积分 |
| 10 秒 | 120 积分 | 360 积分 |
扣费通过 PostgreSQL RPC deduct_credits 原子执行,支持 credit 和 addon_credits 双池扣除,基于 taskId 实现幂等防重复扣费。
Analytics 埋点
{
"event": "tool_usage",
"properties": {
"tool_name": "generateVideo",
"task_id": "xxx",
"thread_id": "xxx",
"provider": "seedance | grok | veo",
"success": true,
"duration_ms": 45000,
"credits_consumed": 60
}
}
ai_cost 事件
{
"event": "ai_cost",
"properties": {
"type": "tool_video",
"model": "seedance-video | grok-video | veo-video",
"tool_name": "generateVideo",
"thread_id": "xxx",
"duration_ms": 45000,
"video_duration_seconds": 5,
"cost_per_second_usd": 0.05,
"total_cost_usd": 0.25
}
}
Centrifugo 通知
Channel 格式
{userId}/{threadId}#{userId}
Payload 格式
所有通知统一使用 type: "video_status",通过 video.status 区分状态:
{
"channel": "{userId}/{threadId}#{userId}",
"data": {
"type": "video_status",
"taskId": "xxx",
"video": {
"taskId": "xxx",
"status": "completed",
"videoId": "{userId}/{threadId}/video_{uuid}.mp4",
"thumbId": "{userId}/{threadId}/vc_{uuid}.jpg",
"width": 1280,
"height": 720,
"url": "https://...signed-url...",
"thumbUrl": "https://...signed-url..."
},
"timestamp": "2026-01-15T10:30:00.000Z"
}
}
状态类型
| status | 说明 |
|---|
processing | Worker 已接收任务,开始生成 |
completed | 生成完成,包含 url/thumbUrl |
failed | 生成失败,包含 errorMessage |
insufficient_credits | 积分不足,触发前端计费弹窗 |
insufficient_plan | 套餐不支持视频 |
存储结构
所有文件存储在 Supabase Storage 的 agent_message bucket:
| 类型 | 路径格式 | 说明 |
|---|
| 源图片 | {userId}/{threadId}/gen_{uuid} | 由 kira-be 上传 |
| 视频 | {userId}/{threadId}/video_{uuid}.{ext} | Worker 上传 |
| 缩略图 | {userId}/{threadId}/vc_{uuid}.jpg | Worker 上传,含 blurhash metadata |
数据存储
视频状态存储在 thread_version.videos JSONB 字段中(无独立 videos 表),通过 JSONB containment 查询 [{"taskId": "..."}] 定位记录。
环境变量
# 数据库
SUPABASE_URL # Supabase 项目 URL
SUPABASE_KEY # Supabase service role key
# NATS
NATS_URL # NATS 服务器地址 (默认: kira-nats.internal:4222)
NATS_AUTH_TOKEN # NATS 认证 Token
# Provider API Keys
BYTEPLUS_API_KEY # BytePlus Seedance API Key
XAI_API_KEY # xAI Grok API Key
GOOGLE_GENERATIVE_AI_API_KEY # Google Veo API Key
# 通知
CENTRIFUGO_URL # Centrifugo API URL (默认: http://kira-centrifugo.internal:8000)
CENTRIFUGO_API_KEY # Centrifugo API Key
# Redis
REDIS_URL # Redis URL (签名 URL 缓存)
# Analytics
POSTHOG_API_KEY # PostHog API Key
POSTHOG_HOST # PostHog Host (默认: https://us.i.posthog.com)
# 内容审核
OPENAI_API_KEY # OpenAI API Key (缩略图审核)
# 监控
HEARTBEAT_URL # BetterStack 心跳 URL (可选)
文件结构
kira-video-worker/
├── fly.toml # Fly.io 部署配置
├── Dockerfile # 构建配置 (Bun + ffmpeg)
├── package.json
└── src/
├── index.ts # 主入口,启动 Consumer + 优雅关闭
├── consumer.ts # NATS JetStream Consumer (并发控制)
├── processor.ts # 任务编排 (计费 → 生成 → 后处理 → 通知)
├── post-processor.ts # 视频下载、缩略图提取、上传
├── storage.ts # Supabase Storage + JSONB 操作
├── billing.ts # 积分检查、扣除、退还
├── notify.ts # Centrifugo WebSocket 推送
├── analytics.ts # PostHog 埋点
├── moderation.ts # 缩略图内容审核 + Thread Flag
├── url.ts # 签名 URL 生成(Redis 缓存)
├── types.ts # 类型定义
└── providers/
├── base.ts # 抽象基类 (创建+轮询模式)
├── index.ts # Provider 工厂 (单例)
├── seedance.ts # BytePlus Seedance 实现
├── grok.ts # xAI Grok Imagine 实现
└── veo.ts # Google Veo 实现
视频 Worker 为纯后台服务,不暴露 HTTP 端口。部署后通过 BetterStack 心跳监控存活状态。