From 46140b0cc868c3d1b778b7dfd6b8dae3ce041db2 Mon Sep 17 00:00:00 2001 From: Adam <2363879+adamdotdevin@users.noreply.github.com> Date: Tue, 26 May 2026 06:35:49 -0500 Subject: [PATCH] chore: better sync/ingest logging --- packages/stats/core/src/stat-sync.ts | 12 ++++---- packages/stats/server/src/ingest.ts | 39 ++++++++++++++++++++++---- packages/stats/server/src/stat-sync.ts | 2 +- sst-env.d.ts | 1 - 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/packages/stats/core/src/stat-sync.ts b/packages/stats/core/src/stat-sync.ts index a64b334936..7f72cd2ca6 100644 --- a/packages/stats/core/src/stat-sync.ts +++ b/packages/stats/core/src/stat-sync.ts @@ -52,8 +52,8 @@ export const syncStats: () => Effect.Effect< discard: true, }) - yield* Effect.logInfo("stats sync complete").pipe( - Effect.annotateLogs({ + yield* Effect.logInfo( + `stats sync complete ${JSON.stringify({ startedAt: startedAt.toISOString(), periodStart: periodStart.toISOString(), periodEnd: periodEnd.toISOString(), @@ -61,7 +61,7 @@ export const syncStats: () => Effect.Effect< providerRows: providerRows.length, geoRows: geoRows.length, stage: Resource.App.stage, - }), + })}`, ) return { @@ -74,8 +74,8 @@ export const syncStats: () => Effect.Effect< }) function logRuntimeCheck() { - return Effect.logInfo("athena stats runtime check").pipe( - Effect.annotateLogs({ + return Effect.logInfo( + `athena stats runtime check ${JSON.stringify({ catalog: Resource.InferenceEvent.catalog, database: Resource.InferenceEvent.database, dataset: Resource.StatsSyncConfig.dataset, @@ -83,6 +83,6 @@ function logRuntimeCheck() { workgroup: Resource.InferenceEvent.workgroup, region: Resource.InferenceEvent.region, stage: Resource.App.stage, - }), + })}`, ) } diff --git a/packages/stats/server/src/ingest.ts b/packages/stats/server/src/ingest.ts index f2806204f7..ddb7666066 100644 --- a/packages/stats/server/src/ingest.ts +++ b/packages/stats/server/src/ingest.ts @@ -34,24 +34,36 @@ export class Ingest extends Context.Service()("@opencode if (events.length === 0) return { records: 0 } const records = events.map(routeEvent).filter((event): event is RoutedEvent => Boolean(event)) if (records.length !== events.length) { + yield* Effect.logWarning( + `lake ingest rejected ${JSON.stringify({ records: events.length, unsupported: events.length - records.length })}`, + ) return yield* new IngestError({ message: "Unsupported lake event type", failed: events.length - records.length, }) } + const batches = chunks( + records.map((event) => ({ Data: Buffer.from(JSON.stringify(event)) })), + MAX_FIREHOSE_BATCH_SIZE, + ) + yield* Effect.logInfo( + `lake ingest batch prepared ${JSON.stringify({ records: records.length, batches: batches.length })}`, + ) + const failed = (yield* Effect.all( - chunks( - records.map((event) => ({ Data: Buffer.from(JSON.stringify(event)) })), - MAX_FIREHOSE_BATCH_SIZE, - ).map((batch) => putRecords(client, Resource.LakeIngestConfig.streamName, batch)), + batches.map((batch) => putRecords(client, Resource.LakeIngestConfig.streamName, batch)), { concurrency: 8 }, )).reduce((sum, item) => sum + item, 0) if (failed > 0) { + yield* Effect.logWarning(`lake ingest incomplete ${JSON.stringify({ records: records.length, failed })}`) return yield* new IngestError({ message: "Failed to ingest all lake records", failed }) } + yield* Effect.logInfo( + `lake ingest complete ${JSON.stringify({ records: records.length, batches: batches.length })}`, + ) return { records: records.length } }) @@ -75,7 +87,11 @@ const putRecords: ( try: () => client.send(new PutRecordBatchCommand({ DeliveryStreamName: streamName, Records: records })), catch: (cause) => new IngestError({ message: "Failed to write lake records to Firehose", failed: records.length, cause }), - }) + }).pipe( + Effect.tapError(() => + Effect.logWarning(`firehose batch write failed ${JSON.stringify({ records: records.length, attempt })}`), + ), + ) const failed = result.RequestResponses?.flatMap((item, index) => { const record = records[index] @@ -83,9 +99,20 @@ const putRecords: ( return [record] }) ?? [] + yield* Effect.logInfo( + `firehose batch written ${JSON.stringify({ records: records.length, failed: failed.length, attempt })}`, + ) if (failed.length === 0) return 0 - if (attempt >= MAX_FIREHOSE_ATTEMPTS) return failed.length + if (attempt >= MAX_FIREHOSE_ATTEMPTS) { + yield* Effect.logWarning( + `firehose batch failed ${JSON.stringify({ records: failed.length, attempts: MAX_FIREHOSE_ATTEMPTS })}`, + ) + return failed.length + } + yield* Effect.logWarning( + `firehose batch retrying ${JSON.stringify({ records: failed.length, attempt: attempt + 1 })}`, + ) yield* Effect.sleep(`${250 * 2 ** (attempt - 1)} millis`) return yield* putRecords(client, streamName, failed, attempt + 1) }) diff --git a/packages/stats/server/src/stat-sync.ts b/packages/stats/server/src/stat-sync.ts index dcbe179483..a5e841f8c0 100644 --- a/packages/stats/server/src/stat-sync.ts +++ b/packages/stats/server/src/stat-sync.ts @@ -9,7 +9,7 @@ const SYNC_INTERVAL = "1 hour" const runtimeLayer = Layer.mergeAll(statsLayer, Athena.layer) const syncPass = syncStats().pipe( Effect.catchCause((cause) => - Effect.logWarning("stats sync failed").pipe(Effect.annotateLogs({ cause: Cause.pretty(cause) })), + Effect.logWarning(`stats sync failed ${JSON.stringify({ cause: Cause.pretty(cause) })}`), ), ) const daemon = Effect.logInfo("stats sync daemon started").pipe( diff --git a/sst-env.d.ts b/sst-env.d.ts index 9f6a5db313..c8acbde01f 100644 --- a/sst-env.d.ts +++ b/sst-env.d.ts @@ -153,7 +153,6 @@ declare module "sst" { } "STRIPE_WEBHOOK_SECRET": { "type": "sst.sst.Linkable" - "value": string } "Stat": import("@cloudflare/workers-types").Service "StatsDatabase": {