diff --git a/app/lib/queue.ts b/app/lib/queue.ts index cc60775..4b560f5 100644 --- a/app/lib/queue.ts +++ b/app/lib/queue.ts @@ -11,6 +11,7 @@ let analyzeQueue: any = undefined; let worker: any = undefined; let enqueueAnalyze: (ticker: string, input: any) => Promise | string; let getJob: (jobId: string) => Promise; +let listRecentJobs: (ticker?: string, limit?: number) => Promise; if (REDIS_URL) { const redis = new IORedis(REDIS_URL as string); @@ -78,6 +79,13 @@ if (REDIS_URL) { const returnValue = job.returnvalue || null; return { id: job.id, state, failedReason, returnValue }; }; + + listRecentJobs = async (ticker?: string, limit = 50) => { + const jobs = await analyzeQueue.getJobs(["waiting", "active", "completed", "failed", "delayed"], 0, limit - 1); + const mapped = await Promise.all(jobs.map(async (j: any) => ({ id: j.id, name: j.name, data: j.data, state: await j.getState(), returnValue: j.returnvalue || null }))); + if (ticker) return mapped.filter((j: any) => j.data?.ticker === ticker); + return mapped; + }; } else { // In-process fallback queue for environments without Redis (dev/tests) type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string }; @@ -98,6 +106,13 @@ if (REDIS_URL) { return id; }; + listRecentJobs = async (ticker?: string, limit = 50) => { + const items = Object.values(jobsById).slice(-limit).reverse().map((j) => ({ id: j.id, data: { ticker: j.ticker }, state: j.state, returnValue: j.result || null })); + if (ticker) return items.filter((it) => it.data?.ticker === ticker); + return items; + }; + + async function processQueue() { processing = true; while (queue.length > 0) { @@ -153,5 +168,5 @@ if (REDIS_URL) { }; } -export { enqueueAnalyze, getJob, analyzeQueue, worker }; +export { enqueueAnalyze, getJob, listRecentJobs, analyzeQueue, worker }; diff --git a/app/routes/api/jobs/$jobId.ts b/app/routes/api/jobs/$jobId.ts index 861cc40..fb87c61 100644 --- a/app/routes/api/jobs/$jobId.ts +++ b/app/routes/api/jobs/$jobId.ts @@ -1,15 +1,13 @@ -import { Queue } from "bullmq"; +import { getJob } from "../../../lib/queue"; export async function loader({ params }: { params: { jobId: string } }) { const jobId = params.jobId; if (!jobId) return Response.json({ error: "jobId required" }, { status: 400 }); try { - const q = new Queue("analyze", { connection: process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : undefined }); - const job = await q.getJob(jobId); + const job = await getJob(jobId); if (!job) return Response.json({ error: "Job not found" }, { status: 404 }); - const state = await job.getState(); - return Response.json({ id: job.id, state, failedReason: job.failedReason || null, returnValue: job.returnvalue || null }); + return Response.json(job); } catch (err) { console.error("/api/jobs loader error:", err); return Response.json({ error: "internal" }, { status: 500 }); diff --git a/app/routes/api/jobs/index.ts b/app/routes/api/jobs/index.ts new file mode 100644 index 0000000..86794ec --- /dev/null +++ b/app/routes/api/jobs/index.ts @@ -0,0 +1,14 @@ +import { listRecentJobs } from "../../../lib/queue"; + +export async function loader({ request }: { request: Request }) { + const url = new URL(request.url); + const ticker = url.searchParams.get("ticker") || undefined; + const limit = parseInt(url.searchParams.get("limit") || "50", 10); + try { + const jobs = await listRecentJobs(ticker || undefined, limit); + return Response.json({ jobs }); + } catch (err) { + console.error("/api/jobs index error:", err); + return Response.json({ error: "internal" }, { status: 500 }); + } +}