Jobs API: expose getJob and listRecentJobs; use unified queue module for job status and history; UI can query /api/jobs?ticker=...\n\nCo-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
+16
-1
@@ -11,6 +11,7 @@ let analyzeQueue: any = undefined;
|
|||||||
let worker: any = undefined;
|
let worker: any = undefined;
|
||||||
let enqueueAnalyze: (ticker: string, input: any) => Promise<string> | string;
|
let enqueueAnalyze: (ticker: string, input: any) => Promise<string> | string;
|
||||||
let getJob: (jobId: string) => Promise<any | null>;
|
let getJob: (jobId: string) => Promise<any | null>;
|
||||||
|
let listRecentJobs: (ticker?: string, limit?: number) => Promise<any[]>;
|
||||||
|
|
||||||
if (REDIS_URL) {
|
if (REDIS_URL) {
|
||||||
const redis = new IORedis(REDIS_URL as string);
|
const redis = new IORedis(REDIS_URL as string);
|
||||||
@@ -78,6 +79,13 @@ if (REDIS_URL) {
|
|||||||
const returnValue = job.returnvalue || null;
|
const returnValue = job.returnvalue || null;
|
||||||
return { id: job.id, state, failedReason, returnValue };
|
return { id: job.id, state, failedReason, returnValue };
|
||||||
};
|
};
|
||||||
|
|
||||||
|
listRecentJobs = async (ticker?: string, limit = 50) => {
|
||||||
|
const jobs = await analyzeQueue.getJobs(["waiting", "active", "completed", "failed", "delayed"], 0, limit - 1);
|
||||||
|
const mapped = await Promise.all(jobs.map(async (j: any) => ({ id: j.id, name: j.name, data: j.data, state: await j.getState(), returnValue: j.returnvalue || null })));
|
||||||
|
if (ticker) return mapped.filter((j: any) => j.data?.ticker === ticker);
|
||||||
|
return mapped;
|
||||||
|
};
|
||||||
} else {
|
} else {
|
||||||
// In-process fallback queue for environments without Redis (dev/tests)
|
// In-process fallback queue for environments without Redis (dev/tests)
|
||||||
type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string };
|
type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string };
|
||||||
@@ -98,6 +106,13 @@ if (REDIS_URL) {
|
|||||||
return id;
|
return id;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
listRecentJobs = async (ticker?: string, limit = 50) => {
|
||||||
|
const items = Object.values(jobsById).slice(-limit).reverse().map((j) => ({ id: j.id, data: { ticker: j.ticker }, state: j.state, returnValue: j.result || null }));
|
||||||
|
if (ticker) return items.filter((it) => it.data?.ticker === ticker);
|
||||||
|
return items;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
async function processQueue() {
|
async function processQueue() {
|
||||||
processing = true;
|
processing = true;
|
||||||
while (queue.length > 0) {
|
while (queue.length > 0) {
|
||||||
@@ -153,5 +168,5 @@ if (REDIS_URL) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export { enqueueAnalyze, getJob, analyzeQueue, worker };
|
export { enqueueAnalyze, getJob, listRecentJobs, analyzeQueue, worker };
|
||||||
|
|
||||||
|
|||||||
@@ -1,15 +1,13 @@
|
|||||||
import { Queue } from "bullmq";
|
import { getJob } from "../../../lib/queue";
|
||||||
|
|
||||||
export async function loader({ params }: { params: { jobId: string } }) {
|
export async function loader({ params }: { params: { jobId: string } }) {
|
||||||
const jobId = params.jobId;
|
const jobId = params.jobId;
|
||||||
if (!jobId) return Response.json({ error: "jobId required" }, { status: 400 });
|
if (!jobId) return Response.json({ error: "jobId required" }, { status: 400 });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const q = new Queue("analyze", { connection: process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : undefined });
|
const job = await getJob(jobId);
|
||||||
const job = await q.getJob(jobId);
|
|
||||||
if (!job) return Response.json({ error: "Job not found" }, { status: 404 });
|
if (!job) return Response.json({ error: "Job not found" }, { status: 404 });
|
||||||
const state = await job.getState();
|
return Response.json(job);
|
||||||
return Response.json({ id: job.id, state, failedReason: job.failedReason || null, returnValue: job.returnvalue || null });
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("/api/jobs loader error:", err);
|
console.error("/api/jobs loader error:", err);
|
||||||
return Response.json({ error: "internal" }, { status: 500 });
|
return Response.json({ error: "internal" }, { status: 500 });
|
||||||
|
|||||||
@@ -0,0 +1,14 @@
|
|||||||
|
import { listRecentJobs } from "../../../lib/queue";
|
||||||
|
|
||||||
|
export async function loader({ request }: { request: Request }) {
|
||||||
|
const url = new URL(request.url);
|
||||||
|
const ticker = url.searchParams.get("ticker") || undefined;
|
||||||
|
const limit = parseInt(url.searchParams.get("limit") || "50", 10);
|
||||||
|
try {
|
||||||
|
const jobs = await listRecentJobs(ticker || undefined, limit);
|
||||||
|
return Response.json({ jobs });
|
||||||
|
} catch (err) {
|
||||||
|
console.error("/api/jobs index error:", err);
|
||||||
|
return Response.json({ error: "internal" }, { status: 500 });
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user