Skip to main content

概述

kira-centrifugo 是基于 Centrifugo 的实时消息服务,部署在 SJC 和 FRA 两个区域,为客户端提供低延迟的 WebSocket 连接。

部署信息

配置
平台Fly.io
区域sjc (San Jose), fra (Frankfurt)
内存4GB
CPU4 vCPU (shared)
端口8000 (HTTP/WebSocket)
公网地址wss://ws.kira.art/connection/websocket
内部地址kira-centrifugo.internal:8000

架构设计

双区域 + Redis 同步

两个区域的 Centrifugo 实例通过 Redis (kira-dragonfly) 同步消息:
  • 客户端连接到最近的区域(Cloudflare Anycast)
  • 消息通过 Redis 在两个区域间同步
  • 任一区域发布的消息,所有客户端都能收到

认证

使用 Supabase JWT 进行客户端认证:
{
  "client": {
    "token": {
      "jwks_public_endpoint": "https://xxx.supabase.co/auth/v1/.well-known/jwks.json",
      "issuer": "https://xxx.supabase.co/auth/v1",
      "audience": "authenticated"
    }
  }
}
客户端使用 Supabase 的 access_token 直接连接 Centrifugo,无需额外的 token 生成。

客户端连接

Web (Raw WebSocket)

客户端使用原生 WebSocket 连接 Centrifugo,通过 JSON 协议发送命令:
const ws = new WebSocket("wss://ws.kira.art/connection/websocket");

// 连接认证(使用 Supabase JWT)
ws.onopen = () => {
  ws.send(JSON.stringify({
    id: 1,
    connect: { token: session.access_token },
  }));
};

// 订阅频道
ws.send(JSON.stringify({
  id: 2,
  subscribe: { channel: `${userId}/${threadId}#${userId}` },
}));

// 接收消息
ws.onmessage = (event) => {
  const message = JSON.parse(event.data);

  // 服务端 ping — 回复 pong
  if (Object.keys(message).length === 0) {
    ws.send(JSON.stringify({}));
    return;
  }

  // 推送消息
  if (message.push?.channel && message.push?.pub?.data) {
    console.log("收到消息:", message.push.pub.data);
  }
};
完整实现参见 kira-web/components/providers/centrifugo-provider.tsx,包含自动重连、Auth 状态监听等逻辑。

Server API

发布消息

curl -X POST "https://ws.kira.art/api/publish" \
  -H "Content-Type: application/json" \
  -H "X-API-Key: $CENTRIFUGO_API_KEY" \
  -d '{"channel":"user:123","data":{"type":"notification","message":"Hello"}}'

从 kira-video-worker 发布

kira-video-worker 是唯一通过 API 发布消息到 Centrifugo 的服务(kira-be 不使用 Centrifugo):
// kira-video-worker/src/notify.ts
const channel = `${userId}/${threadId}#${userId}`;

const response = await fetch("http://kira-centrifugo.internal:8000/api/publish", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "X-API-Key": process.env.CENTRIFUGO_API_KEY,
  },
  body: JSON.stringify({
    channel,
    data: { type: "video_status", taskId, video, timestamp: new Date().toISOString() },
  }),
});

运维

部署

cd kira-centrifugo

# 首次部署
fly apps create kira-centrifugo
fly secrets set \
  CENTRIFUGO_API_KEY="xxx" \
  CENTRIFUGO_ADMIN_PASSWORD="xxx" \
  CENTRIFUGO_ADMIN_SECRET="xxx"
fly deploy

# 扩展到双区域
fly scale count 1 --region sjc
fly scale count 1 --region fra

添加 SSL 证书

fly certs create ws.kira.art -a kira-centrifugo

测试 API

# 获取节点信息
curl -X POST "https://ws.kira.art/api/info" \
  -H "Content-Type: application/json" \
  -H "X-API-Key: $CENTRIFUGO_API_KEY" \
  -d "{}"

监控

健康检查

# 内部
curl http://kira-centrifugo.internal:8000/health

# 公网
curl https://ws.kira.art/health

关键指标

指标说明
num_clients当前连接数
num_channels活跃频道数
num_users认证用户数

Admin UI

访问 https://ws.kira.art/ 使用 Admin 密码登录查看实时状态。

文件结构

kira-centrifugo/
├── .github/
│   └── workflows/
│       └── deploy.yml    # CI/CD
├── Dockerfile            # 基于 centrifugo:v6
├── config.json           # Centrifugo 配置
└── fly.toml              # Fly.io 配置

配置详情

config.json

{
  "engine": {
    "type": "redis",
    "redis": {
      "address": "redis://:<token>@kira-dragonfly.internal:6379"
    }
  },
  "http_api": {},
  "admin": {
    "enabled": true
  },
  "client": {
    "allowed_origins": [
      "https://kira.art",
      "https://www.kira.art",
      "https://poisson.art",
      "https://www.poisson.art",
      "http://localhost:3000"
    ],
    "token": {
      "jwks_public_endpoint": "https://xxx.supabase.co/auth/v1/.well-known/jwks.json",
      "issuer": "https://xxx.supabase.co/auth/v1",
      "audience": "authenticated"
    }
  },
  "channel": {
    "without_namespace": {
      "allow_subscribe_for_client": true
    }
  }
}

fly.toml

app = "kira-centrifugo"
primary_region = "sjc"

[build]
  dockerfile = "Dockerfile"

[env]
  CENTRIFUGO_CONFIG = "/centrifugo/config.json"

[http_service]
  internal_port = 8000
  force_https = true
  auto_stop_machines = false
  auto_start_machines = true
  min_machines_running = 2

[[vm]]
  memory = "4gb"
  cpu_kind = "shared"
  cpus = 4

频道设计

频道格式

{userId}/{threadId}#{userId}
这是代码库中使用的唯一频道格式,由 kira-video-worker 发布,kira-web 订阅。格式说明:
  • {userId}/{threadId} — 斜杠分隔的路径,标识用户和线程
  • # — Centrifugo 的 user channel boundary 标记
  • #{userId}# 后的部分是 Centrifugo 的用户命名空间,Centrifugo 会自动校验订阅者的 JWT sub 字段与 # 后的用户 ID 匹配,确保只有对应用户可订阅该频道
用途:
  • 视频生成状态推送(processing / completed / failed)
  • 积分不足 / 套餐不支持通知
详见 Video Worker 文档