import alpacaService from "../../lib/alpacaClient";

/** Delay between Alpaca calls while requests are succeeding (ms). */
const BASE_DELAY_MS = 5000;

/** Upper bound for the backoff delay (ms). */
const MAX_DELAY_MS = 60000;

/** Error messages matching this pattern are treated as rate limiting. */
const RATE_LIMIT_PATTERN = /429|too many requests/i;

/** Shared encoder; TextEncoder is stateless, so one instance serves every event. */
const sseEncoder = new TextEncoder();

/** Translates the short timeframe aliases ("1H", "1D") to Alpaca's names; other values pass through. */
function mapToAlpacaTimeframe(tf: string): string {
  switch (tf) {
    case "1H":
      return "1Hour";
    case "1D":
      return "1Day";
    default:
      return tf;
  }
}

/** Extracts a printable message from an arbitrary thrown value. */
function describeError(err: unknown): string {
  if (typeof err === "object" && err !== null && "message" in err) {
    const message = (err as { message?: unknown }).message;
    if (message != null) return String(message);
  }
  return String(err ?? "error");
}

/** Computes the next polling delay after a failure: x2 when rate limited, x1.5 otherwise, capped. */
function backoffDelay(current: number, rateLimited: boolean): number {
  const next = rateLimited ? current * 2 : Math.floor(current * 1.5);
  return Math.min(next, MAX_DELAY_MS);
}

/**
 * Streams the latest bar price for a ticker as Server-Sent Events.
 *
 * Query parameters:
 * - `ticker` (required): upper-cased before use; a missing/empty value yields a 400 response.
 * - `timeframe` (optional): defaults to "1Min"; "1H" and "1D" are mapped to "1Hour" and "1Day".
 *
 * Emitted `data:` payloads (JSON):
 * - `{ event: "connected", ticker, timeframe }` once, when the stream starts.
 * - `{ price, ts, timeframe }` whenever a bar with a new dedupe id is seen.
 * - `{ error, ts }` when no bar/price is available or the fetch throws.
 *
 * Polling starts at a 5s interval, backs off on errors (capped at 60s), and resets to 5s on
 * success. It stops when the stream is cancelled or the request's abort signal fires.
 *
 * @param args.request Incoming request; its URL supplies the query parameters.
 * @returns A 400 text response when `ticker` is missing, otherwise a `text/event-stream` response.
 */
export async function loader({ request }: { request: Request }): Promise<Response> {
  const url = new URL(request.url);
  const ticker = (url.searchParams.get("ticker") || "").toUpperCase();
  if (!ticker) return new Response("ticker required", { status: 400 });

  // `||` (not `??`) so that an empty `timeframe=` also falls back to 1Min bars.
  const timeframe = url.searchParams.get("timeframe") || "1Min";
  const alpacaTimeframe = mapToAlpacaTimeframe(timeframe);

  const headers = new Headers({
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  let closed = false;
  let timer: ReturnType<typeof setTimeout> | undefined;

  // Idempotent shutdown: marks the stream closed and drops any scheduled poll.
  function stopPolling(): void {
    closed = true;
    if (timer !== undefined) {
      clearTimeout(timer);
      timer = undefined;
    }
  }

  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      // Serializes one SSE `data:` event; a no-op once the stream has been closed.
      function pushEvent(obj: unknown): void {
        if (closed) return;
        try {
          const payload = `data: ${JSON.stringify(obj)}\n\n`;
          controller.enqueue(sseEncoder.encode(payload));
        } catch (e) {
          console.warn("price-stream: failed to enqueue", e);
        }
      }

      // Client disconnects may surface as a request abort rather than a stream cancel,
      // so both paths must stop the polling loop.
      const onAbort = (): void => {
        stopPolling();
        try {
          controller.close();
        } catch {
          // Stream was already closed or cancelled; nothing left to do.
        }
      };
      if (request.signal.aborted) {
        onAbort();
        return;
      }
      request.signal.addEventListener("abort", onAbort, { once: true });

      // initial ping
      pushEvent({ event: "connected", ticker, timeframe });

      let delay = BASE_DELAY_MS;
      let lastBarId: string | number | null = null;

      async function poll(): Promise<void> {
        timer = undefined;
        if (closed) return;
        try {
          const last = await alpacaService.fetchLatestBar(ticker, alpacaTimeframe);
          // The consumer may have gone away while the request was in flight.
          if (closed) return;

          const price = last ? (last.ClosePrice ?? last.c ?? null) : null;
          // Dedupe id built from whichever field is present, falling back to the close price.
          // NOTE(review): assumes T / t / Timestamp identify the bar — confirm against alpacaClient.
          const barId = last
            ? (last.T ?? last.t ?? last.Timestamp ?? last.ClosePrice ?? last.c ?? null)
            : null;

          if (price != null) {
            // Without an id we cannot dedupe, so the price is always forwarded.
            if (barId == null || barId !== lastBarId) {
              lastBarId = barId;
              pushEvent({ price, ts: Date.now(), timeframe });
            }
          } else {
            pushEvent({ error: "no_bar", ts: Date.now() });
          }

          // on success, reset backoff
          delay = BASE_DELAY_MS;
        } catch (err: unknown) {
          const msg = describeError(err);
          console.error("price-stream: error fetching latest bar", msg);
          pushEvent({ error: msg, ts: Date.now() });

          const rateLimited = RATE_LIMIT_PATTERN.test(msg);
          delay = backoffDelay(delay, rateLimited);
          if (rateLimited) {
            console.warn(`price-stream: rate limited, backing off to ${delay}ms`);
          }
        }
        if (!closed) timer = setTimeout(() => void poll(), delay);
      }

      // start polling immediately
      timer = setTimeout(() => void poll(), 0);
    },
    cancel() {
      stopPolling();
    },
  });

  return new Response(stream, { headers });
}