Skip to main content

Broker 角色

kira-agent 用 Redis 把 agent runSSE stream 解耦,并在多 instance 间做协调 —— 不用 PUB/SUB,只用普通 key + Stream + 轮询。实现见 src/lib/agentTask.ts
  • Redis = Aiven Dragonfly,独立实例 kira-agent-dragonfly,TLS(rediss://…aivencloud.com:19782)。
  • 连接串走 REDIS_URL(Fly secret);ioredisrediss:// scheme 自动启 TLS,无需额外配置。
  • 单例 client 走 redis;SSE handler 的 XREAD BLOCK 用独占连接 createBlockingRedis()(blocking 调用会独占连接,共享会让所有 reader 串行)。

Key Schema

Key类型用途TTL
agent:lock:{threadId}STRINGthread 级 dedup 锁,value = {instanceId}:{taskId}EX 60,heartbeat 每 20s 续
agent:task:{taskId}HASHtask 元数据(status / threadId / userId / instanceId / startedAt / lastHeartbeatAt / error / isAborted / userMessage)启动 600s,complete/fail 降到 120s
agent:stream:{taskId}STREAMchunk batch + sentinel,单 field v(JSON envelope)与 task 同步(complete/fail 时 EXPIRE 到 120s)
agent:abort:{taskId}STRINGabort 信号 "1"EX 600,消费方走 TTL,不主动 DEL
// src/lib/agentTask.ts
const LOCK_TTL_SEC = 60;
const TASK_TTL_SEC = 600;
const COMPLETED_TTL_SEC = 120;
const ABORT_TTL_SEC = 600;
stream / task HASH 走 TTL 自然蒸发,锁走 check-and-set DEL。cleanup 不主动 DEL stream,这样 reader 永远能读到完整收尾(包括 sentinel),避免 race。没有启动期的 stale-task sweep —— 锁 TTL 自管。

dedup lock(threadId 维度)

同一 thread 不能并发跑两个 task:
// 抢锁:NX + EX,value = instanceId:taskId
await redis.set(lockKey(threadId), `${instanceId}:${taskId}`, "EX", 60, "NX");
  • 续锁:run 每 20s heartbeat,Lua check-and-set —— 只有 GET == instanceId:taskIdEXPIRE 续 TTL。锁过期 = 原 owner 死了,thread 可被重新抢。
  • 释放:run 结束时同样 check-and-set DEL,只有 owner 能删,避免误删别人的锁。
  • 409 续连:POST /task 抢锁失败时,读 lock value 取第二段(taskId)返回给客户端,让它直接 GET /stream/:taskId

abort 信号

export async function requestAbort(taskId: string) {
  await redis.set(abortKey(taskId), "1", "EX", 600);
}
export async function isAbortRequested(taskId: string) {
  return (await redis.exists(abortKey(taskId))) === 1; // O(1) EXISTS
}
run loop 每 2s 轮询 EXISTS agent:abort:{taskId},命中就 controller.abort()DELETE /task/:taskId 只在 task 仍 running 时调 requestAbort(幂等)。abort key 用独立 STRING + EXISTS 做 O(1) 判断,不塞进 task HASH。
abort key 靠 TTL(600s)自然过期,run 消费后不主动 DEL —— isAbortRequested 只做 EXISTS,无删除路径。

Stream:单 field v 的 JSON envelope

每个 stream entry 只有一个 field v,装一个 JSON envelope。这样 reader 无需区分多种 shape:
type StreamEnvelope =
  | { type: "chunks"; chunks: unknown[] }   // 普通 chunk batch
  | { type: "done" }                         // 收尾 sentinel(成功)
  | { type: "error"; message: string };      // 收尾 sentinel(失败/aborted)

// 写入
await redis.xadd(streamKey(taskId), "*", "v", JSON.stringify(env));
  • 批量写:run 期间用 ChunkBatcher,后台 timer 每 75ms flush 一次缓冲(把多个 chunk 合成一个 chunks envelope 再 XADD),减少写放大。
  • sentinel:done / error 也是普通 stream entry,是流的最后一条 —— reader 见到非 chunks 的 envelope 就退出。

SSE 续读:XREAD + Last-Event-ID

GET /stream/:taskId 用客户端的 Last-Event-ID header 作为游标续读:
1

Replay 已有 entries

XRANGE(lastId 之后的所有 entries 先补发(readChunksFrom)。遇到 sentinel 直接结束。
2

BLOCK 等新 entries

XREAD BLOCK 3000 COUNT 50 STREAMS agent:stream:{taskId} <cursor>(blockingReadChunksAfter,独占连接)。游标为空时用 "0"
3

超时探活

BLOCK 3s 超时返 null:查 getTask,若 task 已被 GC(TTL 过期)发 event: error {message:"stream expired"} 收尾;否则写一行 SSE keepalive 注释 : keepalive 防各层 idle 关连接,继续 BLOCK。
4

逐帧下发

每条 entry 翻成 SSE 帧并带 id(= Redis entry id),供下次 Last-Event-ID 续读;遇 sentinel 关闭。
帧格式见 Stream API。浏览器断开时 reader 在 finallyblocking.disconnect(),run 不受影响。

参考