Broker 角色
kira-agent 用 Redis 把 agent run 和 SSE stream 解耦,并在多 instance 间做协调 —— 不用 PUB/SUB,只用普通 key + Stream + 轮询。实现见src/lib/agentTask.ts。
- Redis = Aiven Dragonfly,独立实例
kira-agent-dragonfly,TLS(rediss://…aivencloud.com:19782)。 - 连接串走
REDIS_URL(Fly secret);ioredis见rediss://scheme 自动启 TLS,无需额外配置。 - 单例 client 走
redis;SSE handler 的XREAD BLOCK用独占连接createBlockingRedis()(blocking 调用会独占连接,共享会让所有 reader 串行)。
Key Schema
| Key | 类型 | 用途 | TTL |
|---|---|---|---|
agent:lock:{threadId} | STRING | thread 级 dedup 锁,value = {instanceId}:{taskId} | EX 60,heartbeat 每 20s 续 |
agent:task:{taskId} | HASH | task 元数据(status / threadId / userId / instanceId / startedAt / lastHeartbeatAt / error / isAborted / userMessage) | 启动 600s,complete/fail 降到 120s |
agent:stream:{taskId} | STREAM | chunk batch + sentinel,单 field v(JSON envelope) | 与 task 同步(complete/fail 时 EXPIRE 到 120s) |
agent:abort:{taskId} | STRING | abort 信号 "1" | EX 600,消费方走 TTL,不主动 DEL |
stream / task HASH 走 TTL 自然蒸发,锁走 check-and-set DEL。cleanup 不主动 DEL stream,这样 reader 永远能读到完整收尾(包括 sentinel),避免 race。没有启动期的 stale-task sweep —— 锁 TTL 自管。
dedup lock(threadId 维度)
同一 thread 不能并发跑两个 task:- 续锁:run 每 20s heartbeat,Lua check-and-set —— 只有
GET == instanceId:taskId才EXPIRE续 TTL。锁过期 = 原 owner 死了,thread 可被重新抢。 - 释放:run 结束时同样 check-and-set DEL,只有 owner 能删,避免误删别人的锁。
- 409 续连:
POST /task抢锁失败时,读 lock value 取第二段(taskId)返回给客户端,让它直接GET /stream/:taskId。
abort 信号
EXISTS agent:abort:{taskId},命中就 controller.abort()。DELETE /task/:taskId 只在 task 仍 running 时调 requestAbort(幂等)。abort key 用独立 STRING + EXISTS 做 O(1) 判断,不塞进 task HASH。
abort key 靠 TTL(600s)自然过期,run 消费后不主动 DEL ——
isAbortRequested 只做 EXISTS,无删除路径。Stream:单 field v 的 JSON envelope
每个 stream entry 只有一个 field v,装一个 JSON envelope。这样 reader 无需区分多种 shape:
- 批量写:run 期间用
ChunkBatcher,后台 timer 每 75ms flush 一次缓冲(把多个 chunk 合成一个chunksenvelope 再XADD),减少写放大。 - sentinel:
done/error也是普通 stream entry,是流的最后一条 —— reader 见到非chunks的 envelope 就退出。
SSE 续读:XREAD + Last-Event-ID
GET /stream/:taskId 用客户端的 Last-Event-ID header 作为游标续读:
BLOCK 等新 entries
XREAD BLOCK 3000 COUNT 50 STREAMS agent:stream:{taskId} <cursor>(blockingReadChunksAfter,独占连接)。游标为空时用 "0"。超时探活
BLOCK 3s 超时返 null:查
getTask,若 task 已被 GC(TTL 过期)发 event: error {message:"stream expired"} 收尾;否则写一行 SSE keepalive 注释 : keepalive 防各层 idle 关连接,继续 BLOCK。finally 里 blocking.disconnect(),run 不受影响。