Files
AITrader/app/lib/queue.ts
T

79 lines
3.0 KiB
TypeScript

import pkg from "bullmq";
const { Queue, Worker, QueueScheduler } = pkg as any;
import IORedis from "ioredis";
import { OpenRouterClient } from "./openrouter";
import { TradingGraph } from "../agents/tradingGraph";
import { db } from "./db.server";
const connection = process.env.REDIS_URL ? { connection: process.env.REDIS_URL } : { host: "127.0.0.1", port: 6379 } as any;
// BullMQ expects a Redis connection object; use ioredis instance for workers where needed
const redis = process.env.REDIS_URL ? new IORedis(process.env.REDIS_URL) : new IORedis();
export const analyzeQueue = new Queue("analyze", { connection: redis });
export const queueScheduler = new QueueScheduler("analyze", { connection: redis });
// Worker to process analyze jobs
export const worker = new Worker(
"analyze",
async (job: any) => {
console.log("[queue] Processing analyze job", job.id, job.data.ticker);
const { ticker, input } = job.data as { ticker: string; input: any };
const apiKey = process.env.OPENROUTER_API_KEY;
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
console.log("[queue] mock mode for analyze", ticker);
const mockDecision = {
action: "hold",
confidence: 0.6,
reasoning: `${ticker} analysis - Mock mode (background)`,
};
await db.stock.upsert({
where: { ticker },
create: { ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
});
return mockDecision;
}
const client = new OpenRouterClient(apiKey);
const graph = new TradingGraph(client);
const decision = await graph.propagate(ticker, input);
await db.stock.upsert({
where: { ticker },
create: {
ticker,
lastDecision: decision.action as string,
lastExplanation: (decision as any).reasoning || null,
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
lastJobId: job.id?.toString(),
},
update: {
lastDecision: decision.action as string,
lastExplanation: (decision as any).reasoning || null,
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
lastJobId: job.id?.toString(),
},
});
console.log("[queue] Job complete and saved for", ticker);
return decision;
},
{ connection: redis }
);
// Simple enqueue function
export async function enqueueAnalyze(ticker: string, input: any) {
const job = await analyzeQueue.add("analyze", { ticker, input });
return job.id?.toString();
}
export async function getJob(jobId: string) {
const job = await analyzeQueue.getJob(jobId);
if (!job) return null;
const state = await job.getState();
const failedReason = job.failedReason || null;
const returnValue = job.returnvalue || null;
return { id: job.id, state, failedReason, returnValue };
}