From 9167bd8912b3c4d6ca24f0b53219b32b8f87820a Mon Sep 17 00:00:00 2001 From: Henry Winkel Date: Sat, 16 May 2026 14:42:33 +0200 Subject: [PATCH] Queue: support REDIS_URL BullMQ mode; fallback to in-process queue for dev/tests --- app/lib/queue.ts | 195 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 134 insertions(+), 61 deletions(-) diff --git a/app/lib/queue.ts b/app/lib/queue.ts index 7500f62..1a7746f 100644 --- a/app/lib/queue.ts +++ b/app/lib/queue.ts @@ -5,73 +5,146 @@ import { OpenRouterClient } from "./openrouter"; import { TradingGraph } from "../agents/tradingGraph"; import { db } from "./db.server"; -const connection = process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : { host: "127.0.0.1", port: 6379 } as any; +const REDIS_URL = process.env.REDIS_URL; -// BullMQ expects a Redis connection object; use ioredis instance for workers where needed -const redis = process.env.REDIS_URL ? new IORedis(process.env.REDIS_URL) : new IORedis(); +// If REDIS_URL is provided, use BullMQ + Redis; otherwise fallback to an in-process queue for dev/tests +if (REDIS_URL) { + const redis = new IORedis(REDIS_URL as string); + export const analyzeQueue = new Queue("analyze", { connection: redis }); -export const analyzeQueue = new Queue("analyze", { connection: redis }); + // Worker to process analyze jobs + export const worker = new Worker( + "analyze", + async (job: any) => { + console.log("[queue] Processing analyze job", job.id, job.data.ticker); + const { ticker, input } = job.data as { ticker: string; input: any }; + const apiKey = process.env.OPENROUTER_API_KEY; + if (!apiKey || apiKey === "your_openrouter_api_key_here") { + console.log("[queue] mock mode for analyze", ticker); + const mockDecision = { + action: "hold", + confidence: 0.6, + reasoning: `${ticker} analysis - Mock mode (background)`, + }; + await db.stock.upsert({ + where: { ticker }, + create: { ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() }, + update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() }, + }); + return mockDecision; + } + + const client = new OpenRouterClient(apiKey); + const graph = new TradingGraph(client); + const decision = await graph.propagate(ticker, input); -// Worker to process analyze jobs -export const worker = new Worker( - "analyze", - async (job: any) => { - console.log("[queue] Processing analyze job", job.id, job.data.ticker); - const { ticker, input } = job.data as { ticker: string; input: any }; - const apiKey = process.env.OPENROUTER_API_KEY; - if (!apiKey || apiKey === "your_openrouter_api_key_here") { - console.log("[queue] mock mode for analyze", ticker); - const mockDecision = { - action: "hold", - confidence: 0.6, - reasoning: `${ticker} analysis - Mock mode (background)`, - }; await db.stock.upsert({ where: { ticker }, - create: { ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() }, - update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() }, + create: { + ticker, + lastDecision: decision.action as string, + lastExplanation: (decision as any).reasoning || null, + lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null, + lastJobId: job.id?.toString(), + }, + update: { + lastDecision: decision.action as string, + lastExplanation: (decision as any).reasoning || null, + lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null, + lastJobId: job.id?.toString(), + }, }); - return mockDecision; + + console.log("[queue] Job complete and saved for", ticker); + return decision; + }, + { connection: redis } + ); + + export async function enqueueAnalyze(ticker: string, input: any) { + const job = await analyzeQueue.add("analyze", { ticker, input }); + return job.id?.toString(); + } + + export async function getJob(jobId: string) { + const job = await analyzeQueue.getJob(jobId); + if (!job) return null; + const state = await job.getState(); + const failedReason = job.failedReason || null; + const returnValue = job.returnvalue || null; + return { id: job.id, state, failedReason, returnValue }; + } +} else { + // In-process fallback queue for environments without Redis (dev/tests) + type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string }; + const queue: Job[] = []; + const jobsById: Record = {}; + let processing = false; + + function makeId() { + return `${Date.now()}-${Math.floor(Math.random() * 100000)}`; + } + + export function enqueueAnalyze(ticker: string, input: any) { + const id = makeId(); + const job: Job = { id, ticker, input, state: "queued" }; + queue.push(job); + jobsById[id] = job; + if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e)); + return id; + } + + async function processQueue() { + processing = true; + while (queue.length > 0) { + const job = queue.shift()!; + job.state = "processing"; + try { + const apiKey = process.env.OPENROUTER_API_KEY; + if (!apiKey || apiKey === "your_openrouter_api_key_here") { + const mockDecision = { action: "hold", confidence: 0.6, reasoning: `${job.ticker} analysis - Mock (inproc)` }; + job.result = mockDecision; + job.state = "completed"; + await db.stock.upsert({ + where: { ticker: job.ticker }, + create: { ticker: job.ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id }, + update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id }, + }); + continue; + } + const client = new OpenRouterClient(process.env.OPENROUTER_API_KEY as string); + const graph = new TradingGraph(client); + const decision = await graph.propagate(job.ticker, job.input); + job.result = decision; + job.state = "completed"; + await db.stock.upsert({ + where: { ticker: job.ticker }, + create: { + ticker: job.ticker, + lastDecision: decision.action as string, + lastExplanation: (decision as any).reasoning || null, + lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null, + lastJobId: job.id, + }, + update: { + lastDecision: decision.action as string, + lastExplanation: (decision as any).reasoning || null, + lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null, + lastJobId: job.id, + }, + }); + } catch (err: any) { + console.error("[inproc queue] job failed:", err); + job.state = "failed"; + job.failedReason = err?.message || String(err); + } } + processing = false; + } - const client = new OpenRouterClient(apiKey); - const graph = new TradingGraph(client); - const decision = await graph.propagate(ticker, input); - - await db.stock.upsert({ - where: { ticker }, - create: { - ticker, - lastDecision: decision.action as string, - lastExplanation: (decision as any).reasoning || null, - lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null, - lastJobId: job.id?.toString(), - }, - update: { - lastDecision: decision.action as string, - lastExplanation: (decision as any).reasoning || null, - lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null, - lastJobId: job.id?.toString(), - }, - }); - - console.log("[queue] Job complete and saved for", ticker); - return decision; - }, - { connection: redis } -); - -// Simple enqueue function -export async function enqueueAnalyze(ticker: string, input: any) { - const job = await analyzeQueue.add("analyze", { ticker, input }); - return job.id?.toString(); -} - -export async function getJob(jobId: string) { - const job = await analyzeQueue.getJob(jobId); - if (!job) return null; - const state = await job.getState(); - const failedReason = job.failedReason || null; - const returnValue = job.returnvalue || null; - return { id: job.id, state, failedReason, returnValue }; + export async function getJob(jobId: string) { + const job = jobsById[jobId]; + if (!job) return null; + return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null }; + } }