Skip to main content

概述

基于 pgmq 的消息队列消费者,用于处理异步邮件任务。

技术栈

组件技术
运行时Bun
队列pgmq (PostgreSQL)
部署Fly.io

架构

队列消费

// kira-queue/index.ts
import { createClient } from "@supabase/supabase-js";

const dailyMailQueue = async () => {
  const supabase = createClient(
    process.env.SUPABASE_URL!,
    process.env.SUPABASE_KEY!
  );

  while (true) {
    // 从 pgmq 队列中 pop 消息
    const { data } = await supabase
      .schema("pgmq_public")
      .rpc("pop", { queue_name: "daily_email" });

    if (data && data.length > 0) {
      for (const item of data) {
        const { userId } = item.message;

        // 调用后端 API 发送邮件
        await fetch(`${process.env.API_URL}/mail/daily`, {
          method: "POST",
          body: JSON.stringify({ userId }),
        });
      }
    }

    // 每 60 秒轮询
    await sleep(60 * 1000);
  }
};

环境变量

变量说明
SUPABASE_URLSupabase 项目 URL
SUPABASE_KEYSupabase service role key
API_URLKira BE API 地址

本地开发

cd kira-queue
bun install
bun run index.ts

部署

部署在 Fly.io:
fly deploy

消息格式

interface DailyEmailMessage {
  userId: string;
}

相关 API

邮件发送由 Kira BE 的 /mail/daily 接口处理,详见 Billing API - Daily