diff --git a/app/components/JobHistory.tsx b/app/components/JobHistory.tsx index 342d5c0..3c32b12 100644 --- a/app/components/JobHistory.tsx +++ b/app/components/JobHistory.tsx @@ -9,6 +9,7 @@ export default function JobHistory({ ticker }: Props) { const [jobs, setJobs] = useState([]); const [loading, setLoading] = useState(false); const [selected, setSelected] = useState(null); + const navigate = useNavigate(); const fetchJobs = async () => { setLoading(true); @@ -28,6 +29,21 @@ export default function JobHistory({ ticker }: Props) { } }; + const cancel = async (jobId: string) => { + try { + const res = await fetch(`/api/jobs/${jobId}/cancel`, { method: "POST" }); + const data = await res.json(); + if (res.ok) { + // Refresh list + fetchJobs(); + return data.cancelled === true; + } + } catch (e) { + console.warn("Cancel failed:", e); + } + return false; + }; + useEffect(() => { fetchJobs(); const id = setInterval(fetchJobs, 8000); @@ -58,7 +74,13 @@ export default function JobHistory({ ticker }: Props) { {loading ? "Loading..." : `${jobs.length} jobs`}
- {jobs.length === 0 ? ( + {loading ? ( +
+
+
+
+
+ ) : jobs.length === 0 ? (

No recent jobs for {ticker}

) : ( jobs.map((j: any) => ( @@ -66,7 +88,7 @@ export default function JobHistory({ ticker }: Props) {
Job: {j.id}
-
State: {j.state}
+
State: {j.state}
API @@ -76,6 +98,14 @@ export default function JobHistory({ ticker }: Props) { > Details + {(j.state === 'waiting' || j.state === 'queued') && ( + + )}
diff --git a/app/lib/queue.ts b/app/lib/queue.ts index 4b560f5..862e952 100644 --- a/app/lib/queue.ts +++ b/app/lib/queue.ts @@ -12,6 +12,7 @@ let worker: any = undefined; let enqueueAnalyze: (ticker: string, input: any) => Promise | string; let getJob: (jobId: string) => Promise; let listRecentJobs: (ticker?: string, limit?: number) => Promise; +let cancelJob: (jobId: string) => Promise; if (REDIS_URL) { const redis = new IORedis(REDIS_URL as string); @@ -86,6 +87,22 @@ if (REDIS_URL) { if (ticker) return mapped.filter((j: any) => j.data?.ticker === ticker); return mapped; }; + + cancelJob = async (jobId: string) => { + try { + const job = await analyzeQueue.getJob(jobId); + if (!job) return false; + const state = await job.getState(); + if (state === "waiting" || state === "delayed") { + await job.remove(); + return true; + } + return false; + } catch (err) { + console.error("cancelJob error:", err); + return false; + } + }; } else { // In-process fallback queue for environments without Redis (dev/tests) type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string }; @@ -112,6 +129,20 @@ if (REDIS_URL) { return items; }; + cancelJob = async (jobId: string) => { + const job = jobsById[jobId]; + if (!job) return false; + // If queued but not yet processing, remove from queue + if (job.state === "queued") { + const idx = queue.findIndex((q) => q.id === jobId); + if (idx !== -1) queue.splice(idx, 1); + job.state = "failed"; + job.failedReason = "cancelled"; + return true; + } + // Can't cancel if already processing/completed/failed + return false; + }; async function processQueue() { processing = true; @@ -168,5 +199,5 @@ if (REDIS_URL) { }; } -export { enqueueAnalyze, getJob, listRecentJobs, analyzeQueue, worker }; +export { enqueueAnalyze, getJob, listRecentJobs, cancelJob, analyzeQueue, worker }; diff --git a/app/routes/api/jobs/$jobId/cancel.ts b/app/routes/api/jobs/$jobId/cancel.ts new file mode 100644 index 0000000..d16c877 --- /dev/null +++ b/app/routes/api/jobs/$jobId/cancel.ts @@ -0,0 +1,14 @@ +import { cancelJob } from "../../../lib/queue"; + +export async function action({ params }: { params: { jobId: string } }) { + const jobId = params.jobId; + if (!jobId) return Response.json({ error: "jobId required" }, { status: 400 }); + + try { + const ok = await cancelJob(jobId); + return Response.json({ cancelled: ok }); + } catch (err) { + console.error("/api/jobs cancel error:", err); + return Response.json({ error: "internal" }, { status: 500 }); + } +} diff --git a/app/routes/jobs/$jobId.tsx b/app/routes/jobs/$jobId.tsx index 6d7f7d7..0a65f94 100644 --- a/app/routes/jobs/$jobId.tsx +++ b/app/routes/jobs/$jobId.tsx @@ -58,11 +58,37 @@ export default function JobDetail() { {job?.failedReason && (
Failed: {job.failedReason}
)} + +
+ {job?.state === 'waiting' || job?.state === 'queued' ? ( + + ) : null} + + Open API +
+
Data:
{JSON.stringify(job?.data || job?.returnValue || job, null, 2)}
- Open API +