feat: always enqueue analyze jobs as background, save jobId to DB, reuse active jobs, cleanup stale jobs
This commit is contained in:
+6
-10
@@ -22,11 +22,9 @@ if (REDIS_URL) {
|
|||||||
worker = new Worker(
|
worker = new Worker(
|
||||||
"analyze",
|
"analyze",
|
||||||
async (job: any) => {
|
async (job: any) => {
|
||||||
console.log("[queue] Processing analyze job", job.id, job.data.ticker);
|
|
||||||
const { ticker, input } = job.data as { ticker: string; input: any };
|
const { ticker, input } = job.data as { ticker: string; input: any };
|
||||||
const apiKey = process.env.OPENROUTER_API_KEY;
|
const apiKey = process.env.OPENROUTER_API_KEY;
|
||||||
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
||||||
console.log("[queue] mock mode for analyze", ticker);
|
|
||||||
const mockDecision = {
|
const mockDecision = {
|
||||||
action: "hold",
|
action: "hold",
|
||||||
confidence: 0.6,
|
confidence: 0.6,
|
||||||
@@ -41,7 +39,6 @@ if (REDIS_URL) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const { graph, config } = await buildTradingGraph(apiKey);
|
const { graph, config } = await buildTradingGraph(apiKey);
|
||||||
console.log("[queue] Trading config:", config);
|
|
||||||
// Fetch latest Alpaca account and prices; abort job if unavailable so work runs on fresh data
|
// Fetch latest Alpaca account and prices; abort job if unavailable so work runs on fresh data
|
||||||
try {
|
try {
|
||||||
const account = await fetchAccount();
|
const account = await fetchAccount();
|
||||||
@@ -89,7 +86,6 @@ if (REDIS_URL) {
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
console.log("[queue] Job complete and saved for", ticker);
|
|
||||||
return decision;
|
return decision;
|
||||||
},
|
},
|
||||||
{ connection: redis }
|
{ connection: redis }
|
||||||
@@ -106,12 +102,12 @@ if (REDIS_URL) {
|
|||||||
const state = await job.getState();
|
const state = await job.getState();
|
||||||
const failedReason = job.failedReason || null;
|
const failedReason = job.failedReason || null;
|
||||||
const returnValue = job.returnvalue || null;
|
const returnValue = job.returnvalue || null;
|
||||||
return { id: job.id, state, failedReason, returnValue };
|
return { id: job.id, state, failedReason, returnValue, timestamp: job.timestamp ?? job.data?.timestamp ?? null };
|
||||||
};
|
};
|
||||||
|
|
||||||
listRecentJobs = async (ticker?: string, limit = 50) => {
|
listRecentJobs = async (ticker?: string, limit = 50) => {
|
||||||
const jobs = await analyzeQueue.getJobs(["waiting", "active", "completed", "failed", "delayed"], 0, limit - 1);
|
const jobs = await analyzeQueue.getJobs(["waiting", "active", "completed", "failed", "delayed"], 0, limit - 1);
|
||||||
const mapped = await Promise.all(jobs.map(async (j: any) => ({ id: j.id, name: j.name, data: j.data, state: await j.getState(), returnValue: j.returnvalue || null })));
|
const mapped = await Promise.all(jobs.map(async (j: any) => ({ id: j.id, name: j.name, data: j.data, state: await j.getState(), returnValue: j.returnvalue || null, timestamp: j.timestamp ?? j.data?.timestamp ?? null })));
|
||||||
if (ticker) return mapped.filter((j: any) => j.data?.ticker === ticker);
|
if (ticker) return mapped.filter((j: any) => j.data?.ticker === ticker);
|
||||||
return mapped;
|
return mapped;
|
||||||
};
|
};
|
||||||
@@ -133,7 +129,7 @@ if (REDIS_URL) {
|
|||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
// In-process fallback queue for environments without Redis (dev/tests)
|
// In-process fallback queue for environments without Redis (dev/tests)
|
||||||
type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string };
|
type Job = { id: string; ticker: string; input: any; state: "queued" | "processing" | "completed" | "failed"; result?: any; failedReason?: string; timestamp: number };
|
||||||
const queue: Job[] = [];
|
const queue: Job[] = [];
|
||||||
const jobsById: Record<string, Job> = {};
|
const jobsById: Record<string, Job> = {};
|
||||||
let processing = false;
|
let processing = false;
|
||||||
@@ -144,7 +140,7 @@ if (REDIS_URL) {
|
|||||||
|
|
||||||
enqueueAnalyze = (ticker: string, input: any) => {
|
enqueueAnalyze = (ticker: string, input: any) => {
|
||||||
const id = makeId();
|
const id = makeId();
|
||||||
const job: Job = { id, ticker, input, state: "queued" };
|
const job: Job = { id, ticker, input, state: "queued", timestamp: input?.timestamp ?? Date.now() };
|
||||||
queue.push(job);
|
queue.push(job);
|
||||||
jobsById[id] = job;
|
jobsById[id] = job;
|
||||||
if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e));
|
if (!processing) processQueue().catch((e) => console.error("inproc queue error:", e));
|
||||||
@@ -152,7 +148,7 @@ if (REDIS_URL) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
listRecentJobs = async (ticker?: string, limit = 50) => {
|
listRecentJobs = async (ticker?: string, limit = 50) => {
|
||||||
const items = Object.values(jobsById).slice(-limit).reverse().map((j) => ({ id: j.id, data: { ticker: j.ticker }, state: j.state, returnValue: j.result || null }));
|
const items = Object.values(jobsById).slice(-limit).reverse().map((j) => ({ id: j.id, data: { ticker: j.ticker, timestamp: j.timestamp }, state: j.state, returnValue: j.result || null, timestamp: j.timestamp }));
|
||||||
if (ticker) return items.filter((it) => it.data?.ticker === ticker);
|
if (ticker) return items.filter((it) => it.data?.ticker === ticker);
|
||||||
return items;
|
return items;
|
||||||
};
|
};
|
||||||
@@ -252,7 +248,7 @@ if (REDIS_URL) {
|
|||||||
getJob = async (jobId: string) => {
|
getJob = async (jobId: string) => {
|
||||||
const job = jobsById[jobId];
|
const job = jobsById[jobId];
|
||||||
if (!job) return null;
|
if (!job) return null;
|
||||||
return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null };
|
return { id: job.id, state: job.state, failedReason: job.failedReason || null, returnValue: job.result || null, timestamp: job.timestamp };
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+42
-7
@@ -386,13 +386,6 @@ export default function Analyze() {
|
|||||||
const quote = quoteRes.ok ? await quoteRes.json() : null;
|
const quote = quoteRes.ok ? await quoteRes.json() : null;
|
||||||
const indicatorsData = indicatorsRes.ok ? await indicatorsRes.json() : null;
|
const indicatorsData = indicatorsRes.ok ? await indicatorsRes.json() : null;
|
||||||
|
|
||||||
const analysisRes = await fetch("/api/analyze", {
|
|
||||||
method: "POST",
|
|
||||||
headers: { "Content-Type": "application/json" },
|
|
||||||
body: JSON.stringify({ ticker }),
|
|
||||||
});
|
|
||||||
const analysis = analysisRes.ok ? await analysisRes.json() : null;
|
|
||||||
|
|
||||||
const indicators: Indicators = {
|
const indicators: Indicators = {
|
||||||
rsi: indicatorsData?.indicators?.rsi ?? null,
|
rsi: indicatorsData?.indicators?.rsi ?? null,
|
||||||
sma20: indicatorsData?.indicators?.sma ?? null,
|
sma20: indicatorsData?.indicators?.sma ?? null,
|
||||||
@@ -407,6 +400,48 @@ export default function Analyze() {
|
|||||||
avgVolume: indicatorsData?.indicators?.avgVolume ?? null,
|
avgVolume: indicatorsData?.indicators?.avgVolume ?? null,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const analysisRes = await fetch("/api/analyze", {
|
||||||
|
method: "POST",
|
||||||
|
headers: { "Content-Type": "application/json" },
|
||||||
|
body: JSON.stringify({ ticker }),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (analysisRes.status === 202) {
|
||||||
|
// Background job queued - poll for completion
|
||||||
|
const data = await analysisRes.json();
|
||||||
|
const jobId = data.jobId;
|
||||||
|
let cancelled = false;
|
||||||
|
let timer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
|
const poll = async () => {
|
||||||
|
try {
|
||||||
|
const jr = await fetch(`/api/jobs/${jobId}`);
|
||||||
|
if (!jr.ok) { if (!cancelled) timer = setTimeout(poll, 2000); return; }
|
||||||
|
const j = await jr.json();
|
||||||
|
if (cancelled) return;
|
||||||
|
|
||||||
|
if (j.state === "completed" && j.returnValue) {
|
||||||
|
setStocks((s) => s.map((st) =>
|
||||||
|
st.id === id ? { ...st, loading: false, currentPrice: quote?.price ?? null, indicators, analysis: j.returnValue } : st
|
||||||
|
));
|
||||||
|
cancelled = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (j.state === "failed") {
|
||||||
|
setStocks((s) => s.map((st) => st.id === id ? { ...st, loading: false } : st));
|
||||||
|
cancelled = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (e) { /* ignore */ }
|
||||||
|
if (!cancelled) timer = setTimeout(poll, 2000);
|
||||||
|
};
|
||||||
|
poll();
|
||||||
|
|
||||||
|
return () => { cancelled = true; if (timer) clearTimeout(timer); };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback: synchronous response
|
||||||
|
const analysis = analysisRes.ok ? await analysisRes.json() : null;
|
||||||
setStocks((s) => s.map((st) =>
|
setStocks((s) => s.map((st) =>
|
||||||
st.id === id
|
st.id === id
|
||||||
? { ...st, loading: false, currentPrice: quote?.price ?? null, indicators, analysis }
|
? { ...st, loading: false, currentPrice: quote?.price ?? null, indicators, analysis }
|
||||||
|
|||||||
+49
-73
@@ -2,6 +2,8 @@
|
|||||||
|
|
||||||
// Server-only imports are loaded dynamically inside the action to avoid client bundling issues
|
// Server-only imports are loaded dynamically inside the action to avoid client bundling issues
|
||||||
|
|
||||||
|
const JOB_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
|
||||||
|
|
||||||
export async function action({ request }: { request: Request }) {
|
export async function action({ request }: { request: Request }) {
|
||||||
const body = await request.json();
|
const body = await request.json();
|
||||||
|
|
||||||
@@ -12,66 +14,58 @@ export async function action({ request }: { request: Request }) {
|
|||||||
return Response.json({ error: "ticker is required" }, { status: 400 });
|
return Response.json({ error: "ticker is required" }, { status: 400 });
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load server-only modules dynamically to prevent them from being included in client bundles
|
|
||||||
const { buildTradingGraph } = await import("../../lib/tradingConfig.server");
|
|
||||||
const { db } = await import("../../lib/db.server");
|
const { db } = await import("../../lib/db.server");
|
||||||
const { fetchAccount, fetchRecentCloses, fetchBars } = await import("../../lib/alpacaClient");
|
const { fetchAccount, fetchRecentCloses, fetchBars } = await import("../../lib/alpacaClient");
|
||||||
|
const { getJob, listRecentJobs, cancelJob } = await import("../../lib/queue");
|
||||||
|
|
||||||
const apiKey = process.env.OPENROUTER_API_KEY;
|
// Clean up old unfinished jobs for this ticker (older than timeout)
|
||||||
|
try {
|
||||||
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
const recentJobs = await listRecentJobs(ticker, 50);
|
||||||
const mockDecision = {
|
for (const j of recentJobs) {
|
||||||
action: "hold" as const,
|
if (j.state === "waiting" || j.state === "active" || j.state === "delayed") {
|
||||||
confidence: 0.75,
|
// Check if the job is too old
|
||||||
reasoning: `${ticker} analysis - Mock mode: positive momentum detected with neutral technical signals`,
|
const jobCreatedAt = j.data?.timestamp;
|
||||||
agentSignals: [
|
if (jobCreatedAt && Date.now() - jobCreatedAt > JOB_TIMEOUT_MS) {
|
||||||
{
|
await cancelJob(j.id);
|
||||||
agent: "fundamentals" as const,
|
}
|
||||||
signal: "bullish" as const,
|
}
|
||||||
confidence: 0.7,
|
}
|
||||||
reasoning: "Strong fundamentals with positive earnings outlook",
|
} catch (e) { /* ignore cleanup errors */ }
|
||||||
timestamp: new Date().toISOString(),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
agent: "technical" as const,
|
|
||||||
signal: "neutral" as const,
|
|
||||||
confidence: 0.6,
|
|
||||||
reasoning: "Mixed technical indicators",
|
|
||||||
timestamp: new Date().toISOString(),
|
|
||||||
},
|
|
||||||
],
|
|
||||||
debateRounds: [
|
|
||||||
{
|
|
||||||
bullishView: "Bullish case supported by fundamentals and momentum",
|
|
||||||
bearishView: "Bearish case from mixed technical signals",
|
|
||||||
researcher: "bullish" as const,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
};
|
|
||||||
return Response.json(mockDecision);
|
|
||||||
}
|
|
||||||
|
|
||||||
const { graph, config } = await buildTradingGraph(apiKey);
|
// Check if there's a recent unfinished job that can be reused
|
||||||
|
try {
|
||||||
|
const recentJobs = await listRecentJobs(ticker, 10);
|
||||||
|
const activeJob = recentJobs.find((j: any) => j.state === "waiting" || j.state === "active");
|
||||||
|
if (activeJob) {
|
||||||
|
// Return existing job ID instead of creating a new one
|
||||||
|
const jobId = activeJob.id;
|
||||||
|
// Update the stock record with this job ID
|
||||||
|
await db.stock.upsert({
|
||||||
|
where: { ticker },
|
||||||
|
update: { lastJobId: jobId },
|
||||||
|
create: { ticker, lastJobId: jobId },
|
||||||
|
});
|
||||||
|
return Response.json({ status: "queued", jobId }, { status: 202 });
|
||||||
|
}
|
||||||
|
} catch (e) { /* ignore */ }
|
||||||
|
|
||||||
// Fetch latest Alpaca account and recent prices; abort if unavailable
|
// Fetch latest Alpaca account and recent prices
|
||||||
let account: any = undefined;
|
let account: any = undefined;
|
||||||
let prices: number[] = [];
|
let prices: number[] = [];
|
||||||
let recentBars: any[] = [];
|
let recentBars: any[] = [];
|
||||||
try {
|
try {
|
||||||
account = await fetchAccount();
|
account = await fetchAccount();
|
||||||
prices = await fetchRecentCloses(ticker);
|
prices = await fetchRecentCloses(ticker);
|
||||||
// Also fetch recent intraday bars to enable deterministic execution plan calculation
|
|
||||||
try {
|
try {
|
||||||
recentBars = await fetchBars(ticker, '1Min', { limit: 200 });
|
recentBars = await fetchBars(ticker, '1Min', { limit: 200 });
|
||||||
// derive prices from bars if available (prefer freshest closes)
|
|
||||||
if (recentBars && recentBars.length) {
|
if (recentBars && recentBars.length) {
|
||||||
prices = recentBars.map((b: any) => (typeof b.ClosePrice === 'number' ? b.ClosePrice : (typeof b.c === 'number' ? b.c : 0))).filter((p: number) => p > 0);
|
prices = recentBars.map((b: any) => (typeof b.ClosePrice === 'number' ? b.ClosePrice : (typeof b.c === 'number' ? b.c : 0))).filter((p: number) => p > 0);
|
||||||
}
|
}
|
||||||
} catch (barErr) {
|
} catch (barErr) {
|
||||||
console.warn('[analyze] Failed to fetch recent bars for deterministic execution plan:', barErr);
|
console.warn('[analyze] Failed to fetch recent bars:', barErr);
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error("[analyze] Failed to fetch Alpaca data before analysis:", e);
|
console.error("[analyze] Failed to fetch Alpaca data:", e);
|
||||||
return Response.json({ error: "Failed to fetch Alpaca data: " + String(e) }, { status: 502 });
|
return Response.json({ error: "Failed to fetch Alpaca data: " + String(e) }, { status: 502 });
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,42 +84,24 @@ export async function action({ request }: { request: Request }) {
|
|||||||
source: "news" as const,
|
source: "news" as const,
|
||||||
},
|
},
|
||||||
account,
|
account,
|
||||||
|
timestamp: Date.now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Always enqueue as background job
|
||||||
try {
|
try {
|
||||||
if (body.background) {
|
const { enqueueAnalyze } = await import("../../lib/queue");
|
||||||
// Enqueue background analyze job and return 202 immediately
|
const jobId = await enqueueAnalyze(ticker, input);
|
||||||
try {
|
|
||||||
const { enqueueAnalyze } = await import("../../lib/queue");
|
|
||||||
const jobId = await enqueueAnalyze(ticker, input);
|
|
||||||
return Response.json({ status: "queued", jobId }, { status: 202 });
|
|
||||||
} catch (enqueueErr) {
|
|
||||||
console.error("[analyze] enqueue error:", enqueueErr);
|
|
||||||
return Response.json({ error: "failed to enqueue" }, { status: 500 });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let decision = await graph.propagate(ticker, input);
|
// Save jobId to DB stock record
|
||||||
// Enrich executionPlan deterministically on server-side
|
await db.stock.upsert({
|
||||||
try {
|
where: { ticker },
|
||||||
const { enrichExecutionPlan, verifyExecutionPlanWithLLM } = await import("../../lib/execution");
|
update: { lastJobId: jobId },
|
||||||
decision = enrichExecutionPlan(decision, input);
|
create: { ticker, lastJobId: jobId },
|
||||||
// Optionally ask LLM to verify/adjust the computed plan if API key is present
|
});
|
||||||
if (process.env.OPENROUTER_API_KEY) {
|
|
||||||
try {
|
|
||||||
decision = await verifyExecutionPlanWithLLM(decision, input, config.model);
|
|
||||||
} catch (e) {
|
|
||||||
console.warn("LLM verification failed:", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
console.warn("Failed to enrich execution plan:", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return Response.json(decision);
|
return Response.json({ status: "queued", jobId }, { status: 202 });
|
||||||
} catch (error) {
|
} catch (enqueueErr) {
|
||||||
const message = error instanceof Error ? error.message : "Unknown error";
|
console.error("[analyze] enqueue error:", enqueueErr);
|
||||||
console.error("[analyze] Error:", error);
|
return Response.json({ error: "failed to enqueue" }, { status: 500 });
|
||||||
return Response.json({ error: message }, { status: 500 });
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user