261 lines
10 KiB
TypeScript
261 lines
10 KiB
TypeScript
import pkg from "bullmq";
|
|
const { Queue, Worker } = pkg as any;
|
|
import IORedis from "ioredis";
|
|
import { fetchAccount, fetchRecentCloses } from "./alpacaClient";
|
|
import { buildTradingGraph, getTradingConfig } from "./tradingConfig.server";
|
|
import { db } from "./db.server";
|
|
|
|
const REDIS_URL = process.env.REDIS_URL;
|
|
|
|
let analyzeQueue: any = undefined;
|
|
let worker: any = undefined;
|
|
let enqueueAnalyze: (ticker: string, input: any) => Promise<string> | string;
|
|
let getJob: (jobId: string) => Promise<any | null>;
|
|
let listRecentJobs: (ticker?: string, limit?: number) => Promise<any[]>;
|
|
let cancelJob: (jobId: string) => Promise<boolean>;
|
|
|
|
if (REDIS_URL) {
|
|
const redis = new IORedis(REDIS_URL as string);
|
|
analyzeQueue = new Queue("analyze", { connection: redis });
|
|
|
|
// Worker to process analyze jobs
|
|
worker = new Worker(
|
|
"analyze",
|
|
async (job: any) => {
|
|
console.log("[queue] Processing analyze job", job.id, job.data.ticker);
|
|
const { ticker, input } = job.data as { ticker: string; input: any };
|
|
const apiKey = process.env.OPENROUTER_API_KEY;
|
|
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
|
console.log("[queue] mock mode for analyze", ticker);
|
|
const mockDecision = {
|
|
action: "hold",
|
|
confidence: 0.6,
|
|
reasoning: `${ticker} analysis - Mock mode (background)`,
|
|
};
|
|
await db.stock.upsert({
|
|
where: { ticker },
|
|
create: { ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
|
|
update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id?.toString() },
|
|
});
|
|
return mockDecision;
|
|
}
|
|
|
|
const { graph, config } = await buildTradingGraph(apiKey);
|
|
console.log("[queue] Trading config:", config);
|
|
// Fetch latest Alpaca account and prices; abort job if unavailable so work runs on fresh data
|
|
try {
|
|
const account = await fetchAccount();
|
|
const prices = await fetchRecentCloses(ticker);
|
|
input.account = input.account || account;
|
|
input.technicalData = input.technicalData || {};
|
|
input.technicalData.prices = input.technicalData.prices && input.technicalData.prices.length ? input.technicalData.prices : prices;
|
|
} catch (e) {
|
|
console.error("[queue] Failed to fetch Alpaca data, aborting job:", e);
|
|
// Throw to mark the job as failed early
|
|
throw new Error("Failed to fetch Alpaca data: " + String(e));
|
|
}
|
|
|
|
let decision = await graph.propagate(ticker, input);
|
|
|
|
// Enrich executionPlan deterministically server-side before persisting
|
|
try {
|
|
const { enrichExecutionPlan, verifyExecutionPlanWithLLM } = await import("./execution");
|
|
decision = enrichExecutionPlan(decision, input);
|
|
if (process.env.OPENROUTER_API_KEY) {
|
|
try {
|
|
decision = await verifyExecutionPlanWithLLM(decision, input, config.model);
|
|
} catch (e) {
|
|
console.warn("[queue] LLM verification failed:", e);
|
|
}
|
|
}
|
|
} catch (e) {
|
|
console.warn("[queue] Failed to enrich execution plan:", e);
|
|
}
|
|
|
|
await db.stock.upsert({
|
|
where: { ticker },
|
|
create: {
|
|
ticker,
|
|
lastDecision: decision.action as string,
|
|
lastExplanation: (decision as any).reasoning || null,
|
|
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
|
lastJobId: job.id?.toString(),
|
|
},
|
|
update: {
|
|
lastDecision: decision.action as string,
|
|
lastExplanation: (decision as any).reasoning || null,
|
|
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
|
lastJobId: job.id?.toString(),
|
|
},
|
|
});
|
|
|
|
console.log("[queue] Job complete and saved for", ticker);
|
|
return decision;
|
|
},
|
|
{ connection: redis }
|
|
);
|
|
|
|
enqueueAnalyze = async (ticker: string, input: any) => {
|
|
const job = await analyzeQueue.add("analyze", { ticker, input });
|
|
return job.id?.toString();
|
|
};
|
|
|
|
getJob = async (jobId: string) => {
|
|
const job = await analyzeQueue.getJob(jobId);
|
|
if (!job) return null;
|
|
const state = await job.getState();
|
|
const failedReason = job.failedReason || null;
|
|
const returnValue = job.returnvalue || null;
|
|
return { id: job.id, state, failedReason, returnValue };
|
|
};
|
|
|
|
listRecentJobs = async (ticker?: string, limit = 50) => {
|
|
const jobs = await analyzeQueue.getJobs(["waiting", "active", "completed", "failed", "delayed"], 0, limit - 1);
|
|
const mapped = await Promise.all(jobs.map(async (j: any) => ({ id: j.id, name: j.name, data: j.data, state: await j.getState(), returnValue: j.returnvalue || null })));
|
|
if (ticker) return mapped.filter((j: any) => j.data?.ticker === ticker);
|
|
return mapped;
|
|
};
|
|
|
|
cancelJob = async (jobId: string) => {
|
|
try {
|
|
const job = await analyzeQueue.getJob(jobId);
|
|
if (!job) return false;
|
|
const state = await job.getState();
|
|
if (state === "waiting" || state === "delayed") {
|
|
await job.remove();
|
|
return true;
|
|
}
|
|
return false;
|
|
} catch (err) {
|
|
console.error("cancelJob error:", err);
|
|
return false;
|
|
}
|
|
};
|
|
} else {
|
|
// In-process fallback queue for environments without Redis (dev/tests)
|
|
type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string };
|
|
const queue: Job[] = [];
|
|
const jobsById: Record<string, Job> = {};
|
|
let processing = false;
|
|
|
|
function makeId() {
|
|
return `${Date.now()}-${Math.floor(Math.random() * 100000)}`;
|
|
}
|
|
|
|
enqueueAnalyze = (ticker: string, input: any) => {
|
|
const id = makeId();
|
|
const job: Job = { id, ticker, input, state: "queued" };
|
|
queue.push(job);
|
|
jobsById[id] = job;
|
|
if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e));
|
|
return id;
|
|
};
|
|
|
|
listRecentJobs = async (ticker?: string, limit = 50) => {
|
|
const items = Object.values(jobsById).slice(-limit).reverse().map((j) => ({ id: j.id, data: { ticker: j.ticker }, state: j.state, returnValue: j.result || null }));
|
|
if (ticker) return items.filter((it) => it.data?.ticker === ticker);
|
|
return items;
|
|
};
|
|
|
|
cancelJob = async (jobId: string) => {
|
|
const job = jobsById[jobId];
|
|
if (!job) return false;
|
|
// If queued but not yet processing, remove from queue
|
|
if (job.state === "queued") {
|
|
const idx = queue.findIndex((q) => q.id === jobId);
|
|
if (idx !== -1) queue.splice(idx, 1);
|
|
job.state = "failed";
|
|
job.failedReason = "cancelled";
|
|
return true;
|
|
}
|
|
// Can't cancel if already processing/completed/failed
|
|
return false;
|
|
};
|
|
|
|
async function processQueue() {
|
|
processing = true;
|
|
while (queue.length > 0) {
|
|
const job = queue.shift()!;
|
|
job.state = "processing";
|
|
try {
|
|
const apiKey = process.env.OPENROUTER_API_KEY;
|
|
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
|
const mockDecision = { action: "hold", confidence: 0.6, reasoning: `${job.ticker} analysis - Mock (inproc)` };
|
|
job.result = mockDecision;
|
|
job.state = "completed";
|
|
await db.stock.upsert({
|
|
where: { ticker: job.ticker },
|
|
create: { ticker: job.ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id },
|
|
update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning, lastJobId: job.id },
|
|
});
|
|
continue;
|
|
}
|
|
const { graph, config } = await buildTradingGraph(process.env.OPENROUTER_API_KEY as string);
|
|
// Fetch latest Alpaca account and prices; abort job if unavailable so work runs on fresh data
|
|
try {
|
|
const account = await fetchAccount();
|
|
const prices = await fetchRecentCloses(job.ticker);
|
|
job.input = job.input || {};
|
|
job.input.account = job.input.account || account;
|
|
job.input.technicalData = job.input.technicalData || {};
|
|
job.input.technicalData.prices = job.input.technicalData.prices && job.input.technicalData.prices.length ? job.input.technicalData.prices : prices;
|
|
} catch (e) {
|
|
console.error("[inproc queue] Failed to fetch Alpaca data, aborting job:", e);
|
|
// throw so the outer catch marks job as failed
|
|
throw new Error("Failed to fetch Alpaca data: " + String(e));
|
|
}
|
|
|
|
let decision = await graph.propagate(job.ticker, job.input);
|
|
|
|
// Enrich executionPlan deterministically server-side before persisting
|
|
try {
|
|
const { enrichExecutionPlan, verifyExecutionPlanWithLLM } = await import("./execution");
|
|
decision = enrichExecutionPlan(decision, job.input);
|
|
if (process.env.OPENROUTER_API_KEY) {
|
|
try {
|
|
decision = await verifyExecutionPlanWithLLM(decision, job.input, config.model);
|
|
} catch (e) {
|
|
console.warn("[inproc queue] LLM verification failed:", e);
|
|
}
|
|
}
|
|
} catch (e) {
|
|
console.warn("[inproc queue] Failed to enrich execution plan:", e);
|
|
}
|
|
|
|
job.result = decision;
|
|
job.state = "completed";
|
|
await db.stock.upsert({
|
|
where: { ticker: job.ticker },
|
|
create: {
|
|
ticker: job.ticker,
|
|
lastDecision: decision.action as string,
|
|
lastExplanation: (decision as any).reasoning || null,
|
|
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
|
lastJobId: job.id,
|
|
},
|
|
update: {
|
|
lastDecision: decision.action as string,
|
|
lastExplanation: (decision as any).reasoning || null,
|
|
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
|
lastJobId: job.id,
|
|
},
|
|
});
|
|
} catch (err: any) {
|
|
console.error("[inproc queue] job failed:", err);
|
|
job.state = "failed";
|
|
job.failedReason = err?.message || String(err);
|
|
}
|
|
}
|
|
processing = false;
|
|
}
|
|
|
|
getJob = async (jobId: string) => {
|
|
const job = jobsById[jobId];
|
|
if (!job) return null;
|
|
return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null };
|
|
};
|
|
}
|
|
|
|
export { enqueueAnalyze, getJob, listRecentJobs, cancelJob, analyzeQueue, worker };
|
|
|