fix: memory pressure in ingest

This commit is contained in:
Adam 2026-05-27 09:22:08 -05:00
parent 1fcdb0246a
commit f09c859974
No known key found for this signature in database
GPG key ID: 9CB48779AF150E75
3 changed files with 74 additions and 41 deletions

View file

@ -210,7 +210,7 @@ const ingestService = new sst.aws.Service("LakeIngestService", {
cluster: lakeCluster,
architecture: "arm64",
cpu: "0.5 vCPU",
memory: "1 GB",
memory: "2 GB",
image: {
context: ".",
dockerfile: "packages/stats/server/Dockerfile",

View file

@ -9,7 +9,7 @@ const MAX_FIREHOSE_ATTEMPTS = 3
const LAKE_TYPE = /^([A-Za-z0-9_]+)\.([A-Za-z0-9_]+)$/
type IngestEvent = Record<string, unknown>
type RoutedEvent = IngestEvent & { _lake_database: string; _lake_table: string; _lake_operation: "insert" }
type LakeRoute = { database: string; table: string }
type FirehoseRecord = { Data: Uint8Array }
export class IngestError extends Schema.TaggedErrorClass<IngestError>()("IngestError", {
@ -20,7 +20,7 @@ export class IngestError extends Schema.TaggedErrorClass<IngestError>()("IngestE
export declare namespace Ingest {
export interface Service {
readonly write: (events: IngestEvent[]) => Effect.Effect<{ records: number }, IngestError>
readonly write: (events: unknown[]) => Effect.Effect<{ records: number }, IngestError>
}
}
@ -30,41 +30,47 @@ export class Ingest extends Context.Service<Ingest, Ingest.Service>()("@opencode
Effect.sync(() => {
const client = new FirehoseClient({})
const write = Effect.fn("Ingest.write")(function* (events: IngestEvent[]) {
const write = Effect.fn("Ingest.write")(function* (events: unknown[]) {
if (events.length === 0) return { records: 0 }
const records = events.map(routeEvent).filter((event): event is RoutedEvent => Boolean(event))
if (records.length !== events.length) {
const counts = countRoutedEvents(events)
if (counts.unsupported > 0) {
yield* Effect.logWarning(
`lake ingest rejected ${JSON.stringify({ records: events.length, unsupported: events.length - records.length })}`,
`lake ingest rejected ${JSON.stringify({ records: counts.records, unsupported: counts.unsupported })}`,
)
return yield* new IngestError({
message: "Unsupported lake event type",
failed: events.length - records.length,
failed: counts.unsupported,
})
}
if (counts.records === 0) return { records: 0 }
const batches = chunks(
records.map((event) => ({ Data: Buffer.from(JSON.stringify(event)) })),
MAX_FIREHOSE_BATCH_SIZE,
)
yield* Effect.logInfo(
`lake ingest batch prepared ${JSON.stringify({ records: records.length, batches: batches.length })}`,
)
let batch: FirehoseRecord[] = []
let batches = 0
let failed = 0
const failed = (yield* Effect.all(
batches.map((batch) => putRecords(client, Resource.LakeIngestConfig.streamName, batch)),
{ concurrency: 8 },
)).reduce((sum, item) => sum + item, 0)
for (const event of events) {
if (!isRecord(event)) continue
const route = routeEvent(event)
if (!route) continue
batch.push(toFirehoseRecord(event, route))
if (batch.length < MAX_FIREHOSE_BATCH_SIZE) continue
failed += yield* putRecords(client, Resource.LakeIngestConfig.streamName, batch)
batches++
batch = []
}
if (batch.length > 0) {
failed += yield* putRecords(client, Resource.LakeIngestConfig.streamName, batch)
batches++
}
if (failed > 0) {
yield* Effect.logWarning(`lake ingest incomplete ${JSON.stringify({ records: records.length, failed })}`)
yield* Effect.logWarning(`lake ingest incomplete ${JSON.stringify({ records: counts.records, failed })}`)
return yield* new IngestError({ message: "Failed to ingest all lake records", failed })
}
yield* Effect.logInfo(
`lake ingest complete ${JSON.stringify({ records: records.length, batches: batches.length })}`,
)
return { records: records.length }
yield* Effect.logInfo(`lake ingest complete ${JSON.stringify({ records: counts.records, batches })}`)
return { records: counts.records }
})
return Ingest.of({ write })
@ -99,9 +105,6 @@ const putRecords: (
return [record]
}) ?? []
yield* Effect.logInfo(
`firehose batch written ${JSON.stringify({ records: records.length, failed: failed.length, attempt })}`,
)
if (failed.length === 0) return 0
if (attempt >= MAX_FIREHOSE_ATTEMPTS) {
yield* Effect.logWarning(
@ -117,20 +120,40 @@ const putRecords: (
return yield* putRecords(client, streamName, failed, attempt + 1)
})
function routeEvent(event: IngestEvent): RoutedEvent | undefined {
/**
 * Tallies, in a single pass, how many object-shaped events resolve to a lake
 * route (`records`) and how many do not (`unsupported`).
 *
 * Entries that fail `isRecord` are skipped and contribute to neither counter.
 */
function countRoutedEvents(events: unknown[]) {
  const tally = { records: 0, unsupported: 0 }
  for (const candidate of events) {
    if (isRecord(candidate)) {
      // A truthy route means the event can be written; anything else is unsupported.
      const bucket = routeEvent(candidate) ? "records" : "unsupported"
      tally[bucket] += 1
    }
  }
  return tally
}
/**
 * Type guard: true for any non-null value whose `typeof` is "object" and that
 * is not an array; false for primitives, `null`, `undefined`, and arrays.
 */
function isRecord(item: unknown): item is IngestEvent {
  if (typeof item !== "object") return false
  // typeof null is "object", so null has to be excluded explicitly.
  if (item === null) return false
  return !Array.isArray(item)
}
function routeEvent(event: IngestEvent): LakeRoute | undefined {
if (typeof event._datalake_key !== "string") return
const match = event._datalake_key.match(LAKE_TYPE)
if (!match?.[1] || !match[2]) return
return {
...Object.fromEntries(Object.entries(event).filter(([key]) => key !== "_datalake_key")),
_lake_database: match[1],
_lake_table: match[2],
_lake_operation: "insert" as const,
database: match[1],
table: match[2],
}
}
function chunks<T>(items: T[], size: number) {
return Array.from({ length: Math.ceil(items.length / size) }, (_, index) =>
items.slice(index * size, (index + 1) * size),
)
/**
 * Builds the Firehose payload for one event.
 *
 * The `_datalake_key` field is dropped, every other field is carried over, and
 * `_lake_database`, `_lake_table` (taken from `route`) and a constant
 * `_lake_operation: "insert"` are written on top. The result is JSON-encoded
 * and wrapped in a Buffer under `Data`.
 */
function toFirehoseRecord(event: IngestEvent, route: LakeRoute): FirehoseRecord {
  // Peel off the routing key; the remaining fields are forwarded untouched.
  const { _datalake_key: _routingKey, ...fields } = event
  const payload = {
    ...fields,
    _lake_database: route.database,
    _lake_table: route.table,
    _lake_operation: "insert" as const,
  }
  return { Data: Buffer.from(JSON.stringify(payload)) }
}

View file

@ -1,11 +1,14 @@
import { Buffer } from "node:buffer"
import { timingSafeEqual } from "node:crypto"
import { Effect, Schema } from "effect"
import * as Semaphore from "effect/Semaphore"
import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
import { Resource } from "sst/resource"
import { Ingest } from "./ingest"
import { isShuttingDown } from "./shutdown"
const MAX_CONCURRENT_INGEST_REQUESTS = 8
const IngestPayload = Schema.Struct({
events: Schema.optional(Schema.Unknown),
})
@ -13,12 +16,13 @@ const IngestPayload = Schema.Struct({
export const Routes = HttpRouter.use((router) =>
Effect.gen(function* () {
const ingestService = yield* Ingest
const ingestRequests = yield* Semaphore.make(MAX_CONCURRENT_INGEST_REQUESTS)
yield* Effect.all(
[
router.add("GET", "/health", () => json(200, { ok: true })),
router.add("GET", "/ready", () => json(isShuttingDown() ? 503 : 200, { ok: !isShuttingDown() })),
router.add("POST", "/", ingest(ingestService)),
router.add("POST", "/", ingestRequests.withPermit(ingest(ingestService))),
],
{ discard: true },
)
@ -38,12 +42,14 @@ const ingest = (ingestService: Ingest.Service) =>
)
if (!payload) return yield* json(400, { ok: false, error: "Invalid JSON body" })
const events = Array.isArray(payload.events) ? payload.events.filter(isRecord) : []
const events = Array.isArray(payload.events) ? payload.events : []
if (events.length === 0) return yield* json(202, { ok: true, records: 0 })
return yield* ingestService.write(events).pipe(
Effect.flatMap((result) => json(202, { ok: true, records: result.records })),
Effect.catchTag("IngestError", (error) => json(502, { ok: false, records: events.length, failed: error.failed })),
Effect.catchTag("IngestError", (error) =>
json(502, { ok: false, records: countRecords(events), failed: error.failed }),
),
)
})
@ -54,8 +60,12 @@ function isAuthorized(headers: Record<string, string | undefined>) {
return timingSafeEqual(actual, expected)
}
function isRecord(item: unknown): item is Record<string, unknown> {
return Boolean(item) && typeof item === "object" && !Array.isArray(item)
/**
 * Returns how many entries of `items` are non-null, non-array objects.
 * Counting is done as a fold so no intermediate array is allocated.
 */
function countRecords(items: unknown[]) {
  const isPlainObject = (entry: unknown) => typeof entry === "object" && entry !== null && !Array.isArray(entry)
  return items.reduce<number>((total, entry) => (isPlainObject(entry) ? total + 1 : total), 0)
}
function json(status: number, body: Record<string, unknown>) {