diff --git a/app/lib/queue.ts b/app/lib/queue.ts index 1a7746f..cc60775 100644 --- a/app/lib/queue.ts +++ b/app/lib/queue.ts @@ -7,13 +7,17 @@ import { db } from "./db.server"; const REDIS_URL = process.env.REDIS_URL; -// If REDIS_URL is provided, use BullMQ + Redis; otherwise fallback to an in-process queue for dev/tests +let analyzeQueue: any = undefined; +let worker: any = undefined; +let enqueueAnalyze: (ticker: string, input: any) => Promise | string; +let getJob: (jobId: string) => Promise; + if (REDIS_URL) { const redis = new IORedis(REDIS_URL as string); - export const analyzeQueue = new Queue("analyze", { connection: redis }); + analyzeQueue = new Queue("analyze", { connection: redis }); // Worker to process analyze jobs - export const worker = new Worker( + worker = new Worker( "analyze", async (job: any) => { console.log("[queue] Processing analyze job", job.id, job.data.ticker); @@ -61,19 +65,19 @@ if (REDIS_URL) { { connection: redis } ); - export async function enqueueAnalyze(ticker: string, input: any) { + enqueueAnalyze = async (ticker: string, input: any) => { const job = await analyzeQueue.add("analyze", { ticker, input }); return job.id?.toString(); - } + }; - export async function getJob(jobId: string) { + getJob = async (jobId: string) => { const job = await analyzeQueue.getJob(jobId); if (!job) return null; const state = await job.getState(); const failedReason = job.failedReason || null; const returnValue = job.returnvalue || null; return { id: job.id, state, failedReason, returnValue }; - } + }; } else { // In-process fallback queue for environments without Redis (dev/tests) type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string }; @@ -85,14 +89,14 @@ if (REDIS_URL) { return `${Date.now()}-${Math.floor(Math.random() * 100000)}`; } - export function enqueueAnalyze(ticker: string, input: any) { + enqueueAnalyze = (ticker: string, input: any) => { const id = makeId(); const job: Job = { id, ticker, input, state: "queued" }; queue.push(job); jobsById[id] = job; if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e)); return id; - } + }; async function processQueue() { processing = true; @@ -142,9 +146,12 @@ if (REDIS_URL) { processing = false; } - export async function getJob(jobId: string) { + getJob = async (jobId: string) => { const job = jobsById[jobId]; if (!job) return null; return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null }; - } + }; } + +export { enqueueAnalyze, getJob, analyzeQueue, worker }; +