chore: better sync/ingest logging

This commit is contained in:
Adam 2026-05-26 06:35:49 -05:00
parent ca354f8e8a
commit 46140b0cc8
No known key found for this signature in database
GPG key ID: 9CB48779AF150E75
4 changed files with 40 additions and 14 deletions

View file

@ -52,8 +52,8 @@ export const syncStats: () => Effect.Effect<
discard: true,
})
yield* Effect.logInfo("stats sync complete").pipe(
Effect.annotateLogs({
yield* Effect.logInfo(
`stats sync complete ${JSON.stringify({
startedAt: startedAt.toISOString(),
periodStart: periodStart.toISOString(),
periodEnd: periodEnd.toISOString(),
@ -61,7 +61,7 @@ export const syncStats: () => Effect.Effect<
providerRows: providerRows.length,
geoRows: geoRows.length,
stage: Resource.App.stage,
}),
})}`,
)
return {
@ -74,8 +74,8 @@ export const syncStats: () => Effect.Effect<
})
function logRuntimeCheck() {
return Effect.logInfo("athena stats runtime check").pipe(
Effect.annotateLogs({
return Effect.logInfo(
`athena stats runtime check ${JSON.stringify({
catalog: Resource.InferenceEvent.catalog,
database: Resource.InferenceEvent.database,
dataset: Resource.StatsSyncConfig.dataset,
@ -83,6 +83,6 @@ function logRuntimeCheck() {
workgroup: Resource.InferenceEvent.workgroup,
region: Resource.InferenceEvent.region,
stage: Resource.App.stage,
}),
})}`,
)
}

View file

@ -34,24 +34,36 @@ export class Ingest extends Context.Service<Ingest, Ingest.Service>()("@opencode
if (events.length === 0) return { records: 0 }
const records = events.map(routeEvent).filter((event): event is RoutedEvent => Boolean(event))
if (records.length !== events.length) {
yield* Effect.logWarning(
`lake ingest rejected ${JSON.stringify({ records: events.length, unsupported: events.length - records.length })}`,
)
return yield* new IngestError({
message: "Unsupported lake event type",
failed: events.length - records.length,
})
}
const batches = chunks(
records.map((event) => ({ Data: Buffer.from(JSON.stringify(event)) })),
MAX_FIREHOSE_BATCH_SIZE,
)
yield* Effect.logInfo(
`lake ingest batch prepared ${JSON.stringify({ records: records.length, batches: batches.length })}`,
)
const failed = (yield* Effect.all(
chunks(
records.map((event) => ({ Data: Buffer.from(JSON.stringify(event)) })),
MAX_FIREHOSE_BATCH_SIZE,
).map((batch) => putRecords(client, Resource.LakeIngestConfig.streamName, batch)),
batches.map((batch) => putRecords(client, Resource.LakeIngestConfig.streamName, batch)),
{ concurrency: 8 },
)).reduce((sum, item) => sum + item, 0)
if (failed > 0) {
yield* Effect.logWarning(`lake ingest incomplete ${JSON.stringify({ records: records.length, failed })}`)
return yield* new IngestError({ message: "Failed to ingest all lake records", failed })
}
yield* Effect.logInfo(
`lake ingest complete ${JSON.stringify({ records: records.length, batches: batches.length })}`,
)
return { records: records.length }
})
@ -75,7 +87,11 @@ const putRecords: (
try: () => client.send(new PutRecordBatchCommand({ DeliveryStreamName: streamName, Records: records })),
catch: (cause) =>
new IngestError({ message: "Failed to write lake records to Firehose", failed: records.length, cause }),
})
}).pipe(
Effect.tapError(() =>
Effect.logWarning(`firehose batch write failed ${JSON.stringify({ records: records.length, attempt })}`),
),
)
const failed =
result.RequestResponses?.flatMap((item, index) => {
const record = records[index]
@ -83,9 +99,20 @@ const putRecords: (
return [record]
}) ?? []
yield* Effect.logInfo(
`firehose batch written ${JSON.stringify({ records: records.length, failed: failed.length, attempt })}`,
)
if (failed.length === 0) return 0
if (attempt >= MAX_FIREHOSE_ATTEMPTS) return failed.length
if (attempt >= MAX_FIREHOSE_ATTEMPTS) {
yield* Effect.logWarning(
`firehose batch failed ${JSON.stringify({ records: failed.length, attempts: MAX_FIREHOSE_ATTEMPTS })}`,
)
return failed.length
}
yield* Effect.logWarning(
`firehose batch retrying ${JSON.stringify({ records: failed.length, attempt: attempt + 1 })}`,
)
yield* Effect.sleep(`${250 * 2 ** (attempt - 1)} millis`)
return yield* putRecords(client, streamName, failed, attempt + 1)
})

View file

@ -9,7 +9,7 @@ const SYNC_INTERVAL = "1 hour"
const runtimeLayer = Layer.mergeAll(statsLayer, Athena.layer)
const syncPass = syncStats().pipe(
Effect.catchCause((cause) =>
Effect.logWarning("stats sync failed").pipe(Effect.annotateLogs({ cause: Cause.pretty(cause) })),
Effect.logWarning(`stats sync failed ${JSON.stringify({ cause: Cause.pretty(cause) })}`),
),
)
const daemon = Effect.logInfo("stats sync daemon started").pipe(

1
sst-env.d.ts vendored
View file

@ -153,7 +153,6 @@ declare module "sst" {
}
"STRIPE_WEBHOOK_SECRET": {
"type": "sst.sst.Linkable"
"value": string
}
"Stat": import("@cloudflare/workers-types").Service
"StatsDatabase": {