Skip to main content

概述

kira-nats 是基于 NATS JetStream 的分布式消息队列,部署在 SJC 区域的 3 节点集群,提供高可用的持久化消息服务。

部署信息

配置
平台Fly.io
区域sjc (San Jose)
节点数3 (RAFT 集群)
内存2GB / 节点
CPU2 vCPU (shared) / 节点
存储1GB Volume / 节点
客户端端口4222
集群端口6222
监控端口8222
内部地址kira-nats.internal:4222

架构设计

3 节点 RAFT 集群

  • 高可用:任意 1 节点故障,集群继续工作
  • 数据持久化:JetStream 将消息存储到 Volume
  • 自动选主:RAFT 共识算法自动选举 Leader
  • 全连接:每个节点与其他所有节点保持连接

动态节点发现

启动脚本自动发现所有节点:
# 查询 DNS 获取所有节点 IP
PEERS=$(getent hosts kira-nats.internal)

# 动态生成配置
routes: [
  "nats-route://[ip1]:6222",
  "nats-route://[ip2]:6222",
  "nats-route://[ip3]:6222"
]

跨区域访问

FRA 区域的 kira-be 可以通过 Fly.io 内网访问 SJC 的 NATS:
  • 延迟:约 130-150ms(FRA → SJC)
  • 连接:通过 .internal DNS 自动路由

客户端连接

Node.js / Bun

import { connect, StringCodec } from "nats";

const nc = await connect({
  servers: "kira-nats.internal:4222",
  token: process.env.NATS_AUTH_TOKEN,
});

const sc = StringCodec();

// 发布消息
nc.publish("tasks.video.abc123", sc.encode(JSON.stringify({ taskId: "abc123", userId: "user1" })));

// 订阅消息
const sub = nc.subscribe("tasks.video.*");
for await (const msg of sub) {
  const data = JSON.parse(sc.decode(msg.data));
  console.log("收到任务:", data);
}

JetStream (持久化)

import { connect } from "nats";

const nc = await connect({
  servers: "kira-nats.internal:4222",
  token: process.env.NATS_AUTH_TOKEN,
});

const js = nc.jetstream();

// 创建 Stream
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
  name: "TASKS",
  subjects: ["tasks.>"],
  retention: "workqueue", // 消费后删除
  storage: "file",        // 持久化到磁盘
  replicas: 3,            // 3 副本
});

// 发布消息
await js.publish("tasks.video.abc123", sc.encode(JSON.stringify({ taskId: "abc123", userId: "user1" })));

// 消费消息
const consumer = await js.consumers.get("TASKS", "worker");
const messages = await consumer.consume();
for await (const msg of messages) {
  const data = JSON.parse(sc.decode(msg.data));
  await processTask(data);
  msg.ack(); // 确认消费
}

运维

部署

cd kira-nats

# 首次部署
fly apps create kira-nats

# 创建 3 个 Volume(每个节点一个)
fly volumes create nats_data --region sjc --size 1 --yes
fly volumes create nats_data --region sjc --size 1 --yes
fly volumes create nats_data --region sjc --size 1 --yes

# 设置认证 Token
fly secrets set NATS_AUTH_TOKEN="xxx"

# 部署
fly deploy

# 扩展到 3 节点
fly scale count 3 --region sjc --yes

扩展节点

# 先创建 Volume
fly volumes create nats_data --region sjc --size 1 --yes
fly volumes create nats_data --region sjc --size 1 --yes

# 扩展到 5 节点
fly scale count 5 --region sjc --yes

# 重启现有节点以发现新节点
fly machines restart -a kira-nats
RAFT 集群建议使用奇数节点(3, 5, 7)以避免脑裂。

测试集群

# SSH 进入节点
fly ssh console -a kira-nats

# 查看集群状态
wget -qO- http://localhost:8222/jsz

# 查看路由连接
wget -qO- http://localhost:8222/routez

监控

监控端点

端点说明
/healthz健康检查
/varz服务器状态
/connz客户端连接
/routez集群路由
/jszJetStream 状态

关键指标

# JetStream 集群状态
curl http://kira-nats.internal:8222/jsz
返回:
{
  "meta_cluster": {
    "name": "kira-nats-cluster",
    "leader": "nats-xxx",
    "cluster_size": 3
  },
  "streams": 1,
  "consumers": 2,
  "messages": 1000,
  "bytes": 102400
}

建议报警

指标WarningCritical
cluster_size< 3< 2
pending messages> 1000> 10000
consumer lag> 100> 1000

文件结构

kira-nats/
├── .github/
│   └── workflows/
│       └── deploy.yml       # CI/CD
├── Dockerfile               # NATS + 启动脚本
├── docker-entrypoint.sh     # 动态节点发现
├── nats.conf                # NATS 配置模板
└── fly.toml                 # Fly.io 配置

配置详情

docker-entrypoint.sh

#!/bin/sh
# 动态发现所有节点
PEERS=$(getent hosts kira-nats.internal | awk '{print $1}')

# 构建 routes 配置
ROUTES=""
for ip in $PEERS; do
  ROUTES="${ROUTES}\"nats-route://[$ip]:6222\","
done

# 生成运行时配置
cat > /etc/nats/nats-runtime.conf << EOF
server_name: nats-$(hostname | cut -c1-12)
port: 4222
http_port: 8222

cluster {
  name: kira-nats-cluster
  port: 6222
  routes: [$ROUTES]
}

jetstream {
  store_dir: /data/jetstream
  max_memory_store: 512MB
  max_file_store: 2GB
}

authorization {
  token: $NATS_AUTH_TOKEN
}
EOF

exec nats-server -c /etc/nats/nats-runtime.conf

fly.toml

app = "kira-nats"
primary_region = "sjc"

[build]
  dockerfile = "Dockerfile"

[mounts]
  source = "nats_data"
  destination = "/data"

[[services]]
  internal_port = 4222
  protocol = "tcp"
  auto_stop_machines = false

[[services]]
  internal_port = 6222
  protocol = "tcp"

[[services]]
  internal_port = 8222
  protocol = "tcp"

[[vm]]
  memory = '2gb'
  cpu_kind = 'shared'
  cpus = 2

使用场景

视频生成队列

// 生产者 (kira-be)
await js.publish(`tasks.video.${taskId}`, sc.encode(JSON.stringify({
  taskId,
  userId: "123",
  threadId: "456",
  provider: "seedance",
  prompt: "A beautiful sunset",
  duration: "5",
  ratio: "16:9",
})));

// 消费者 (kira-video-worker)
for await (const msg of consumer) {
  const task = JSON.parse(sc.decode(msg.data));
  await processVideoTask(task);
  msg.ack();
}

音乐生成队列

// 生产者 (kira-be)
await js.publish(`tasks.music.suno_unofficial.${taskId}`, sc.encode(JSON.stringify({
  taskId,
  userId: "123",
  threadId: "456",
  provider: "suno_unofficial",
  title: "My Song",
  lyrics: "[Verse]\nHello world...",
  prompt: "upbeat pop",
})));

// 消费者 (kira-music-worker)
// Subject: tasks.music.* (匹配所有音乐 Provider)
for await (const msg of consumer) {
  const task = JSON.parse(sc.decode(msg.data));
  await processMusicTask(task);
  msg.ack();
}
音乐任务 subject 格式为 tasks.music.{provider}.{taskId},其中 provider 为 suno_unofficial / suno_instrumental / suno_add_vocals

异步通知

// 发送通知
await js.publish("notifications.email", {
  to: "user@example.com",
  subject: "Your image is ready",
});
JetStream 的 workqueue 模式确保每条消息只被消费一次,适合任务队列场景。