From 3234a090963b1b634f119128ce5a9cb254f09360 Mon Sep 17 00:00:00 2001 From: Henry Winkel Date: Sat, 16 May 2026 14:28:34 +0200 Subject: [PATCH] Add job status endpoint, persist lastJobId; replace in-process queue with BullMQ-based queue and worker; link job status in UI --- app/components/MostActiveStocks.tsx | 16 +++++- app/lib/queue.ts | 77 +++++++++++++++++++++++++++++ app/routes/analyze.ticker.tsx | 64 +++++++++++++++++++++++- app/routes/api/jobs/$jobId.ts | 17 +++++++ app/routes/api/stocks/index.ts | 3 ++ prisma/schema.prisma | 1 + 6 files changed, 174 insertions(+), 4 deletions(-) create mode 100644 app/lib/queue.ts create mode 100644 app/routes/api/jobs/$jobId.ts diff --git a/app/components/MostActiveStocks.tsx b/app/components/MostActiveStocks.tsx index 3845344..7c07e1d 100644 --- a/app/components/MostActiveStocks.tsx +++ b/app/components/MostActiveStocks.tsx @@ -62,8 +62,20 @@ export default function MostActiveStocks() { const data = await res.json().catch(() => null); throw new Error(data?.error || "Failed to save stock"); } - // trigger analysis in background (non-blocking) - fetch(`/api/analyze`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ ticker: symbol, background: true }) }).catch(() => {}); + // trigger analysis in background (non-blocking) and persist jobId to stock record + try { + const analyzeRes = await fetch(`/api/analyze`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ ticker: symbol, background: true }) }); + const analyzeData = await analyzeRes.json().catch(() => null); + if (analyzeRes.ok && analyzeData?.jobId) { + const fd = new FormData(); + fd.append("ticker", symbol); + fd.append("lastJobId", analyzeData.jobId.toString()); + await fetch("/api/stocks", { method: "POST", body: fd }); + setSaved((p) => ({ ...p, [symbol]: true })); + } + } catch (err) { + console.warn("Failed to enqueue background analyze:", err); + } setSaved((p) => ({ ...p, [symbol]: true })); } catch (err) { console.error(err); diff --git a/app/lib/queue.ts b/app/lib/queue.ts new file mode 100644 index 0000000..4a2552c --- /dev/null +++ b/app/lib/queue.ts @@ -0,0 +1,77 @@ +import { Queue, Worker, QueueScheduler } from "bullmq"; +import IORedis from "ioredis"; +import { OpenRouterClient } from "./openrouter"; +import { TradingGraph } from "../agents/tradingGraph"; +import { db } from "./db.server"; + +const connection = process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : { host: "127.0.0.1", port: 6379 } as any; + +// BullMQ expects a Redis connection object; use ioredis instance for workers where needed +const redis = process.env.REDIS_URL ? new IORedis(process.env.REDIS_URL) : new IORedis(); + +export const analyzeQueue = new Queue("analyze", { connection: redis }); +export const queueScheduler = new QueueScheduler("analyze", { connection: redis }); + +// Worker to process analyze jobs +export const worker = new Worker( + "analyze", + async (job) => { + console.log("[queue] Processing analyze job", job.id, job.data.ticker); + const { ticker, input } = job.data as { ticker: string; input: any }; + const apiKey = process.env.OPENROUTER_API_KEY; + if (!apiKey || apiKey === "your_openrouter_api_key_here") { + console.log("[queue] mock mode for analyze", ticker); + const mockDecision = { + action: "hold", + confidence: 0.6, + reasoning: `${ticker} analysis - Mock mode (background)`, + }; + await db.stock.upsert({ + where: { ticker }, + create: { ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() }, + update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() }, + }); + return mockDecision; + } + + const client = new OpenRouterClient(apiKey); + const graph = new TradingGraph(client); + const decision = await graph.propagate(ticker, input); + + await db.stock.upsert({ + where: { ticker }, + create: { + ticker, + lastDecision: decision.action as string, + lastExplanation: (decision as any).reasoning || null, + lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null, + lastJobId: job.id?.toString(), + }, + update: { + lastDecision: decision.action as string, + lastExplanation: (decision as any).reasoning || null, + lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null, + lastJobId: job.id?.toString(), + }, + }); + + console.log("[queue] Job complete and saved for", ticker); + return decision; + }, + { connection: redis } +); + +// Simple enqueue function +export async function enqueueAnalyze(ticker: string, input: any) { + const job = await analyzeQueue.add("analyze", { ticker, input }); + return job.id?.toString(); +} + +export async function getJob(jobId: string) { + const job = await analyzeQueue.getJob(jobId); + if (!job) return null; + const state = await job.getState(); + const failedReason = job.failedReason || null; + const returnValue = job.returnvalue || null; + return { id: job.id, state, failedReason, returnValue }; +} diff --git a/app/routes/analyze.ticker.tsx b/app/routes/analyze.ticker.tsx index cf7b6b5..dce5b48 100644 --- a/app/routes/analyze.ticker.tsx +++ b/app/routes/analyze.ticker.tsx @@ -13,6 +13,7 @@ interface LoaderData { bars: any[]; timeframe: string; range: string; + stockRecord?: any; } const TIMEFRAMES = [ @@ -49,6 +50,17 @@ export async function loader({ params, request }: { params: { ticker: string }; let position = null; let orders = []; let bars = []; + let stockRecord: any = null; + let stockRecord: any = null; + try { + const stockRes = await fetch(`${baseUrl}/api/stocks`); + if (stockRes.ok) { + const list = await stockRes.json(); + stockRecord = list.find((s: any) => s.ticker === ticker) || null; + } + } catch (e) { + // ignore + } try { // Fetch positions @@ -65,6 +77,17 @@ export async function loader({ params, request }: { params: { ticker: string }; const barsRes = await fetch(`${baseUrl}/api/alpaca/quote/${ticker}?timeframe=${timeframe}&range=${range}`); const barsData = barsRes.ok ? await barsRes.json() : null; bars = barsData?.bars || []; + + // Fetch stock record (to get lastJobId) + try { + const stockRes = await fetch(`${baseUrl}/api/stocks`); + if (stockRes.ok) { + const list = await stockRes.json(); + stockRecord = list.find((s: any) => s.ticker === ticker) || null; + } + } catch (e) { + // ignore + } } catch (err) { console.error(`analyze/${ticker}: loader error`, err); } @@ -73,7 +96,7 @@ export async function loader({ params, request }: { params: { ticker: string }; } export default function StockDetail() { - const { ticker, position, orders, bars, timeframe, range } = useLoaderData() as LoaderData; + const { ticker, position, orders, bars, timeframe, range, stockRecord } = useLoaderData() as LoaderData; const navigate = useNavigate(); const location = useLocation(); @@ -83,6 +106,8 @@ export default function StockDetail() { const [decision, setDecision] = useState(null); const [showAnalysts, setShowAnalysts] = useState(false); const [showDebate, setShowDebate] = useState(false); + const [jobStatus, setJobStatus] = useState(null); + const [jobPolling, setJobPolling] = useState(false); // Cache key for this ticker const cacheKey = `tradinggraph-${ticker}`; @@ -100,7 +125,32 @@ export default function StockDetail() { console.error("Failed to parse cached trading graph data:", e); } } - }, [cacheKey]); + + // If stock record contains a job id, start polling job status + if (stockRecord?.lastJobId) { + setJobPolling(true); + let cancelled = false; + const poll = async () => { + try { + const res = await fetch(`/api/jobs/${stockRecord.lastJobId}`); + if (!res.ok) return; + const j = await res.json(); + if (cancelled) return; + setJobStatus(j); + if (j.state === "completed" || j.state === "failed") { + setJobPolling(false); + cancelled = true; + return; + } + } catch (e) { + console.warn("Failed to poll job status:", e); + } + setTimeout(poll, 1000); + }; + poll(); + return () => { cancelled = true; }; + } + }, [cacheKey, stockRecord]); const updateParams = (newTimeframe: string, newRange: string) => { const searchParams = new URLSearchParams(location.search); @@ -245,6 +295,16 @@ export default function StockDetail() { > {analysisLoading ? "Running Trading Graph..." : "Run Trading Graph Analysis"} + + {/* Job status link */} + {stockRecord?.lastJobId && ( +
+ Background job: {stockRecord.lastJobId} + {jobStatus && ( + Status: {jobStatus.state} + )} +
+ )}
diff --git a/app/routes/api/jobs/$jobId.ts b/app/routes/api/jobs/$jobId.ts new file mode 100644 index 0000000..861cc40 --- /dev/null +++ b/app/routes/api/jobs/$jobId.ts @@ -0,0 +1,17 @@ +import { Queue } from "bullmq"; + +export async function loader({ params }: { params: { jobId: string } }) { + const jobId = params.jobId; + if (!jobId) return Response.json({ error: "jobId required" }, { status: 400 }); + + try { + const q = new Queue("analyze", { connection: process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : undefined }); + const job = await q.getJob(jobId); + if (!job) return Response.json({ error: "Job not found" }, { status: 404 }); + const state = await job.getState(); + return Response.json({ id: job.id, state, failedReason: job.failedReason || null, returnValue: job.returnvalue || null }); + } catch (err) { + console.error("/api/jobs loader error:", err); + return Response.json({ error: "internal" }, { status: 500 }); + } +} diff --git a/app/routes/api/stocks/index.ts b/app/routes/api/stocks/index.ts index b31fac3..2e22706 100644 --- a/app/routes/api/stocks/index.ts +++ b/app/routes/api/stocks/index.ts @@ -27,6 +27,7 @@ export async function action({ request }: { request: Request }) { const lastDecision = formData.get("lastDecision")?.toString(); const lastExplanation = formData.get("lastExplanation")?.toString(); const lastExecutionPlan = formData.get("lastExecutionPlan")?.toString(); + const lastJobId = formData.get("lastJobId")?.toString(); // Upsert the stock record so ticker is ensured and optional fields are saved const stock = await db.stock.upsert({ @@ -35,12 +36,14 @@ export async function action({ request }: { request: Request }) { lastDecision: lastDecision ?? undefined, lastExplanation: lastExplanation ?? undefined, lastExecutionPlan: lastExecutionPlan ?? undefined, + lastJobId: lastJobId ?? undefined, }, create: { ticker, lastDecision: lastDecision ?? undefined, lastExplanation: lastExplanation ?? undefined, lastExecutionPlan: lastExecutionPlan ?? undefined, + lastJobId: lastJobId ?? undefined, }, }); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 47d80c6..82c4a60 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -15,6 +15,7 @@ model Stock { lastDecision String? lastExplanation String? lastExecutionPlan String? + lastJobId String? createdAt DateTime @default(now()) updatedAt DateTime @updatedAt }