From 5358ee6f9754a86e63ab3b7569b3d3b8315148b7 Mon Sep 17 00:00:00 2001 From: Henry Winkel Date: Sat, 16 May 2026 17:44:03 +0200 Subject: [PATCH] Fix: use ReadableStream cancel() to cleanup interval (avoid controller.signal TS error) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- app/routes/api/price-stream.ts | 60 ++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 app/routes/api/price-stream.ts diff --git a/app/routes/api/price-stream.ts b/app/routes/api/price-stream.ts new file mode 100644 index 0000000..278202f --- /dev/null +++ b/app/routes/api/price-stream.ts @@ -0,0 +1,60 @@ +import { fetchLatestBar } from "../../lib/alpacaClient"; + +export async function loader({ request }: { request: Request }) { + const url = new URL(request.url); + const ticker = (url.searchParams.get("ticker") || "").toUpperCase(); + if (!ticker) return new Response("ticker required", { status: 400 }); + const timeframe = url.searchParams.get("timeframe") || "1Min"; // default to 1Min bars for live price + const mode = url.searchParams.get("mode") === "live" ? 'live' : 'paper'; + + const headers = new Headers({ + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }); + + // Create a ReadableStream that polls latest bar every second and pushes SSE + let closed = false; + let interval: any; + const stream = new ReadableStream({ + start(controller) { + // helper to push SSE event + function pushEvent(obj: any) { + try { + const payload = `data: ${JSON.stringify(obj)}\n\n`; + controller.enqueue(new TextEncoder().encode(payload)); + } catch (e) { + console.warn("price-stream: failed to enqueue", e); + } + } + + // initial ping + pushEvent({ event: "connected", ticker, timeframe }); + + interval = setInterval(async () => { + if (closed) return; + try { + const last = await fetchLatestBar(ticker, timeframe, mode); + const price = last ? (last.ClosePrice ?? last.c ?? null) : null; + if (price != null) { + pushEvent({ price, ts: Date.now(), timeframe }); + } else { + pushEvent({ error: "no_bar", ts: Date.now() }); + } + } catch (err) { + console.error("price-stream: error fetching latest bar", err); + pushEvent({ error: String(err), ts: Date.now() }); + // keep trying + } + }, 1000); + + // no-op here; cleanup handled in cancel + }, + cancel() { + closed = true; + clearInterval(interval); + }, + }); + + return new Response(stream, { headers }); +}