kira-nats 是基于 NATS JetStream 的分布式消息队列,部署在 SJC 区域的 3 节点集群,提供高可用的持久化消息服务。
部署信息
| 配置 | 值 |
|---|
| 平台 | Fly.io |
| 区域 | sjc (San Jose) |
| 节点数 | 3 (RAFT 集群) |
| 内存 | 2GB / 节点 |
| CPU | 2 vCPU (shared) / 节点 |
| 存储 | 1GB Volume / 节点 |
| 客户端端口 | 4222 |
| 集群端口 | 6222 |
| 监控端口 | 8222 |
| 内部地址 | kira-nats.internal:4222 |
架构设计
3 节点 RAFT 集群
- 高可用:任意 1 节点故障,集群继续工作
- 数据持久化:JetStream 将消息存储到 Volume
- 自动选主:RAFT 共识算法自动选举 Leader
- 全连接:每个节点与其他所有节点保持连接
动态节点发现
启动脚本自动发现所有节点:
# 查询 DNS 获取所有节点 IP
PEERS=$(getent hosts kira-nats.internal)
# 动态生成配置
routes: [
"nats-route://[ip1]:6222",
"nats-route://[ip2]:6222",
"nats-route://[ip3]:6222"
]
跨区域访问
FRA 区域的 kira-be 可以通过 Fly.io 内网访问 SJC 的 NATS:
- 延迟:约 130-150ms(FRA → SJC)
- 连接:通过
.internal DNS 自动路由
客户端连接
Node.js / Bun
import { connect, StringCodec } from "nats";
const nc = await connect({
servers: "kira-nats.internal:4222",
token: process.env.NATS_AUTH_TOKEN,
});
const sc = StringCodec();
// 发布消息
nc.publish("tasks.video.abc123", sc.encode(JSON.stringify({ taskId: "abc123", userId: "user1" })));
// 订阅消息
const sub = nc.subscribe("tasks.video.*");
for await (const msg of sub) {
const data = JSON.parse(sc.decode(msg.data));
console.log("收到任务:", data);
}
JetStream (持久化)
import { connect } from "nats";
const nc = await connect({
servers: "kira-nats.internal:4222",
token: process.env.NATS_AUTH_TOKEN,
});
const js = nc.jetstream();
// 创建 Stream
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name: "TASKS",
subjects: ["tasks.>"],
retention: "workqueue", // 消费后删除
storage: "file", // 持久化到磁盘
replicas: 3, // 3 副本
});
// 发布消息
await js.publish("tasks.video.abc123", sc.encode(JSON.stringify({ taskId: "abc123", userId: "user1" })));
// 消费消息
const consumer = await js.consumers.get("TASKS", "worker");
const messages = await consumer.consume();
for await (const msg of messages) {
const data = JSON.parse(sc.decode(msg.data));
await processTask(data);
msg.ack(); // 确认消费
}
cd kira-nats
# 首次部署
fly apps create kira-nats
# 创建 3 个 Volume(每个节点一个)
fly volumes create nats_data --region sjc --size 1 --yes
fly volumes create nats_data --region sjc --size 1 --yes
fly volumes create nats_data --region sjc --size 1 --yes
# 设置认证 Token
fly secrets set NATS_AUTH_TOKEN="xxx"
# 部署
fly deploy
# 扩展到 3 节点
fly scale count 3 --region sjc --yes
扩展节点
# 先创建 Volume
fly volumes create nats_data --region sjc --size 1 --yes
fly volumes create nats_data --region sjc --size 1 --yes
# 扩展到 5 节点
fly scale count 5 --region sjc --yes
# 重启现有节点以发现新节点
fly machines restart -a kira-nats
RAFT 集群建议使用奇数节点(3, 5, 7)以避免脑裂。
测试集群
# SSH 进入节点
fly ssh console -a kira-nats
# 查看集群状态
wget -qO- http://localhost:8222/jsz
# 查看路由连接
wget -qO- http://localhost:8222/routez
监控端点
| 端点 | 说明 |
|---|
/healthz | 健康检查 |
/varz | 服务器状态 |
/connz | 客户端连接 |
/routez | 集群路由 |
/jsz | JetStream 状态 |
关键指标
# JetStream 集群状态
curl http://kira-nats.internal:8222/jsz
返回:
{
"meta_cluster": {
"name": "kira-nats-cluster",
"leader": "nats-xxx",
"cluster_size": 3
},
"streams": 1,
"consumers": 2,
"messages": 1000,
"bytes": 102400
}
建议报警
| 指标 | Warning | Critical |
|---|
| cluster_size | < 3 | < 2 |
| pending messages | > 1000 | > 10000 |
| consumer lag | > 100 | > 1000 |
文件结构
kira-nats/
├── .github/
│ └── workflows/
│ └── deploy.yml # CI/CD
├── Dockerfile # NATS + 启动脚本
├── docker-entrypoint.sh # 动态节点发现
├── nats.conf # NATS 配置模板
└── fly.toml # Fly.io 配置
配置详情
docker-entrypoint.sh
#!/bin/sh
# 动态发现所有节点
PEERS=$(getent hosts kira-nats.internal | awk '{print $1}')
# 构建 routes 配置
ROUTES=""
for ip in $PEERS; do
ROUTES="${ROUTES}\"nats-route://[$ip]:6222\","
done
# 生成运行时配置
cat > /etc/nats/nats-runtime.conf << EOF
server_name: nats-$(hostname | cut -c1-12)
port: 4222
http_port: 8222
cluster {
name: kira-nats-cluster
port: 6222
routes: [$ROUTES]
}
jetstream {
store_dir: /data/jetstream
max_memory_store: 512MB
max_file_store: 2GB
}
authorization {
token: $NATS_AUTH_TOKEN
}
EOF
exec nats-server -c /etc/nats/nats-runtime.conf
fly.toml
app = "kira-nats"
primary_region = "sjc"
[build]
dockerfile = "Dockerfile"
[mounts]
source = "nats_data"
destination = "/data"
[[services]]
internal_port = 4222
protocol = "tcp"
auto_stop_machines = false
[[services]]
internal_port = 6222
protocol = "tcp"
[[services]]
internal_port = 8222
protocol = "tcp"
[[vm]]
memory = '2gb'
cpu_kind = 'shared'
cpus = 2
使用场景
视频生成队列
// 生产者 (kira-be)
await js.publish(`tasks.video.${taskId}`, sc.encode(JSON.stringify({
taskId,
userId: "123",
threadId: "456",
provider: "seedance",
prompt: "A beautiful sunset",
duration: "5",
ratio: "16:9",
})));
// 消费者 (kira-video-worker)
for await (const msg of consumer) {
const task = JSON.parse(sc.decode(msg.data));
await processVideoTask(task);
msg.ack();
}
音乐生成队列
// 生产者 (kira-be)
await js.publish(`tasks.music.suno_unofficial.${taskId}`, sc.encode(JSON.stringify({
taskId,
userId: "123",
threadId: "456",
provider: "suno_unofficial",
title: "My Song",
lyrics: "[Verse]\nHello world...",
prompt: "upbeat pop",
})));
// 消费者 (kira-music-worker)
// Subject: tasks.music.* (匹配所有音乐 Provider)
for await (const msg of consumer) {
const task = JSON.parse(sc.decode(msg.data));
await processMusicTask(task);
msg.ack();
}
音乐任务 subject 格式为 tasks.music.{provider}.{taskId},其中 provider 为 suno_unofficial / suno_instrumental / suno_add_vocals。
异步通知
// 发送通知
await js.publish("notifications.email", {
to: "user@example.com",
subject: "Your image is ready",
});
JetStream 的 workqueue 模式确保每条消息只被消费一次,适合任务队列场景。