From f09c859974848eed0f5fd2b2616b88ee85e683d2 Mon Sep 17 00:00:00 2001 From: Adam <2363879+adamdotdevin@users.noreply.github.com> Date: Wed, 27 May 2026 09:22:08 -0500 Subject: [PATCH] fix: memory pressure in ingest --- infra/lake.ts | 2 +- packages/stats/server/src/ingest.ts | 93 ++++++++++++++++++----------- packages/stats/server/src/router.ts | 20 +++++-- 3 files changed, 74 insertions(+), 41 deletions(-) diff --git a/infra/lake.ts b/infra/lake.ts index 04a3c46ba4..09534dac62 100644 --- a/infra/lake.ts +++ b/infra/lake.ts @@ -210,7 +210,7 @@ const ingestService = new sst.aws.Service("LakeIngestService", { cluster: lakeCluster, architecture: "arm64", cpu: "0.5 vCPU", - memory: "1 GB", + memory: "2 GB", image: { context: ".", dockerfile: "packages/stats/server/Dockerfile", diff --git a/packages/stats/server/src/ingest.ts b/packages/stats/server/src/ingest.ts index ddb7666066..62972e30fa 100644 --- a/packages/stats/server/src/ingest.ts +++ b/packages/stats/server/src/ingest.ts @@ -9,7 +9,7 @@ const MAX_FIREHOSE_ATTEMPTS = 3 const LAKE_TYPE = /^([A-Za-z0-9_]+)\.([A-Za-z0-9_]+)$/ type IngestEvent = Record -type RoutedEvent = IngestEvent & { _lake_database: string; _lake_table: string; _lake_operation: "insert" } +type LakeRoute = { database: string; table: string } type FirehoseRecord = { Data: Uint8Array } export class IngestError extends Schema.TaggedErrorClass()("IngestError", { @@ -20,7 +20,7 @@ export class IngestError extends Schema.TaggedErrorClass()("IngestE export declare namespace Ingest { export interface Service { - readonly write: (events: IngestEvent[]) => Effect.Effect<{ records: number }, IngestError> + readonly write: (events: unknown[]) => Effect.Effect<{ records: number }, IngestError> } } @@ -30,41 +30,47 @@ export class Ingest extends Context.Service()("@opencode Effect.sync(() => { const client = new FirehoseClient({}) - const write = Effect.fn("Ingest.write")(function* (events: IngestEvent[]) { + const write = Effect.fn("Ingest.write")(function* (events: unknown[]) { if (events.length === 0) return { records: 0 } - const records = events.map(routeEvent).filter((event): event is RoutedEvent => Boolean(event)) - if (records.length !== events.length) { + const counts = countRoutedEvents(events) + if (counts.unsupported > 0) { yield* Effect.logWarning( - `lake ingest rejected ${JSON.stringify({ records: events.length, unsupported: events.length - records.length })}`, + `lake ingest rejected ${JSON.stringify({ records: counts.records, unsupported: counts.unsupported })}`, ) return yield* new IngestError({ message: "Unsupported lake event type", - failed: events.length - records.length, + failed: counts.unsupported, }) } + if (counts.records === 0) return { records: 0 } - const batches = chunks( - records.map((event) => ({ Data: Buffer.from(JSON.stringify(event)) })), - MAX_FIREHOSE_BATCH_SIZE, - ) - yield* Effect.logInfo( - `lake ingest batch prepared ${JSON.stringify({ records: records.length, batches: batches.length })}`, - ) + let batch: FirehoseRecord[] = [] + let batches = 0 + let failed = 0 - const failed = (yield* Effect.all( - batches.map((batch) => putRecords(client, Resource.LakeIngestConfig.streamName, batch)), - { concurrency: 8 }, - )).reduce((sum, item) => sum + item, 0) + for (const event of events) { + if (!isRecord(event)) continue + const route = routeEvent(event) + if (!route) continue + batch.push(toFirehoseRecord(event, route)) + if (batch.length < MAX_FIREHOSE_BATCH_SIZE) continue + failed += yield* putRecords(client, Resource.LakeIngestConfig.streamName, batch) + batches++ + batch = [] + } + + if (batch.length > 0) { + failed += yield* putRecords(client, Resource.LakeIngestConfig.streamName, batch) + batches++ + } if (failed > 0) { - yield* Effect.logWarning(`lake ingest incomplete ${JSON.stringify({ records: records.length, failed })}`) + yield* Effect.logWarning(`lake ingest incomplete ${JSON.stringify({ records: counts.records, failed })}`) return yield* new IngestError({ message: "Failed to ingest all lake records", failed }) } - yield* Effect.logInfo( - `lake ingest complete ${JSON.stringify({ records: records.length, batches: batches.length })}`, - ) - return { records: records.length } + yield* Effect.logInfo(`lake ingest complete ${JSON.stringify({ records: counts.records, batches })}`) + return { records: counts.records } }) return Ingest.of({ write }) @@ -99,9 +105,6 @@ const putRecords: ( return [record] }) ?? [] - yield* Effect.logInfo( - `firehose batch written ${JSON.stringify({ records: records.length, failed: failed.length, attempt })}`, - ) if (failed.length === 0) return 0 if (attempt >= MAX_FIREHOSE_ATTEMPTS) { yield* Effect.logWarning( @@ -117,20 +120,40 @@ const putRecords: ( return yield* putRecords(client, streamName, failed, attempt + 1) }) -function routeEvent(event: IngestEvent): RoutedEvent | undefined { +function countRoutedEvents(events: unknown[]) { + let records = 0 + let unsupported = 0 + for (const event of events) { + if (!isRecord(event)) continue + if (routeEvent(event)) records++ + else unsupported++ + } + return { records, unsupported } +} + +function isRecord(item: unknown): item is IngestEvent { + return Boolean(item) && typeof item === "object" && !Array.isArray(item) +} + +function routeEvent(event: IngestEvent): LakeRoute | undefined { if (typeof event._datalake_key !== "string") return const match = event._datalake_key.match(LAKE_TYPE) if (!match?.[1] || !match[2]) return return { - ...Object.fromEntries(Object.entries(event).filter(([key]) => key !== "_datalake_key")), - _lake_database: match[1], - _lake_table: match[2], - _lake_operation: "insert" as const, + database: match[1], + table: match[2], } } -function chunks(items: T[], size: number) { - return Array.from({ length: Math.ceil(items.length / size) }, (_, index) => - items.slice(index * size, (index + 1) * size), - ) +function toFirehoseRecord(event: IngestEvent, route: LakeRoute): FirehoseRecord { + return { + Data: Buffer.from( + JSON.stringify({ + ...Object.fromEntries(Object.entries(event).filter(([key]) => key !== "_datalake_key")), + _lake_database: route.database, + _lake_table: route.table, + _lake_operation: "insert" as const, + }), + ), + } } diff --git a/packages/stats/server/src/router.ts b/packages/stats/server/src/router.ts index 16408645c5..e951118079 100644 --- a/packages/stats/server/src/router.ts +++ b/packages/stats/server/src/router.ts @@ -1,11 +1,14 @@ import { Buffer } from "node:buffer" import { timingSafeEqual } from "node:crypto" import { Effect, Schema } from "effect" +import * as Semaphore from "effect/Semaphore" import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" import { Resource } from "sst/resource" import { Ingest } from "./ingest" import { isShuttingDown } from "./shutdown" +const MAX_CONCURRENT_INGEST_REQUESTS = 8 + const IngestPayload = Schema.Struct({ events: Schema.optional(Schema.Unknown), }) @@ -13,12 +16,13 @@ const IngestPayload = Schema.Struct({ export const Routes = HttpRouter.use((router) => Effect.gen(function* () { const ingestService = yield* Ingest + const ingestRequests = yield* Semaphore.make(MAX_CONCURRENT_INGEST_REQUESTS) yield* Effect.all( [ router.add("GET", "/health", () => json(200, { ok: true })), router.add("GET", "/ready", () => json(isShuttingDown() ? 503 : 200, { ok: !isShuttingDown() })), - router.add("POST", "/", ingest(ingestService)), + router.add("POST", "/", ingestRequests.withPermit(ingest(ingestService))), ], { discard: true }, ) @@ -38,12 +42,14 @@ const ingest = (ingestService: Ingest.Service) => ) if (!payload) return yield* json(400, { ok: false, error: "Invalid JSON body" }) - const events = Array.isArray(payload.events) ? payload.events.filter(isRecord) : [] + const events = Array.isArray(payload.events) ? payload.events : [] if (events.length === 0) return yield* json(202, { ok: true, records: 0 }) return yield* ingestService.write(events).pipe( Effect.flatMap((result) => json(202, { ok: true, records: result.records })), - Effect.catchTag("IngestError", (error) => json(502, { ok: false, records: events.length, failed: error.failed })), + Effect.catchTag("IngestError", (error) => + json(502, { ok: false, records: countRecords(events), failed: error.failed }), + ), ) }) @@ -54,8 +60,12 @@ function isAuthorized(headers: Record) { return timingSafeEqual(actual, expected) } -function isRecord(item: unknown): item is Record { - return Boolean(item) && typeof item === "object" && !Array.isArray(item) +function countRecords(items: unknown[]) { + let records = 0 + for (const item of items) { + if (Boolean(item) && typeof item === "object" && !Array.isArray(item)) records++ + } + return records } function json(status: number, body: Record) {