Add job status endpoint, persist lastJobId; replace in-process queue with BullMQ-based queue and worker; link job status in UI

This commit is contained in:
2026-05-16 14:28:34 +02:00
parent d9f9150d68
commit 3234a09096
6 changed files with 174 additions and 4 deletions
+14 -2
View File
@@ -62,8 +62,20 @@ export default function MostActiveStocks() {
const data = await res.json().catch(() => null); const data = await res.json().catch(() => null);
throw new Error(data?.error || "Failed to save stock"); throw new Error(data?.error || "Failed to save stock");
} }
// trigger analysis in background (non-blocking) // trigger analysis in background (non-blocking) and persist jobId to stock record
fetch(`/api/analyze`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ ticker: symbol, background: true }) }).catch(() => {}); try {
const analyzeRes = await fetch(`/api/analyze`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ ticker: symbol, background: true }) });
const analyzeData = await analyzeRes.json().catch(() => null);
if (analyzeRes.ok && analyzeData?.jobId) {
const fd = new FormData();
fd.append("ticker", symbol);
fd.append("lastJobId", analyzeData.jobId.toString());
await fetch("/api/stocks", { method: "POST", body: fd });
setSaved((p) => ({ ...p, [symbol]: true }));
}
} catch (err) {
console.warn("Failed to enqueue background analyze:", err);
}
setSaved((p) => ({ ...p, [symbol]: true })); setSaved((p) => ({ ...p, [symbol]: true }));
} catch (err) { } catch (err) {
console.error(err); console.error(err);
+77
View File
@@ -0,0 +1,77 @@
import { Queue, Worker, QueueScheduler } from "bullmq";
import IORedis from "ioredis";
import { OpenRouterClient } from "./openrouter";
import { TradingGraph } from "../agents/tradingGraph";
import { db } from "./db.server";
const connection = process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : { host: "127.0.0.1", port: 6379 } as any;
// BullMQ expects a Redis connection object; use ioredis instance for workers where needed
const redis = process.env.REDIS_URL ? new IORedis(process.env.REDIS_URL) : new IORedis();
export const analyzeQueue = new Queue("analyze", { connection: redis });
export const queueScheduler = new QueueScheduler("analyze", { connection: redis });
// Worker to process analyze jobs
export const worker = new Worker(
"analyze",
async (job) => {
console.log("[queue] Processing analyze job", job.id, job.data.ticker);
const { ticker, input } = job.data as { ticker: string; input: any };
const apiKey = process.env.OPENROUTER_API_KEY;
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
console.log("[queue] mock mode for analyze", ticker);
const mockDecision = {
action: "hold",
confidence: 0.6,
reasoning: `${ticker} analysis - Mock mode (background)`,
};
await db.stock.upsert({
where: { ticker },
create: { ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
});
return mockDecision;
}
const client = new OpenRouterClient(apiKey);
const graph = new TradingGraph(client);
const decision = await graph.propagate(ticker, input);
await db.stock.upsert({
where: { ticker },
create: {
ticker,
lastDecision: decision.action as string,
lastExplanation: (decision as any).reasoning || null,
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
lastJobId: job.id?.toString(),
},
update: {
lastDecision: decision.action as string,
lastExplanation: (decision as any).reasoning || null,
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
lastJobId: job.id?.toString(),
},
});
console.log("[queue] Job complete and saved for", ticker);
return decision;
},
{ connection: redis }
);
// Simple enqueue function
export async function enqueueAnalyze(ticker: string, input: any) {
const job = await analyzeQueue.add("analyze", { ticker, input });
return job.id?.toString();
}
export async function getJob(jobId: string) {
const job = await analyzeQueue.getJob(jobId);
if (!job) return null;
const state = await job.getState();
const failedReason = job.failedReason || null;
const returnValue = job.returnvalue || null;
return { id: job.id, state, failedReason, returnValue };
}
+62 -2
View File
@@ -13,6 +13,7 @@ interface LoaderData {
bars: any[]; bars: any[];
timeframe: string; timeframe: string;
range: string; range: string;
stockRecord?: any;
} }
const TIMEFRAMES = [ const TIMEFRAMES = [
@@ -49,6 +50,17 @@ export async function loader({ params, request }: { params: { ticker: string };
let position = null; let position = null;
let orders = []; let orders = [];
let bars = []; let bars = [];
let stockRecord: any = null;
let stockRecord: any = null;
try {
const stockRes = await fetch(`${baseUrl}/api/stocks`);
if (stockRes.ok) {
const list = await stockRes.json();
stockRecord = list.find((s: any) => s.ticker === ticker) || null;
}
} catch (e) {
// ignore
}
try { try {
// Fetch positions // Fetch positions
@@ -65,6 +77,17 @@ export async function loader({ params, request }: { params: { ticker: string };
const barsRes = await fetch(`${baseUrl}/api/alpaca/quote/${ticker}?timeframe=${timeframe}&range=${range}`); const barsRes = await fetch(`${baseUrl}/api/alpaca/quote/${ticker}?timeframe=${timeframe}&range=${range}`);
const barsData = barsRes.ok ? await barsRes.json() : null; const barsData = barsRes.ok ? await barsRes.json() : null;
bars = barsData?.bars || []; bars = barsData?.bars || [];
// Fetch stock record (to get lastJobId)
try {
const stockRes = await fetch(`${baseUrl}/api/stocks`);
if (stockRes.ok) {
const list = await stockRes.json();
stockRecord = list.find((s: any) => s.ticker === ticker) || null;
}
} catch (e) {
// ignore
}
} catch (err) { } catch (err) {
console.error(`analyze/${ticker}: loader error`, err); console.error(`analyze/${ticker}: loader error`, err);
} }
@@ -73,7 +96,7 @@ export async function loader({ params, request }: { params: { ticker: string };
} }
export default function StockDetail() { export default function StockDetail() {
const { ticker, position, orders, bars, timeframe, range } = useLoaderData() as LoaderData; const { ticker, position, orders, bars, timeframe, range, stockRecord } = useLoaderData() as LoaderData;
const navigate = useNavigate(); const navigate = useNavigate();
const location = useLocation(); const location = useLocation();
@@ -83,6 +106,8 @@ export default function StockDetail() {
const [decision, setDecision] = useState<TradingDecision | null>(null); const [decision, setDecision] = useState<TradingDecision | null>(null);
const [showAnalysts, setShowAnalysts] = useState(false); const [showAnalysts, setShowAnalysts] = useState(false);
const [showDebate, setShowDebate] = useState(false); const [showDebate, setShowDebate] = useState(false);
const [jobStatus, setJobStatus] = useState<any>(null);
const [jobPolling, setJobPolling] = useState(false);
// Cache key for this ticker // Cache key for this ticker
const cacheKey = `tradinggraph-${ticker}`; const cacheKey = `tradinggraph-${ticker}`;
@@ -100,7 +125,32 @@ export default function StockDetail() {
console.error("Failed to parse cached trading graph data:", e); console.error("Failed to parse cached trading graph data:", e);
} }
} }
}, [cacheKey]);
// If stock record contains a job id, start polling job status
if (stockRecord?.lastJobId) {
setJobPolling(true);
let cancelled = false;
const poll = async () => {
try {
const res = await fetch(`/api/jobs/${stockRecord.lastJobId}`);
if (!res.ok) return;
const j = await res.json();
if (cancelled) return;
setJobStatus(j);
if (j.state === "completed" || j.state === "failed") {
setJobPolling(false);
cancelled = true;
return;
}
} catch (e) {
console.warn("Failed to poll job status:", e);
}
setTimeout(poll, 1000);
};
poll();
return () => { cancelled = true; };
}
}, [cacheKey, stockRecord]);
const updateParams = (newTimeframe: string, newRange: string) => { const updateParams = (newTimeframe: string, newRange: string) => {
const searchParams = new URLSearchParams(location.search); const searchParams = new URLSearchParams(location.search);
@@ -245,6 +295,16 @@ export default function StockDetail() {
> >
{analysisLoading ? "Running Trading Graph..." : "Run Trading Graph Analysis"} {analysisLoading ? "Running Trading Graph..." : "Run Trading Graph Analysis"}
</button> </button>
{/* Job status link */}
{stockRecord?.lastJobId && (
<div className="mt-3 text-sm text-gray-600">
Background job: <a href={`/api/jobs/${stockRecord.lastJobId}`} className="text-blue-600 hover:underline">{stockRecord.lastJobId}</a>
{jobStatus && (
<span className="ml-3">Status: <strong>{jobStatus.state}</strong></span>
)}
</div>
)}
</div> </div>
<div className="mt-6 bg-white rounded-xl shadow-lg p-6 border border-gray-200"> <div className="mt-6 bg-white rounded-xl shadow-lg p-6 border border-gray-200">
+17
View File
@@ -0,0 +1,17 @@
import { Queue } from "bullmq";
export async function loader({ params }: { params: { jobId: string } }) {
const jobId = params.jobId;
if (!jobId) return Response.json({ error: "jobId required" }, { status: 400 });
try {
const q = new Queue("analyze", { connection: process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : undefined });
const job = await q.getJob(jobId);
if (!job) return Response.json({ error: "Job not found" }, { status: 404 });
const state = await job.getState();
return Response.json({ id: job.id, state, failedReason: job.failedReason || null, returnValue: job.returnvalue || null });
} catch (err) {
console.error("/api/jobs loader error:", err);
return Response.json({ error: "internal" }, { status: 500 });
}
}
+3
View File
@@ -27,6 +27,7 @@ export async function action({ request }: { request: Request }) {
const lastDecision = formData.get("lastDecision")?.toString(); const lastDecision = formData.get("lastDecision")?.toString();
const lastExplanation = formData.get("lastExplanation")?.toString(); const lastExplanation = formData.get("lastExplanation")?.toString();
const lastExecutionPlan = formData.get("lastExecutionPlan")?.toString(); const lastExecutionPlan = formData.get("lastExecutionPlan")?.toString();
const lastJobId = formData.get("lastJobId")?.toString();
// Upsert the stock record so ticker is ensured and optional fields are saved // Upsert the stock record so ticker is ensured and optional fields are saved
const stock = await db.stock.upsert({ const stock = await db.stock.upsert({
@@ -35,12 +36,14 @@ export async function action({ request }: { request: Request }) {
lastDecision: lastDecision ?? undefined, lastDecision: lastDecision ?? undefined,
lastExplanation: lastExplanation ?? undefined, lastExplanation: lastExplanation ?? undefined,
lastExecutionPlan: lastExecutionPlan ?? undefined, lastExecutionPlan: lastExecutionPlan ?? undefined,
lastJobId: lastJobId ?? undefined,
}, },
create: { create: {
ticker, ticker,
lastDecision: lastDecision ?? undefined, lastDecision: lastDecision ?? undefined,
lastExplanation: lastExplanation ?? undefined, lastExplanation: lastExplanation ?? undefined,
lastExecutionPlan: lastExecutionPlan ?? undefined, lastExecutionPlan: lastExecutionPlan ?? undefined,
lastJobId: lastJobId ?? undefined,
}, },
}); });
+1
View File
@@ -15,6 +15,7 @@ model Stock {
lastDecision String? lastDecision String?
lastExplanation String? lastExplanation String?
lastExecutionPlan String? lastExecutionPlan String?
lastJobId String?
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
} }