Queue: support REDIS_URL BullMQ mode; fallback to in-process queue for dev/tests
This commit is contained in:
+134
-61
@@ -5,73 +5,146 @@ import { OpenRouterClient } from "./openrouter";
|
||||
import { TradingGraph } from "../agents/tradingGraph";
|
||||
import { db } from "./db.server";
|
||||
|
||||
const connection = process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : { host: "127.0.0.1", port: 6379 } as any;
|
||||
const REDIS_URL = process.env.REDIS_URL;
|
||||
|
||||
// BullMQ expects a Redis connection object; use ioredis instance for workers where needed
|
||||
const redis = process.env.REDIS_URL ? new IORedis(process.env.REDIS_URL) : new IORedis();
|
||||
// If REDIS_URL is provided, use BullMQ + Redis; otherwise fallback to an in-process queue for dev/tests
|
||||
if (REDIS_URL) {
|
||||
const redis = new IORedis(REDIS_URL as string);
|
||||
export const analyzeQueue = new Queue("analyze", { connection: redis });
|
||||
|
||||
export const analyzeQueue = new Queue("analyze", { connection: redis });
|
||||
// Worker to process analyze jobs
|
||||
export const worker = new Worker(
|
||||
"analyze",
|
||||
async (job: any) => {
|
||||
console.log("[queue] Processing analyze job", job.id, job.data.ticker);
|
||||
const { ticker, input } = job.data as { ticker: string; input: any };
|
||||
const apiKey = process.env.OPENROUTER_API_KEY;
|
||||
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
||||
console.log("[queue] mock mode for analyze", ticker);
|
||||
const mockDecision = {
|
||||
action: "hold",
|
||||
confidence: 0.6,
|
||||
reasoning: `${ticker} analysis - Mock mode (background)`,
|
||||
};
|
||||
await db.stock.upsert({
|
||||
where: { ticker },
|
||||
create: { ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
|
||||
update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
|
||||
});
|
||||
return mockDecision;
|
||||
}
|
||||
|
||||
const client = new OpenRouterClient(apiKey);
|
||||
const graph = new TradingGraph(client);
|
||||
const decision = await graph.propagate(ticker, input);
|
||||
|
||||
// Worker to process analyze jobs
|
||||
export const worker = new Worker(
|
||||
"analyze",
|
||||
async (job: any) => {
|
||||
console.log("[queue] Processing analyze job", job.id, job.data.ticker);
|
||||
const { ticker, input } = job.data as { ticker: string; input: any };
|
||||
const apiKey = process.env.OPENROUTER_API_KEY;
|
||||
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
||||
console.log("[queue] mock mode for analyze", ticker);
|
||||
const mockDecision = {
|
||||
action: "hold",
|
||||
confidence: 0.6,
|
||||
reasoning: `${ticker} analysis - Mock mode (background)`,
|
||||
};
|
||||
await db.stock.upsert({
|
||||
where: { ticker },
|
||||
create: { ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
|
||||
update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
|
||||
create: {
|
||||
ticker,
|
||||
lastDecision: decision.action as string,
|
||||
lastExplanation: (decision as any).reasoning || null,
|
||||
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
||||
lastJobId: job.id?.toString(),
|
||||
},
|
||||
update: {
|
||||
lastDecision: decision.action as string,
|
||||
lastExplanation: (decision as any).reasoning || null,
|
||||
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
||||
lastJobId: job.id?.toString(),
|
||||
},
|
||||
});
|
||||
return mockDecision;
|
||||
|
||||
console.log("[queue] Job complete and saved for", ticker);
|
||||
return decision;
|
||||
},
|
||||
{ connection: redis }
|
||||
);
|
||||
|
||||
export async function enqueueAnalyze(ticker: string, input: any) {
|
||||
const job = await analyzeQueue.add("analyze", { ticker, input });
|
||||
return job.id?.toString();
|
||||
}
|
||||
|
||||
export async function getJob(jobId: string) {
|
||||
const job = await analyzeQueue.getJob(jobId);
|
||||
if (!job) return null;
|
||||
const state = await job.getState();
|
||||
const failedReason = job.failedReason || null;
|
||||
const returnValue = job.returnvalue || null;
|
||||
return { id: job.id, state, failedReason, returnValue };
|
||||
}
|
||||
} else {
|
||||
// In-process fallback queue for environments without Redis (dev/tests)
|
||||
type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string };
|
||||
const queue: Job[] = [];
|
||||
const jobsById: Record<string, Job> = {};
|
||||
let processing = false;
|
||||
|
||||
function makeId() {
|
||||
return `${Date.now()}-${Math.floor(Math.random() * 100000)}`;
|
||||
}
|
||||
|
||||
export function enqueueAnalyze(ticker: string, input: any) {
|
||||
const id = makeId();
|
||||
const job: Job = { id, ticker, input, state: "queued" };
|
||||
queue.push(job);
|
||||
jobsById[id] = job;
|
||||
if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e));
|
||||
return id;
|
||||
}
|
||||
|
||||
async function processQueue() {
|
||||
processing = true;
|
||||
while (queue.length > 0) {
|
||||
const job = queue.shift()!;
|
||||
job.state = "processing";
|
||||
try {
|
||||
const apiKey = process.env.OPENROUTER_API_KEY;
|
||||
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
||||
const mockDecision = { action: "hold", confidence: 0.6, reasoning: `${job.ticker} analysis - Mock (inproc)` };
|
||||
job.result = mockDecision;
|
||||
job.state = "completed";
|
||||
await db.stock.upsert({
|
||||
where: { ticker: job.ticker },
|
||||
create: { ticker: job.ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id },
|
||||
update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id },
|
||||
});
|
||||
continue;
|
||||
}
|
||||
const client = new OpenRouterClient(process.env.OPENROUTER_API_KEY as string);
|
||||
const graph = new TradingGraph(client);
|
||||
const decision = await graph.propagate(job.ticker, job.input);
|
||||
job.result = decision;
|
||||
job.state = "completed";
|
||||
await db.stock.upsert({
|
||||
where: { ticker: job.ticker },
|
||||
create: {
|
||||
ticker: job.ticker,
|
||||
lastDecision: decision.action as string,
|
||||
lastExplanation: (decision as any).reasoning || null,
|
||||
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
||||
lastJobId: job.id,
|
||||
},
|
||||
update: {
|
||||
lastDecision: decision.action as string,
|
||||
lastExplanation: (decision as any).reasoning || null,
|
||||
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
||||
lastJobId: job.id,
|
||||
},
|
||||
});
|
||||
} catch (err: any) {
|
||||
console.error("[inproc queue] job failed:", err);
|
||||
job.state = "failed";
|
||||
job.failedReason = err?.message || String(err);
|
||||
}
|
||||
}
|
||||
processing = false;
|
||||
}
|
||||
|
||||
const client = new OpenRouterClient(apiKey);
|
||||
const graph = new TradingGraph(client);
|
||||
const decision = await graph.propagate(ticker, input);
|
||||
|
||||
await db.stock.upsert({
|
||||
where: { ticker },
|
||||
create: {
|
||||
ticker,
|
||||
lastDecision: decision.action as string,
|
||||
lastExplanation: (decision as any).reasoning || null,
|
||||
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
||||
lastJobId: job.id?.toString(),
|
||||
},
|
||||
update: {
|
||||
lastDecision: decision.action as string,
|
||||
lastExplanation: (decision as any).reasoning || null,
|
||||
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
||||
lastJobId: job.id?.toString(),
|
||||
},
|
||||
});
|
||||
|
||||
console.log("[queue] Job complete and saved for", ticker);
|
||||
return decision;
|
||||
},
|
||||
{ connection: redis }
|
||||
);
|
||||
|
||||
// Simple enqueue function
|
||||
export async function enqueueAnalyze(ticker: string, input: any) {
|
||||
const job = await analyzeQueue.add("analyze", { ticker, input });
|
||||
return job.id?.toString();
|
||||
}
|
||||
|
||||
export async function getJob(jobId: string) {
|
||||
const job = await analyzeQueue.getJob(jobId);
|
||||
if (!job) return null;
|
||||
const state = await job.getState();
|
||||
const failedReason = job.failedReason || null;
|
||||
const returnValue = job.returnvalue || null;
|
||||
return { id: job.id, state, failedReason, returnValue };
|
||||
export async function getJob(jobId: string) {
|
||||
const job = jobsById[jobId];
|
||||
if (!job) return null;
|
||||
return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null };
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user