import { Queue, Worker, QueueScheduler } from "bullmq";
import IORedis from "ioredis";
import { OpenRouterClient } from "./openrouter";
import { TradingGraph } from "../agents/tradingGraph";
import { db } from "./db.server";

/** Name shared by the queue, its scheduler, the worker and the jobs added to it. */
const ANALYZE_QUEUE_NAME = "analyze";

/** Value of OPENROUTER_API_KEY that marks an unconfigured environment. */
const PLACEHOLDER_API_KEY = "your_openrouter_api_key_here";

/** Payload carried by every job on the analyze queue. */
type AnalyzeJobData = {
  ticker: string;
  // Forwarded untouched to TradingGraph.propagate, whose parameter type is not
  // visible from this module, so it stays loosely typed here.
  input: any;
};

/** Columns of the `stock` row that are refreshed after a job finishes. */
type StockDecisionFields = {
  lastDecision: string;
  lastExplanation: string | null;
  /** Omitted in mock mode so an existing stored plan is left untouched. */
  lastExecutionPlan?: string | null;
  lastJobId: string | undefined;
};

const redisUrl = process.env.REDIS_URL;

// Connection used by the Queue (enqueue / lookup). It keeps the ioredis
// defaults so commands fail after the default number of retries instead of
// waiting indefinitely when Redis is unreachable.
const redis = redisUrl ? new IORedis(redisUrl) : new IORedis();

// Connection used by the Worker and QueueScheduler. BullMQ requires
// `maxRetriesPerRequest: null` on connections used by workers.
const workerRedisOptions = { maxRetriesPerRequest: null };
const workerRedis = redisUrl
  ? new IORedis(redisUrl, workerRedisOptions)
  : new IORedis(workerRedisOptions);

/** Queue that holds pending stock-analysis jobs. */
export const analyzeQueue = new Queue(ANALYZE_QUEUE_NAME, { connection: redis });

/**
 * Scheduler attached to the analyze queue.
 *
 * NOTE(review): BullMQ 2.0+ reportedly no longer needs a QueueScheduler;
 * confirm against the installed version before removing this export.
 */
export const queueScheduler = new QueueScheduler(ANALYZE_QUEUE_NAME, {
  connection: workerRedis,
});

/**
 * Writes the outcome of an analysis to the `stock` table, creating the row
 * for `ticker` when it does not exist yet and updating it otherwise.
 */
async function saveDecision(ticker: string, fields: StockDecisionFields) {
  await db.stock.upsert({
    where: { ticker },
    create: { ticker, ...fields },
    update: fields,
  });
}

/**
 * Worker that processes analyze jobs.
 *
 * For each job it either produces a fixed "hold" decision (when
 * OPENROUTER_API_KEY is unset or still the placeholder value) or runs the
 * TradingGraph, then stores the result on the matching `stock` row and
 * returns the decision as the job's return value.
 */
export const worker = new Worker(
  ANALYZE_QUEUE_NAME,
  async (job) => {
    console.log("[queue] Processing analyze job", job.id, job.data.ticker);

    const { ticker, input } = job.data as AnalyzeJobData;
    // Fail the job with a clear reason instead of letting the database call
    // reject on an undefined or blank key.
    if (typeof ticker !== "string" || ticker.trim() === "") {
      throw new Error("analyze job is missing a ticker");
    }

    const jobId = job.id?.toString();
    const apiKey = process.env.OPENROUTER_API_KEY;

    if (!apiKey || apiKey === PLACEHOLDER_API_KEY) {
      console.log("[queue] mock mode for analyze", ticker);
      const mockDecision = {
        action: "hold",
        confidence: 0.6,
        reasoning: `${ticker} analysis - Mock mode (background)`,
      };
      await saveDecision(ticker, {
        lastDecision: mockDecision.action,
        lastExplanation: mockDecision.reasoning,
        lastJobId: jobId,
      });
      return mockDecision;
    }

    const client = new OpenRouterClient(apiKey);
    const graph = new TradingGraph(client);
    const decision = await graph.propagate(ticker, input);

    await saveDecision(ticker, {
      lastDecision: decision.action as string,
      lastExplanation: (decision as any).reasoning || null,
      lastExecutionPlan: decision.executionPlan
        ? JSON.stringify(decision.executionPlan)
        : null,
      lastJobId: jobId,
    });

    console.log("[queue] Job complete and saved for", ticker);
    return decision;
  },
  { connection: workerRedis }
);

// Without an "error" listener an emitted error is unhandled and can stop the
// worker; "failed" makes job failures visible in the logs.
worker.on("error", (err) => {
  console.error("[queue] analyze worker error", err);
});
worker.on("failed", (job, err) => {
  console.error("[queue] analyze job failed", job?.id, err);
});

/**
 * Adds an analysis job for `ticker` to the queue.
 *
 * @param ticker - Symbol to analyze; also the key of the `stock` row updated by the worker.
 * @param input - Payload handed to the trading graph unchanged.
 * @returns The id of the created job as a string, or `undefined` if the job has no id.
 */
export async function enqueueAnalyze(
  ticker: string,
  input: any
): Promise<string | undefined> {
  const job = await analyzeQueue.add(ANALYZE_QUEUE_NAME, { ticker, input });
  return job.id?.toString();
}

/**
 * Looks up a job on the analyze queue and reports its current status.
 *
 * @param jobId - Id previously returned by `enqueueAnalyze`.
 * @returns `null` when the job is not found; otherwise its id, state,
 *   failure reason (or `null`) and return value (or `null` when none is set).
 */
export async function getJob(jobId: string) {
  const job = await analyzeQueue.getJob(jobId);
  if (!job) return null;

  const state = await job.getState();
  const failedReason = job.failedReason || null;
  // `??` rather than `||` so falsy-but-valid return values are not dropped.
  const returnValue = job.returnvalue ?? null;
  return { id: job.id, state, failedReason, returnValue };
}