import pkg from "bullmq";
import IORedis from "ioredis";
import { OpenRouterClient } from "./openrouter";
import { TradingGraph } from "../agents/tradingGraph";
import { db } from "./db.server";

// Queue and Worker are pulled off bullmq's default export and left untyped.
const { Queue, Worker } = pkg as any;

const REDIS_URL = process.env.REDIS_URL;

/** Placeholder value of OPENROUTER_API_KEY that is treated the same as "no key configured". */
const PLACEHOLDER_API_KEY = "your_openrouter_api_key_here";

/** Job states included when listing jobs from the Redis-backed queue. */
const LISTABLE_STATES = ["waiting", "active", "completed", "failed", "delayed"];

// These stay undefined when REDIS_URL is not set (in-process fallback).
let analyzeQueue: any = undefined;
let worker: any = undefined;

/** Enqueues an analysis for `ticker`; resolves (Redis) or returns (in-process) the new job id. */
let enqueueAnalyze: (ticker: string, input: any) => Promise<string> | string;
/** Resolves `{ id, state, failedReason, returnValue }` for a job, or `null` when the id is unknown. */
let getJob: (jobId: string) => Promise<any>;
/** Lists up to `limit` jobs (default 50), optionally keeping only those for `ticker`. */
let listRecentJobs: (ticker?: string, limit?: number) => Promise<any[]>;
/** Cancels a job that has not started yet; resolves `true` only if the job was actually cancelled. */
let cancelJob: (jobId: string) => Promise<boolean>;

/** Columns of the stock row written after an analysis. */
type DecisionFields = {
  lastDecision: string;
  lastExplanation: string | null;
  lastExecutionPlan?: string | null;
};

/**
 * Returns the configured OpenRouter API key, or `undefined` when it is missing,
 * empty, or still the placeholder value (i.e. mock mode).
 */
function readApiKey(): string | undefined {
  const apiKey = process.env.OPENROUTER_API_KEY;
  return apiKey && apiKey !== PLACEHOLDER_API_KEY ? apiKey : undefined;
}

/** Upserts the stock row for `ticker` with the given decision fields and job id. */
async function persistDecision(
  ticker: string,
  jobId: string | undefined,
  fields: DecisionFields
): Promise<void> {
  const data = { ...fields, lastJobId: jobId };
  await db.stock.upsert({
    where: { ticker },
    create: { ticker, ...data },
    update: data,
  });
}

/**
 * Persists and returns a fixed "hold" decision without calling OpenRouter.
 * `lastExecutionPlan` is deliberately not written, so an existing row keeps its value.
 */
async function runMockAnalysis(ticker: string, jobId: string | undefined, reasoning: string) {
  const mockDecision = { action: "hold", confidence: 0.6, reasoning };
  await persistDecision(ticker, jobId, {
    lastDecision: mockDecision.action,
    lastExplanation: mockDecision.reasoning,
  });
  return mockDecision;
}

/** Runs the trading graph for `ticker`, persists the resulting decision, and returns it. */
async function runLiveAnalysis(
  apiKey: string,
  ticker: string,
  input: any,
  jobId: string | undefined
) {
  const client = new OpenRouterClient(apiKey);
  const graph = new TradingGraph(client);
  const decision = await graph.propagate(ticker, input);
  await persistDecision(ticker, jobId, {
    lastDecision: decision.action as string,
    lastExplanation: (decision as any).reasoning || null,
    lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
  });
  return decision;
}

