diff --git a/app/lib/queue.ts b/app/lib/queue.ts index eb722d7..e3ea1c4 100644 --- a/app/lib/queue.ts +++ b/app/lib/queue.ts @@ -22,11 +22,9 @@ if (REDIS_URL) { worker = new Worker( "analyze", async (job: any) => { - console.log("[queue] Processing analyze job", job.id, job.data.ticker); const { ticker, input } = job.data as { ticker: string; input: any }; const apiKey = process.env.OPENROUTER_API_KEY; if (!apiKey || apiKey === "your_openrouter_api_key_here") { - console.log("[queue] mock mode for analyze", ticker); const mockDecision = { action: "hold", confidence: 0.6, @@ -41,7 +39,6 @@ if (REDIS_URL) { } const { graph, config } = await buildTradingGraph(apiKey); - console.log("[queue] Trading config:", config); // Fetch latest Alpaca account and prices; abort job if unavailable so work runs on fresh data try { const account = await fetchAccount(); @@ -89,7 +86,6 @@ if (REDIS_URL) { }, }); - console.log("[queue] Job complete and saved for", ticker); return decision; }, { connection: redis } @@ -106,12 +102,12 @@ if (REDIS_URL) { const state = await job.getState(); const failedReason = job.failedReason || null; const returnValue = job.returnvalue || null; - return { id: job.id, state, failedReason, returnValue }; + return { id: job.id, state, failedReason, returnValue, timestamp: job.timestamp ?? job.data?.timestamp ?? null }; }; listRecentJobs = async (ticker?: string, limit = 50) => { const jobs = await analyzeQueue.getJobs(["waiting", "active", "completed", "failed", "delayed"], 0, limit - 1); - const mapped = await Promise.all(jobs.map(async (j: any) => ({ id: j.id, name: j.name, data: j.data, state: await j.getState(), returnValue: j.returnvalue || null }))); + const mapped = await Promise.all(jobs.map(async (j: any) => ({ id: j.id, name: j.name, data: j.data, state: await j.getState(), returnValue: j.returnvalue || null, timestamp: j.timestamp ?? j.data?.timestamp ?? null }))); if (ticker) return mapped.filter((j: any) => j.data?.ticker === ticker); return mapped; }; @@ -133,7 +129,7 @@ if (REDIS_URL) { }; } else { // In-process fallback queue for environments without Redis (dev/tests) - type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string }; + type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string; timestamp: number }; const queue: Job[] = []; const jobsById: Record = {}; let processing = false; @@ -144,7 +140,7 @@ if (REDIS_URL) { enqueueAnalyze = (ticker: string, input: any) => { const id = makeId(); - const job: Job = { id, ticker, input, state: "queued" }; + const job: Job = { id, ticker, input, state: "queued", timestamp: input?.timestamp ?? Date.now() }; queue.push(job); jobsById[id] = job; if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e)); @@ -152,7 +148,7 @@ if (REDIS_URL) { }; listRecentJobs = async (ticker?: string, limit = 50) => { - const items = Object.values(jobsById).slice(-limit).reverse().map((j) => ({ id: j.id, data: { ticker: j.ticker }, state: j.state, returnValue: j.result || null })); + const items = Object.values(jobsById).slice(-limit).reverse().map((j) => ({ id: j.id, data: { ticker: j.ticker, timestamp: j.timestamp }, state: j.state, returnValue: j.result || null, timestamp: j.timestamp })); if (ticker) return items.filter((it) => it.data?.ticker === ticker); return items; }; @@ -252,7 +248,7 @@ if (REDIS_URL) { getJob = async (jobId: string) => { const job = jobsById[jobId]; if (!job) return null; - return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null }; + return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null, timestamp: job.timestamp }; }; } diff --git a/app/routes/analyze.tsx b/app/routes/analyze.tsx index 3b1dbe2..979e3fb 100644 --- a/app/routes/analyze.tsx +++ b/app/routes/analyze.tsx @@ -386,13 +386,6 @@ export default function Analyze() { const quote = quoteRes.ok ? await quoteRes.json() : null; const indicatorsData = indicatorsRes.ok ? await indicatorsRes.json() : null; - const analysisRes = await fetch("/api/analyze", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ ticker }), - }); - const analysis = analysisRes.ok ? await analysisRes.json() : null; - const indicators: Indicators = { rsi: indicatorsData?.indicators?.rsi ?? null, sma20: indicatorsData?.indicators?.sma ?? null, @@ -407,6 +400,48 @@ export default function Analyze() { avgVolume: indicatorsData?.indicators?.avgVolume ?? null, }; + const analysisRes = await fetch("/api/analyze", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ ticker }), + }); + + if (analysisRes.status === 202) { + // Background job queued - poll for completion + const data = await analysisRes.json(); + const jobId = data.jobId; + let cancelled = false; + let timer: ReturnType | null = null; + + const poll = async () => { + try { + const jr = await fetch(`/api/jobs/${jobId}`); + if (!jr.ok) { if (!cancelled) timer = setTimeout(poll, 2000); return; } + const j = await jr.json(); + if (cancelled) return; + + if (j.state === "completed" && j.returnValue) { + setStocks((s) => s.map((st) => + st.id === id ? { ...st, loading: false, currentPrice: quote?.price ?? null, indicators, analysis: j.returnValue } : st + )); + cancelled = true; + return; + } + if (j.state === "failed") { + setStocks((s) => s.map((st) => st.id === id ? { ...st, loading: false } : st)); + cancelled = true; + return; + } + } catch (e) { /* ignore */ } + if (!cancelled) timer = setTimeout(poll, 2000); + }; + poll(); + + return () => { cancelled = true; if (timer) clearTimeout(timer); }; + } + + // Fallback: synchronous response + const analysis = analysisRes.ok ? await analysisRes.json() : null; setStocks((s) => s.map((st) => st.id === id ? { ...st, loading: false, currentPrice: quote?.price ?? null, indicators, analysis } diff --git a/app/routes/api/analyze.ts b/app/routes/api/analyze.ts index cdf8d2d..1208ff4 100644 --- a/app/routes/api/analyze.ts +++ b/app/routes/api/analyze.ts @@ -2,6 +2,8 @@ // Server-only imports are loaded dynamically inside the action to avoid client bundling issues +const JOB_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes + export async function action({ request }: { request: Request }) { const body = await request.json(); @@ -12,66 +14,58 @@ export async function action({ request }: { request: Request }) { return Response.json({ error: "ticker is required" }, { status: 400 }); } - // Load server-only modules dynamically to prevent them from being included in client bundles - const { buildTradingGraph } = await import("../../lib/tradingConfig.server"); const { db } = await import("../../lib/db.server"); const { fetchAccount, fetchRecentCloses, fetchBars } = await import("../../lib/alpacaClient"); + const { getJob, listRecentJobs, cancelJob } = await import("../../lib/queue"); - const apiKey = process.env.OPENROUTER_API_KEY; - - if (!apiKey || apiKey === "your_openrouter_api_key_here") { - const mockDecision = { - action: "hold" as const, - confidence: 0.75, - reasoning: `${ticker} analysis - Mock mode: positive momentum detected with neutral technical signals`, - agentSignals: [ - { - agent: "fundamentals" as const, - signal: "bullish" as const, - confidence: 0.7, - reasoning: "Strong fundamentals with positive earnings outlook", - timestamp: new Date().toISOString(), - }, - { - agent: "technical" as const, - signal: "neutral" as const, - confidence: 0.6, - reasoning: "Mixed technical indicators", - timestamp: new Date().toISOString(), - }, - ], - debateRounds: [ - { - bullishView: "Bullish case supported by fundamentals and momentum", - bearishView: "Bearish case from mixed technical signals", - researcher: "bullish" as const, - }, - ], - }; - return Response.json(mockDecision); - } + // Clean up old unfinished jobs for this ticker (older than timeout) + try { + const recentJobs = await listRecentJobs(ticker, 50); + for (const j of recentJobs) { + if (j.state === "waiting" || j.state === "active" || j.state === "delayed") { + // Check if the job is too old + const jobCreatedAt = j.data?.timestamp; + if (jobCreatedAt && Date.now() - jobCreatedAt > JOB_TIMEOUT_MS) { + await cancelJob(j.id); + } + } + } + } catch (e) { /* ignore cleanup errors */ } - const { graph, config } = await buildTradingGraph(apiKey); + // Check if there's a recent unfinished job that can be reused + try { + const recentJobs = await listRecentJobs(ticker, 10); + const activeJob = recentJobs.find((j: any) => j.state === "waiting" || j.state === "active"); + if (activeJob) { + // Return existing job ID instead of creating a new one + const jobId = activeJob.id; + // Update the stock record with this job ID + await db.stock.upsert({ + where: { ticker }, + update: { lastJobId: jobId }, + create: { ticker, lastJobId: jobId }, + }); + return Response.json({ status: "queued", jobId }, { status: 202 }); + } + } catch (e) { /* ignore */ } - // Fetch latest Alpaca account and recent prices; abort if unavailable + // Fetch latest Alpaca account and recent prices let account: any = undefined; let prices: number[] = []; let recentBars: any[] = []; try { account = await fetchAccount(); prices = await fetchRecentCloses(ticker); - // Also fetch recent intraday bars to enable deterministic execution plan calculation try { recentBars = await fetchBars(ticker, '1Min', { limit: 200 }); - // derive prices from bars if available (prefer freshest closes) if (recentBars && recentBars.length) { prices = recentBars.map((b: any) => (typeof b.ClosePrice === 'number' ? b.ClosePrice : (typeof b.c === 'number' ? b.c : 0))).filter((p: number) => p > 0); } } catch (barErr) { - console.warn('[analyze] Failed to fetch recent bars for deterministic execution plan:', barErr); + console.warn('[analyze] Failed to fetch recent bars:', barErr); } } catch (e) { - console.error("[analyze] Failed to fetch Alpaca data before analysis:", e); + console.error("[analyze] Failed to fetch Alpaca data:", e); return Response.json({ error: "Failed to fetch Alpaca data: " + String(e) }, { status: 502 }); } @@ -90,42 +84,24 @@ export async function action({ request }: { request: Request }) { source: "news" as const, }, account, + timestamp: Date.now(), }; + // Always enqueue as background job try { - if (body.background) { - // Enqueue background analyze job and return 202 immediately - try { - const { enqueueAnalyze } = await import("../../lib/queue"); - const jobId = await enqueueAnalyze(ticker, input); - return Response.json({ status: "queued", jobId }, { status: 202 }); - } catch (enqueueErr) { - console.error("[analyze] enqueue error:", enqueueErr); - return Response.json({ error: "failed to enqueue" }, { status: 500 }); - } - } + const { enqueueAnalyze } = await import("../../lib/queue"); + const jobId = await enqueueAnalyze(ticker, input); - let decision = await graph.propagate(ticker, input); - // Enrich executionPlan deterministically on server-side - try { - const { enrichExecutionPlan, verifyExecutionPlanWithLLM } = await import("../../lib/execution"); - decision = enrichExecutionPlan(decision, input); - // Optionally ask LLM to verify/adjust the computed plan if API key is present - if (process.env.OPENROUTER_API_KEY) { - try { - decision = await verifyExecutionPlanWithLLM(decision, input, config.model); - } catch (e) { - console.warn("LLM verification failed:", e); - } - } - } catch (e) { - console.warn("Failed to enrich execution plan:", e); - } + // Save jobId to DB stock record + await db.stock.upsert({ + where: { ticker }, + update: { lastJobId: jobId }, + create: { ticker, lastJobId: jobId }, + }); - return Response.json(decision); - } catch (error) { - const message = error instanceof Error ? error.message : "Unknown error"; - console.error("[analyze] Error:", error); - return Response.json({ error: message }, { status: 500 }); + return Response.json({ status: "queued", jobId }, { status: 202 }); + } catch (enqueueErr) { + console.error("[analyze] enqueue error:", enqueueErr); + return Response.json({ error: "failed to enqueue" }, { status: 500 }); } }