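Overview

kira-video-worker is the video generation worker. It receives HTTP pushes from the Inngest server (kira-inngest), calls a video provider (Seedance / Grok / WAN 2.7 / KIE Seedance2 / TencentCloud Kling) to generate the video, runs post-processing (ffmpeg thumbnail + blurhash + CSAM check), uploads the result to Supabase Storage, and notifies the frontend through Centrifugo.

2026-04 refactor: the worker was rebuilt from a long-lived NATS consumer into Inngest HTTP functions (commit a9597f6). Each toolName:provider combination has its own Inngest function, with durable steps, built-in retries, and a priority queue.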

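Deployment

| Setting | Value |
| --- | --- |
| Platform | Fly.io (sjc) |
| Resources | 4 CPU / 4 GB RAM |
| Runtime | Bun (oven/bun:1) + ffmpeg |
| HTTP | Inngest webhook POST /api/inngest (internal network only) |
| Graceful shutdown | 600s (10 min) |
| Heartbeat | BetterStack (60s) |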

Inngest Functions

Generation functions (concurrency-limited)

| Function ID | Event | per-user-per-tool | per-provider | retry | timeout |
| --- | --- | --- | --- | --- | --- |
| video-generation-seedance | video/generation-seedance.requested | 1 | 100 | 2 | 60min |
| video-generation-grok | video/generation-grok.requested | 1 | 60 | 2 | 60min |
| video-motion-control-tencentcloud-vod-kling | video/motion-control-tencentcloud-vod-kling.requested | 1 | 70 | 2 | 60min |
| video-edit-ws-wan27 | video/edit-ws-wan27.requested | 1 | 50 | 2 | 60min |
| video-edit-kie-seedance2 | video/edit-kie-seedance2.requested | 1 | 50 | 2 | 60min |
| video-extend-ws-wan27 | video/extend-ws-wan27.requested | 1 | 50 | 2 | 60min |
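The table can also be captured as data. The sketch below is a hypothetical helper, not the worker's actual code: `FunctionSpec`, `eventNameFor`, and `toFunctionOptions` are illustrative names, the `event.data.userId` key is a guess at how the per-user limit might be keyed, and the option shape (two concurrency entries, `retries`, `timeouts.finish`) is only loosely modelled on Inngest's function options and should be checked against the SDK reference.

```typescript
// Hypothetical spec type mirroring the table columns.
interface FunctionSpec {
  id: string;
  perUserPerTool: number; // concurrent runs allowed per user for this tool
  perProvider: number;    // concurrent runs allowed against the provider
  retries: number;
  timeoutMinutes: number;
}

const GENERATION_FUNCTIONS: FunctionSpec[] = [
  { id: "video-generation-seedance", perUserPerTool: 1, perProvider: 100, retries: 2, timeoutMinutes: 60 },
  { id: "video-generation-grok", perUserPerTool: 1, perProvider: 60, retries: 2, timeoutMinutes: 60 },
  { id: "video-motion-control-tencentcloud-vod-kling", perUserPerTool: 1, perProvider: 70, retries: 2, timeoutMinutes: 60 },
  { id: "video-edit-ws-wan27", perUserPerTool: 1, perProvider: 50, retries: 2, timeoutMinutes: 60 },
  { id: "video-edit-kie-seedance2", perUserPerTool: 1, perProvider: 50, retries: 2, timeoutMinutes: 60 },
  { id: "video-extend-ws-wan27", perUserPerTool: 1, perProvider: 50, retries: 2, timeoutMinutes: 60 },
];

// Event names in the table follow the pattern "video/<id without 'video-'>.requested".
function eventNameFor(functionId: string): string {
  const prefix = "video-";
  if (!functionId.startsWith(prefix)) {
    throw new Error(`unexpected function id: ${functionId}`);
  }
  return `video/${functionId.slice(prefix.length)}.requested`;
}

// Assumed option shape; verify field names against the Inngest SDK before use.
function toFunctionOptions(spec: FunctionSpec) {
  return {
    id: spec.id,
    concurrency: [
      { key: "event.data.userId", limit: spec.perUserPerTool }, // per-user limit (assumed key)
      { limit: spec.perProvider },                              // function-wide limit
    ],
    retries: spec.retries,
    timeouts: { finish: `${spec.timeoutMinutes}m` },
    trigger: { event: eventNameFor(spec.id) },
  };
}
```

Keeping the limits in one table like this would make it easy to compare them against provider quotas when a new toolName:provider pair is added.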

Non-generation functions (unlimited concurrency)

  • video-upload-kira: direct user upload
  • video-trim-kira: local trim (0 credits)

Cleanup

  • video-cancelled-cleanup listens for inngest/function.cancelled: for functions other than upload/trim, it refunds credits and sends a failed/cancelled notification

Video providers

| Provider | Source | Used for | Polling mode |
| --- | --- | --- | --- |
| seedance | BytePlus seedance-1-5-pro | generateVideo (NSFW path) | Status-field polling |
| grok | xAI grok-imagine-video | generateVideo (default) | Deferred Response 202/200 |
| ws-wan27 | WAN 2.7 Pro | videoEdit / videoExtend | Polling |
| kie-seedance2 | Seedance 2 via KIE | videoEdit | Polling |
| TencentCloud Kling | VOD Kling | motionControl | Polling |

Provider abstract base class:

abstract class BaseProvider {
  abstract createTask(task): Promise<string>;          // returns providerTaskId
  abstract pollTask(taskId): Promise<GenerationResult>; // non-blocking poll
}

Polling is wrapped by pollWithInngestSteps(step, () => provider.pollTask()), which uses step.sleep + step.waitForEvent for durable polling, so no state is lost when the worker restarts.
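To make the durable-polling idea concrete, here is a minimal sketch of a poll loop built on step-style primitives. It is not the real pollWithInngestSteps: `StepLike`, `PollOutcome`, `pollUntilDone`, and `createFakeStep` are hypothetical names, the way a poll signals "not finished yet" is an assumption (the source does not specify it), and the sketch only uses a sleep between polls, leaving out step.waitForEvent.

```typescript
interface GenerationResult {
  success: boolean;
  videoUrl?: string;
  error?: string;
}

// Assumption: a pending poll is reported as { done: false }.
type PollOutcome = { done: false } | { done: true; result: GenerationResult };

// Minimal stand-in for the two step methods the loop needs.
interface StepLike {
  run<T>(id: string, fn: () => Promise<T>): Promise<T>;
  sleep(id: string, ms: number): Promise<void>;
}

async function pollUntilDone(
  step: StepLike,
  poll: () => Promise<PollOutcome>,
  opts: { intervalMs: number; maxAttempts: number },
): Promise<GenerationResult> {
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    // Each attempt gets its own step id so its result can be recorded separately.
    const outcome = await step.run(`poll-${attempt}`, poll);
    if (outcome.done) return outcome.result;
    await step.sleep(`wait-${attempt}`, opts.intervalMs);
  }
  return { success: false, error: "poll timed out" };
}

// In-memory step for local experiments: runs immediately and records step ids.
function createFakeStep(): StepLike & { log: string[] } {
  const log: string[] = [];
  return {
    log,
    async run<T>(id: string, fn: () => Promise<T>): Promise<T> {
      log.push(id);
      return fn();
    },
    async sleep(id: string, _ms: number): Promise<void> {
      log.push(id);
    },
  };
}
```

With a durable step implementation, completed poll and sleep steps would presumably be replayed from recorded state after a restart rather than re-executed, which is what lets the loop survive worker restarts.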

Processing pipeline (6 steps)

src/inngest.ts:187-266 + src/processor.ts

Step 1: check

  • checkThreadExists(threadId): skip if the thread has been deleted
  • cost = getVideoCreditsCost(toolName, duration)
  • checkVideoEligibility(userId, cost): checks plan and balance
  • deductCredits(userId, cost, taskId): Supabase RPC deduct_credits, an atomic deduction (row lock + idempotent by taskId)
  • Returns { skip, startTime, cost }
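The "idempotent by taskId" property of the deduction in step 1 can be illustrated with an in-memory stand-in. This is only a sketch: the real deduction is a Supabase RPC with a row lock, and `InMemoryLedger`, `DeductResult`, and the `replayed` flag are hypothetical names introduced for illustration.

```typescript
type DeductResult =
  | { ok: true; balance: number; replayed: boolean }
  | { ok: false; reason: "insufficient_credits"; balance: number };

// In-memory stand-in for the credits table; not the real deduct_credits RPC.
class InMemoryLedger {
  private balances = new Map<string, number>();
  private charged = new Map<string, number>(); // taskId -> amount already deducted

  setBalance(userId: string, amount: number): void {
    this.balances.set(userId, amount);
  }

  deduct(userId: string, cost: number, taskId: string): DeductResult {
    const balance = this.balances.get(userId) ?? 0;
    // A retry of the same task must not charge twice.
    if (this.charged.has(taskId)) {
      return { ok: true, balance, replayed: true };
    }
    if (balance < cost) {
      return { ok: false, reason: "insufficient_credits", balance };
    }
    this.balances.set(userId, balance - cost);
    this.charged.set(taskId, cost);
    return { ok: true, balance: balance - cost, replayed: false };
  }
}
```

Because Inngest may re-run a step after a failure, keying the deduction on taskId is what keeps a retried run from charging the user a second time.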

Step 2: notify-processing

  • updateVideoInJsonb(taskId, { status: "processing" })
  • updateMessageToolOutput(...): syncs the tool part of the message
  • Centrifugo: { type: "video_status", video: { status: "processing" } }

Step 3: create-task

  • If ratio="auto", resolve it via detectImageRatio(sourceImageId)
  • provider = getProvider(providerName)
  • providerTaskId = await provider.createTask(task)

Step 4: poll-result

  • pollWithInngestSteps(step, () => provider.pollTask(providerTaskId))
  • Returns GenerationResult { success, videoUrl, error }

Step 5: post-process

  • fetch(videoUrl) → Buffer
  • ffprobe
  • ffmpeg -vframes 1 -q:v 2 → JPEG thumbnail (95% quality)
  • generateBlurhash(thumbBuffer): 4×4 components
  • uploadVideo → {userId}/{threadId}/video_{videoId}.mp4
  • uploadThumbnail → {userId}/{threadId}/vc_{thumbId}.jpg (metadata includes blurhash + size)
  • Returns VideoResult { status, url, thumbUrl, width, height, duration, blurhash }
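The thumbnail flags listed in step 5 could be assembled as an argument array like the one below. This is a sketch under assumptions: `thumbnailArgs` is a hypothetical helper, the input and output are assumed to be file paths, and the `-y` overwrite flag is an addition not mentioned in the source.

```typescript
// Builds the ffmpeg argument list for extracting a single JPEG frame.
function thumbnailArgs(inputPath: string, outputPath: string): string[] {
  return [
    "-y",            // overwrite the output file if it exists (assumed, not in the source)
    "-i", inputPath, // source video
    "-vframes", "1", // emit exactly one frame
    "-q:v", "2",     // JPEG quality scale; lower values mean higher quality
    outputPath,
  ];
}
```

In a Bun runtime the array could then be passed to a process spawner, for example `Bun.spawn(["ffmpeg", ...thumbnailArgs(input, output)])`, though how the worker actually invokes ffmpeg is not shown in the source.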

Step 6: finalize

  • trackToolUsage({ userId, threadId, taskId, tool_name, success, duration_ms, credits_consumed })
  • trackMediaUsage({ taskId, toolName, provider, model, duration_ms, output_duration_ms }): durations only, no monetary amounts
  • handleSuccess(taskId, userId, threadId, videoResult, task)
    • checkVideoCsam(thumbBuffer, videoFileId): on a hit, delete the file, write to failed_uploads, and notify csam_blocked; credits are not refunded
    • updateVideoInJsonb(taskId, { status: "completed", videoId, thumbId, ... })
    • Centrifugo: { type: "video_status", video: { status: "completed", url, thumbUrl, ... } }

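Temporary files

motionControl / videoEdit / videoExtend upload a temporary video (temp_video_{uuid}.mp4) for the provider to use as its source; finalize cleans it up with deleteStorageFile(prepared.tempVideoPath). If finalize fails, the file may be left behind; a periodic cron scan could be added later to catch these leftovers.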
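The cron scan is only a suggestion in the text, so the following is purely a sketch of what its selection logic might look like. `StoredFile`, `findStaleTempVideos`, and the age threshold are hypothetical; listing and deleting files in Supabase Storage is left out.

```typescript
interface StoredFile {
  path: string;
  createdAt: number; // epoch milliseconds
}

// Matches the temp naming scheme: {userId}/{threadId}/temp_video_{uuid}.mp4
const TEMP_VIDEO_PATTERN = /\/temp_video_[^/]+\.mp4$/;

// Returns paths of temp videos older than maxAgeMs at time `now`.
function findStaleTempVideos(files: StoredFile[], now: number, maxAgeMs: number): string[] {
  return files
    .filter((f) => TEMP_VIDEO_PATTERN.test(f.path) && now - f.createdAt > maxAgeMs)
    .map((f) => f.path);
}
```

A threshold comfortably above the 60-minute function timeout would avoid deleting a source file that a still-running task depends on.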

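Upload strategy

  • Bucket: agent_message
  • Single PUT (no multipart upload)
  • No imgproxy relay (videos bypass imgproxy; a Supabase signed URL is returned directly)
  • Content-Type is mapped from the file extension (mp4 → video/mp4, jpg → image/jpeg)
  • cacheControl: 3600
  • upsert: false

| Type | Path | metadata |
| --- | --- | --- |
| Video | {userId}/{threadId}/video_{videoId}.mp4 | |
| Thumbnail | {userId}/{threadId}/vc_{thumbId}.jpg | blurhash, size |
| Watermarked version | {userId}/{threadId}/wv_{uuid}.mp4 | generated on demand |
| Temporary | {userId}/{threadId}/temp_video_{uuid}.mp4 | deleted during finalize |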
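The path scheme and upload options can be sketched as small pure helpers. The names `storagePaths`, `contentTypeFor`, and `uploadOptionsFor` are hypothetical, and passing `cacheControl` as a string is an assumption about what the storage client expects.

```typescript
// Extension to Content-Type mapping, as listed in the upload strategy.
const CONTENT_TYPES: Record<string, string> = {
  mp4: "video/mp4",
  jpg: "image/jpeg",
};

function contentTypeFor(path: string): string {
  const ext = path.slice(path.lastIndexOf(".") + 1).toLowerCase();
  const type = CONTENT_TYPES[ext];
  if (!type) throw new Error(`no content type mapped for .${ext}`);
  return type;
}

// Path builders following the table's naming scheme.
const storagePaths = {
  video: (userId: string, threadId: string, videoId: string) =>
    `${userId}/${threadId}/video_${videoId}.mp4`,
  thumbnail: (userId: string, threadId: string, thumbId: string) =>
    `${userId}/${threadId}/vc_${thumbId}.jpg`,
  watermarked: (userId: string, threadId: string, uuid: string) =>
    `${userId}/${threadId}/wv_${uuid}.mp4`,
  temp: (userId: string, threadId: string, uuid: string) =>
    `${userId}/${threadId}/temp_video_${uuid}.mp4`,
};

// Options for a single, non-overwriting upload.
function uploadOptionsFor(path: string) {
  return {
    contentType: contentTypeFor(path),
    cacheControl: "3600", // seconds, passed as a string (assumed)
    upsert: false,        // fail instead of overwriting an existing object
  };
}
```

With `upsert: false`, a retried upload to the same path would be rejected rather than silently overwriting the first result, which fits the idempotent design of the rest of the pipeline.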

CSAM review

src/csam.ts:26-79, triggered inside finalize.handleSuccess:
checkVideoCsam({ thumbBuffer, videoFileId, taskId, userId, threadId })

if !thumbBuffer → { action: "skip", reason: "no_thumbnail" }

detectCsamFromBuffer(thumbBuffer)
  true → block:
    ├─ deleteStorageFile(videoFileId)
    ├─ write to the failed_uploads table
    ├─ notify { status: "failed", errorCode: "csam_blocked", errorMessage: "Content involving minors is not allowed." }
    └─ credits already deducted, not refunded (compliance penalty)
  false → { action: "pass" } → continue
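The decision flow above can be expressed as a pure function that returns what should happen, leaving the side effects to the caller. This is a sketch: `decideCsam`, `CsamDecision`, and the `sideEffects` labels are hypothetical, and the detection result is passed in rather than computed.

```typescript
type CsamDecision =
  | { action: "skip"; reason: "no_thumbnail" }
  | { action: "pass" }
  | {
      action: "block";
      errorCode: "csam_blocked";
      errorMessage: string;
      refund: false;         // credits stay deducted
      sideEffects: string[]; // labels for the steps the caller must perform
    };

function decideCsam(hasThumbnail: boolean, detected: boolean): CsamDecision {
  // Without a thumbnail there is nothing to scan.
  if (!hasThumbnail) return { action: "skip", reason: "no_thumbnail" };
  if (!detected) return { action: "pass" };
  return {
    action: "block",
    errorCode: "csam_blocked",
    errorMessage: "Content involving minors is not allowed.",
    refund: false,
    sideEffects: ["deleteStorageFile", "writeFailedUploads", "notifyFailed"],
  };
}
```

Separating the decision from the deletes and notifications makes the no-refund rule easy to unit test without touching storage or the detector.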

Centrifugo notifications

Channel: {userId}/{threadId}#{userId}
{
  "type": "video_status",
  "taskId": "...",
  "video": {
    "taskId": "...",
    "status": "processing" | "completed" | "failed" | "insufficient_credits" | "insufficient_plan",
    "url": "https://...",
    "thumbUrl": "https://...",
    "width": 1920,
    "height": 1080,
    "duration": 5.5,
    "blurhash": "U6PVzY...",
    "errorCode": "csam_blocked",
    "errorMessage": "..."
  },
  "timestamp": "2026-04-14T12:34:56Z"
}
Messages are sent with a POST to {CENTRIFUGO_URL}/api/publish, authenticated with the X-API-Key header.
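As an illustration of the publish call, the sketch below builds the request without sending it. `channelFor` and `buildPublishRequest` are hypothetical helpers; the `{ channel, data }` body follows the usual shape of Centrifugo's HTTP publish API, but the worker's actual notify.ts is not shown in the source.

```typescript
interface VideoStatusMessage {
  type: "video_status";
  taskId: string;
  video: { taskId: string; status: string; [key: string]: unknown };
  timestamp: string;
}

// Channel naming as documented: {userId}/{threadId}#{userId}
function channelFor(userId: string, threadId: string): string {
  return `${userId}/${threadId}#${userId}`;
}

// Returns a request description that could be handed to an HTTP client.
function buildPublishRequest(
  baseUrl: string,
  apiKey: string,
  channel: string,
  data: VideoStatusMessage,
) {
  return {
    url: `${baseUrl.replace(/\/+$/, "")}/api/publish`, // tolerate trailing slashes
    method: "POST" as const,
    headers: { "Content-Type": "application/json", "X-API-Key": apiKey },
    body: JSON.stringify({ channel, data }),
  };
}
```

The returned object could be passed to `fetch(req.url, req)` in a runtime that provides it.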

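Credits billing

| Tool | Cost |
| --- | --- |
| generateVideo 5s | 50 credits |
| generateVideo 10s | 100 credits |
| videoEdit / videoExtend / motionControl | ceil(duration) × 15 |
| trimVideo / upload | 0 |

Deduction timing: step 1, atomic and idempotent by taskId. A CSAM block is not refunded; other failures (such as provider errors) trigger the cleanup function, which refunds the credits.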
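The pricing table translates into a small function. This is a sketch, not the real getVideoCreditsCost: the tool name strings are taken from the table, and rejecting generateVideo durations other than 5s and 10s is an assumption, since the table lists no other prices.

```typescript
type ToolName =
  | "generateVideo"
  | "videoEdit"
  | "videoExtend"
  | "motionControl"
  | "trimVideo"
  | "upload";

function getVideoCreditsCost(toolName: ToolName, durationSeconds: number): number {
  switch (toolName) {
    case "generateVideo":
      if (durationSeconds === 5) return 50;
      if (durationSeconds === 10) return 100;
      // Assumption: only the two listed durations are priced.
      throw new Error(`no price listed for generateVideo at ${durationSeconds}s`);
    case "videoEdit":
    case "videoExtend":
    case "motionControl":
      // Partial seconds are rounded up before pricing.
      return Math.ceil(durationSeconds) * 15;
    case "trimVideo":
    case "upload":
      return 0;
    default:
      throw new Error(`unknown tool: ${String(toolName)}`);
  }
}
```

For example, a 5.5-second videoEdit is billed as 6 seconds, which comes to 90 credits.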

Task cancellation

kira-be → inngest.send("video/task.cancelled", { taskId })
       → redis SET cancel:{taskId} 1 EX 3600

Worker, at the start of each step.run:
  if (redis.GET cancel:{taskId}) → throw NonRetriableError
    → Inngest triggers the cleanup function → refund + notify failed/cancelled
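The per-step cancellation check can be sketched with an in-memory flag store. Everything here is a stand-in: `ExpiringFlags` replaces Redis (whose calls would be asynchronous), the local `NonRetriableError` class replaces the SDK's error type, and `assertNotCancelled` is a hypothetical helper name.

```typescript
// Local stand-in for the SDK's non-retriable error class.
class NonRetriableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "NonRetriableError";
  }
}

// Synchronous key store with expiry, mimicking SET key 1 EX ttl / GET key.
class ExpiringFlags {
  private expiries = new Map<string, number>();

  set(key: string, ttlSeconds: number, now: number): void {
    this.expiries.set(key, now + ttlSeconds * 1000);
  }

  has(key: string, now: number): boolean {
    const expiresAt = this.expiries.get(key);
    return expiresAt !== undefined && expiresAt > now;
  }
}

const cancelKey = (taskId: string): string => `cancel:${taskId}`;

// Call at the start of each step; throws if the task has been cancelled.
function assertNotCancelled(flags: ExpiringFlags, taskId: string, now: number): void {
  if (flags.has(cancelKey(taskId), now)) {
    throw new NonRetriableError(`task ${taskId} was cancelled`);
  }
}
```

Checking the flag at step boundaries means a cancellation takes effect before the next expensive operation starts, and the one-hour expiry keeps stale flags from accumulating.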

Analytics

tool_usage

{
  "thread_id": "...",
  "tool_name": "generateVideo",
  "task_id": "...",
  "provider": "seedance",
  "success": true,
  "duration_ms": 45000,
  "credits_consumed": 50,
  "error_message": null
}

media_usage

{
  "thread_id": "...",
  "task_id": "...",
  "tool_name": "generateVideo",
  "provider": "seedance",
  "model": "seedance-video",
  "duration_ms": 45000,
  "output_duration_ms": 5000
}

File structure

kira-video-worker/
├── fly.toml                 # Fly deployment (kill_timeout 600s)
├── Dockerfile               # Bun + ffmpeg
├── package.json
└── src/
    ├── index.ts             # entry point: serve({ client: inngest, functions })
    ├── inngest.ts           # 7 function definitions (including cleanup)
    ├── processor.ts         # orchestrates the 6-step pipeline
    ├── post-processor.ts    # ffmpeg thumbnail + blurhash
    ├── poll.ts              # pollWithInngestSteps helper
    ├── storage.ts           # Supabase Storage + signed URLs
    ├── billing.ts           # credit check + deduct + refund
    ├── notify.ts            # Centrifugo publish
    ├── analytics.ts         # PostHog tool_usage / media_usage
    ├── csam.ts              # CSAM detection + deletion + failed_uploads
    ├── types.ts             # shared types
    └── providers/
        ├── base.ts          # BaseProvider abstract
        ├── index.ts         # factory
        ├── seedance.ts
        ├── grok.ts
        ├── ws-wan27.ts
        ├── kie-seedance2.ts
        └── tencentcloud-kling.ts

Environment variables

SUPABASE_URL
SUPABASE_KEY
REDIS_URL
INNGEST_EVENT_KEY
INNGEST_SIGNING_KEY
BYTEPLUS_API_KEY
XAI_API_KEY
WS_WAN_API_KEY
KIE_AI_API_KEY
TENCENTCLOUD_SECRET_ID
TENCENTCLOUD_SECRET_KEY
CENTRIFUGO_URL
CENTRIFUGO_API_KEY
POSTHOG_API_KEY
OPENAI_API_KEY              # CSAM review
SENTRY_DSN
HEARTBEAT_URL               # BetterStack