if (REDIS_URL) {
  // BullMQ's connection docs require `maxRetriesPerRequest: null` on connections
  // used by workers; without it BullMQ warns or refuses the connection.
  const redis = new IORedis(REDIS_URL, { maxRetriesPerRequest: null });
  analyzeQueue = new Queue("analyze", { connection: redis });

  // Worker to process analyze jobs
  worker = new Worker(
    "analyze",
    async (job: any) => {
      console.log("[queue] Processing analyze job", job.id, job.data.ticker);
      const { ticker, input } = job.data as { ticker: string; input: any };
      const jobId: string | undefined = job.id?.toString();

      const apiKey = readApiKey();
      if (!apiKey) {
        console.log("[queue] mock mode for analyze", ticker);
        return runMockAnalysis(ticker, jobId, `${ticker} analysis - Mock mode (background)`);
      }

      const decision = await runLiveAnalysis(apiKey, ticker, input, jobId);
      console.log("[queue] Job complete and saved for", ticker);
      return decision;
    },
    { connection: redis }
  );

  // Log worker-level errors (e.g. connection problems) instead of leaving the
  // "error" event without a listener.
  worker.on("error", (err: unknown) => console.error("[queue] worker error:", err));

  enqueueAnalyze = async (ticker: string, input: any) => {
    const job = await analyzeQueue.add("analyze", { ticker, input });
    return job.id?.toString();
  };

  getJob = async (jobId: string) => {
    const job = await analyzeQueue.getJob(jobId);
    if (!job) return null;
    const state = await job.getState();
    return {
      id: job.id,
      state,
      failedReason: job.failedReason || null,
      returnValue: job.returnvalue ?? null,
    };
  };

  listRecentJobs = async (ticker?: string, limit = 50) => {
    // A non-positive limit would become an end index of -1 or lower and fetch
    // everything; treat it as "no jobs requested".
    if (limit <= 0) return [];
    const jobs = await analyzeQueue.getJobs(LISTABLE_STATES, 0, limit - 1);
    const mapped = await Promise.all(
      jobs.map(async (j: any) => ({
        id: j.id,
        name: j.name,
        data: j.data,
        state: await j.getState(),
        returnValue: j.returnvalue ?? null,
      }))
    );
    // The ticker filter is applied after the limit, so fewer than `limit` items may be returned.
    if (ticker) return mapped.filter((j: any) => j.data?.ticker === ticker);
    return mapped;
  };

  cancelJob = async (jobId: string) => {
    try {
      const job = await analyzeQueue.getJob(jobId);
      if (!job) return false;
      const state = await job.getState();
      // Only jobs that have not started can be removed.
      if (state === "waiting" || state === "delayed") {
        await job.remove();
        return true;
      }
      return false;
    } catch (err) {
      console.error("cancelJob error:", err);
      return false;
    }
  };
} else {
  // In-process fallback queue for environments without Redis (dev/tests)
  type Job = {
    id: string;
    ticker: string;
    input: any;
    state: "queued" | "processing" | "completed" | "failed";
    result?: any;
    failedReason?: string;
  };

  // Jobs waiting to run, in FIFO order.
  const queue: Job[] = [];
  // Every job ever enqueued in this process, in insertion order (never pruned).
  const jobsById = new Map<string, Job>();
  // True while processQueue is draining the queue.
  let processing = false;

  /** Generates a `<timestamp>-<random>` id, retrying if it is already in use. */
  const makeId = (): string => {
    let id: string;
    do {
      id = `${Date.now()}-${Math.floor(Math.random() * 100000)}`;
    } while (jobsById.has(id));
    return id;
  };

  /** Drains the queue one job at a time, recording each job's outcome on the job object. */
  const processQueue = async (): Promise<void> => {
    processing = true;
    try {
      let job: Job | undefined;
      while ((job = queue.shift()) !== undefined) {
        job.state = "processing";
        try {
          const apiKey = readApiKey();
          const decision = apiKey
            ? await runLiveAnalysis(apiKey, job.ticker, job.input, job.id)
            : await runMockAnalysis(job.ticker, job.id, `${job.ticker} analysis - Mock (inproc)`);
          // Only publish the result once it has been persisted, so a failed
          // upsert never leaves a "failed" job carrying a result.
          job.result = decision;
          job.state = "completed";
        } catch (err: any) {
          console.error("[inproc queue] job failed:", err);
          job.state = "failed";
          job.failedReason = err?.message || String(err);
        }
      }
    } finally {
      // Always release the flag so later enqueues can restart processing.
      processing = false;
    }
  };

  enqueueAnalyze = (ticker: string, input: any) => {
    const id = makeId();
    const job: Job = { id, ticker, input, state: "queued" };
    queue.push(job);
    jobsById.set(id, job);
    if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e));
    return id;
  };

  listRecentJobs = async (ticker?: string, limit = 50) => {
    // slice(-0) would return every job, so handle non-positive limits explicitly.
    if (limit <= 0) return [];
    const items = Array.from(jobsById.values())
      .slice(-limit)
      .reverse()
      .map((j) => ({
        id: j.id,
        data: { ticker: j.ticker },
        state: j.state,
        returnValue: j.result ?? null,
      }));
    if (ticker) return items.filter((it) => it.data?.ticker === ticker);
    return items;
  };

  cancelJob = async (jobId: string) => {
    const job = jobsById.get(jobId);
    if (!job) return false;
    // If queued but not yet processing, remove from queue
    if (job.state === "queued") {
      const idx = queue.findIndex((q) => q.id === jobId);
      if (idx !== -1) queue.splice(idx, 1);
      job.state = "failed";
      job.failedReason = "cancelled";
      return true;
    }
    // Can't cancel if already processing/completed/failed
    return false;
  };

  getJob = async (jobId: string) => {
    const job = jobsById.get(jobId);
    if (!job) return null;
    return {
      id: job.id,
      state: job.state,
      failedReason: job.failedReason || null,
      returnValue: job.result ?? null,
    };
  };
}

export { enqueueAnalyze, getJob, listRecentJobs, cancelJob, analyzeQueue, worker };