Add job queue for background analyze, enqueue from API, update MostActiveStocks form POST, add Playwright E2E for Save button
This commit is contained in:
@@ -52,10 +52,11 @@ export default function MostActiveStocks() {
|
|||||||
setSaving((p) => ({ ...p, [symbol]: true }));
|
setSaving((p) => ({ ...p, [symbol]: true }));
|
||||||
setSaved((p) => ({ ...p, [symbol]: false }));
|
setSaved((p) => ({ ...p, [symbol]: false }));
|
||||||
try {
|
try {
|
||||||
|
const form = new FormData();
|
||||||
|
form.set("ticker", symbol);
|
||||||
const res = await fetch("/api/stocks", {
|
const res = await fetch("/api/stocks", {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
headers: { "Content-Type": "application/json" },
|
body: form,
|
||||||
body: JSON.stringify({ symbol }),
|
|
||||||
});
|
});
|
||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
const data = await res.json().catch(() => null);
|
const data = await res.json().catch(() => null);
|
||||||
|
|||||||
@@ -0,0 +1,81 @@
|
|||||||
|
import { OpenRouterClient } from "./openrouter";
|
||||||
|
import { TradingGraph } from "../agents/tradingGraph";
|
||||||
|
import { db } from "./db.server";
|
||||||
|
|
||||||
|
type AnalyzeInput = {
|
||||||
|
financialData: string;
|
||||||
|
technicalData: { prices: number[]; sma: number; ema: number; rsi: number; macd: number };
|
||||||
|
sentimentData: { headlines: string[]; source?: "news" | "social" | "stocktwits" };
|
||||||
|
};
|
||||||
|
|
||||||
|
type Job = {
|
||||||
|
id: string;
|
||||||
|
type: "analyze";
|
||||||
|
ticker: string;
|
||||||
|
input: AnalyzeInput;
|
||||||
|
};
|
||||||
|
|
||||||
|
const queue: Job[] = [];
|
||||||
|
let processing = false;
|
||||||
|
|
||||||
|
function makeId() {
|
||||||
|
return `${Date.now()}-${Math.floor(Math.random() * 100000)}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function enqueueAnalyze(ticker: string, input: AnalyzeInput) {
|
||||||
|
const id = makeId();
|
||||||
|
queue.push({ id, type: "analyze", ticker, input });
|
||||||
|
if (!processing) {
|
||||||
|
processQueue().catch((err) => console.error("jobQueue error:", err));
|
||||||
|
}
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function processQueue() {
|
||||||
|
processing = true;
|
||||||
|
while (queue.length > 0) {
|
||||||
|
const job = queue.shift()!;
|
||||||
|
console.log("[jobQueue] Processing job", job.id, job.type, job.ticker);
|
||||||
|
try {
|
||||||
|
if (job.type === "analyze") {
|
||||||
|
const apiKey = process.env.OPENROUTER_API_KEY;
|
||||||
|
if (!apiKey || apiKey === "your_openrouter_api_key_here") {
|
||||||
|
console.log("[jobQueue] mock mode for analyze", job.ticker);
|
||||||
|
const mockDecision = {
|
||||||
|
action: "hold",
|
||||||
|
confidence: 0.6,
|
||||||
|
reasoning: `${job.ticker} analysis - Mock mode (background)`,
|
||||||
|
};
|
||||||
|
await db.stock.upsert({
|
||||||
|
where: { ticker: job.ticker },
|
||||||
|
create: { ticker: job.ticker, lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning },
|
||||||
|
update: { lastDecision: mockDecision.action, lastExplanation: mockDecision.reasoning },
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const client = new OpenRouterClient(apiKey);
|
||||||
|
const graph = new TradingGraph(client);
|
||||||
|
const decision = await graph.propagate(job.ticker, job.input);
|
||||||
|
await db.stock.upsert({
|
||||||
|
where: { ticker: job.ticker },
|
||||||
|
create: {
|
||||||
|
ticker: job.ticker,
|
||||||
|
lastDecision: decision.action as string,
|
||||||
|
lastExplanation: (decision as any).reasoning || null,
|
||||||
|
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
||||||
|
},
|
||||||
|
update: {
|
||||||
|
lastDecision: decision.action as string,
|
||||||
|
lastExplanation: (decision as any).reasoning || null,
|
||||||
|
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
console.log("[jobQueue] Saved background decision for", job.ticker);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error("[jobQueue] job failed:", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
processing = false;
|
||||||
|
}
|
||||||
@@ -75,33 +75,15 @@ export async function action({ request }: { request: Request }) {
|
|||||||
console.log("[analyze] Running trading graph...");
|
console.log("[analyze] Running trading graph...");
|
||||||
|
|
||||||
if (body.background) {
|
if (body.background) {
|
||||||
// Run in background: start async propagation and return 202 immediately
|
// Enqueue background analyze job and return 202 immediately
|
||||||
(async () => {
|
|
||||||
try {
|
try {
|
||||||
const decision = await graph.propagate(ticker, input);
|
const { enqueueAnalyze } = await import("../../lib/jobQueue");
|
||||||
console.log("[analyze] Background decision received:", JSON.stringify(decision));
|
const jobId = enqueueAnalyze(ticker, input);
|
||||||
// persist last decision to DB
|
return Response.json({ status: "queued", jobId }, { status: 202 });
|
||||||
await db.stock.upsert({
|
} catch (enqueueErr) {
|
||||||
where: { ticker },
|
console.error("[analyze] enqueue error:", enqueueErr);
|
||||||
create: {
|
return Response.json({ error: "failed to enqueue" }, { status: 500 });
|
||||||
ticker,
|
|
||||||
lastDecision: decision.action as string,
|
|
||||||
lastExplanation: (decision as any).reasoning || null,
|
|
||||||
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
|
||||||
},
|
|
||||||
update: {
|
|
||||||
lastDecision: decision.action as string,
|
|
||||||
lastExplanation: (decision as any).reasoning || null,
|
|
||||||
lastExecutionPlan: decision.executionPlan ? JSON.stringify(decision.executionPlan) : null,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
console.log("[analyze] Background decision saved to DB for", ticker);
|
|
||||||
} catch (bgErr) {
|
|
||||||
console.error("[analyze] Background error:", bgErr);
|
|
||||||
}
|
}
|
||||||
})();
|
|
||||||
|
|
||||||
return Response.json({ status: "queued" }, { status: 202 });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const decision = await graph.propagate(ticker, input);
|
const decision = await graph.propagate(ticker, input);
|
||||||
|
|||||||
@@ -0,0 +1,31 @@
|
|||||||
|
import { test, expect } from "@playwright/test";
|
||||||
|
|
||||||
|
test("Save button enqueues analyze and upserts stock", async ({ page }) => {
|
||||||
|
await page.goto("/stocks");
|
||||||
|
await page.waitForSelector("table tbody tr");
|
||||||
|
const firstRow = page.locator("tbody tr").first();
|
||||||
|
const symbol = (await firstRow.locator("td a").textContent()) || "";
|
||||||
|
|
||||||
|
// Click the Save button in the first row and wait for analyze enqueue response
|
||||||
|
const [resp] = await Promise.all([
|
||||||
|
page.waitForResponse((r) => r.url().endsWith('/api/analyze') && (r.status() === 202 || r.status() === 200)),
|
||||||
|
firstRow.locator("button:has-text('Save')").click(),
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect([200, 202]).toContain(resp.status());
|
||||||
|
|
||||||
|
// Poll the /api/stocks until the ticker appears (give it a few seconds for background job)
|
||||||
|
const base = new URL(page.url()).origin;
|
||||||
|
let found = false;
|
||||||
|
for (let i = 0; i < 20; i++) {
|
||||||
|
const r = await page.request.get(`${base}/api/stocks`);
|
||||||
|
const list = await r.json();
|
||||||
|
if (list.some((s: any) => s.ticker === symbol.trim())) {
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
await page.waitForTimeout(300);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(found).toBeTruthy();
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user