Queue: unify exports; support BullMQ when REDIS_URL set, otherwise in-process fallback

This commit is contained in:
2026-05-16 14:43:52 +02:00
parent 9167bd8912
commit 9771f48028
+18 -11
View File
@@ -7,13 +7,17 @@ import { db } from "./db.server";
const REDIS_URL = process.env.REDIS_URL; const REDIS_URL = process.env.REDIS_URL;
// If REDIS_URL is provided, use BullMQ + Redis; otherwise fallback to an in-process queue for dev/tests let analyzeQueue: any = undefined;
let worker: any = undefined;
let enqueueAnalyze: (ticker: string, input: any) => Promise<string> | string;
let getJob: (jobId: string) => Promise<any | null>;
if (REDIS_URL) { if (REDIS_URL) {
const redis = new IORedis(REDIS_URL as string); const redis = new IORedis(REDIS_URL as string);
export const analyzeQueue = new Queue("analyze", { connection: redis }); analyzeQueue = new Queue("analyze", { connection: redis });
// Worker to process analyze jobs // Worker to process analyze jobs
export const worker = new Worker( worker = new Worker(
"analyze", "analyze",
async (job: any) => { async (job: any) => {
console.log("[queue] Processing analyze job", job.id, job.data.ticker); console.log("[queue] Processing analyze job", job.id, job.data.ticker);
@@ -61,19 +65,19 @@ if (REDIS_URL) {
{ connection: redis } { connection: redis }
); );
export async function enqueueAnalyze(ticker: string, input: any) { enqueueAnalyze = async (ticker: string, input: any) => {
const job = await analyzeQueue.add("analyze", { ticker, input }); const job = await analyzeQueue.add("analyze", { ticker, input });
return job.id?.toString(); return job.id?.toString();
} };
export async function getJob(jobId: string) { getJob = async (jobId: string) => {
const job = await analyzeQueue.getJob(jobId); const job = await analyzeQueue.getJob(jobId);
if (!job) return null; if (!job) return null;
const state = await job.getState(); const state = await job.getState();
const failedReason = job.failedReason || null; const failedReason = job.failedReason || null;
const returnValue = job.returnvalue || null; const returnValue = job.returnvalue || null;
return { id: job.id, state, failedReason, returnValue }; return { id: job.id, state, failedReason, returnValue };
} };
} else { } else {
// In-process fallback queue for environments without Redis (dev/tests) // In-process fallback queue for environments without Redis (dev/tests)
type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string }; type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string };
@@ -85,14 +89,14 @@ if (REDIS_URL) {
return `${Date.now()}-${Math.floor(Math.random() * 100000)}`; return `${Date.now()}-${Math.floor(Math.random() * 100000)}`;
} }
export function enqueueAnalyze(ticker: string, input: any) { enqueueAnalyze = (ticker: string, input: any) => {
const id = makeId(); const id = makeId();
const job: Job = { id, ticker, input, state: "queued" }; const job: Job = { id, ticker, input, state: "queued" };
queue.push(job); queue.push(job);
jobsById[id] = job; jobsById[id] = job;
if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e)); if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e));
return id; return id;
} };
async function processQueue() { async function processQueue() {
processing = true; processing = true;
@@ -142,9 +146,12 @@ if (REDIS_URL) {
processing = false; processing = false;
} }
export async function getJob(jobId: string) { getJob = async (jobId: string) => {
const job = jobsById[jobId]; const job = jobsById[jobId];
if (!job) return null; if (!job) return null;
return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null }; return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null };
} };
} }
export { enqueueAnalyze, getJob, analyzeQueue, worker };