diff --git a/package.json b/package.json index 5fecf69bab..f2e79709dc 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,7 @@ "dev:desktop": "bun --cwd packages/desktop dev", "dev:web": "bun --cwd packages/app dev", "dev:console": "ulimit -n 10240 2>/dev/null; bun run --cwd packages/console/app dev", - "dev:stats": "bun sst shell --stage=production -- bun run --cwd packages/stats/app dev", + "dev:stats": "bun sst shell --stage=dev -- bun run --cwd packages/stats/app dev", "dev:storybook": "bun --cwd packages/storybook storybook", "lint": "oxlint", "typecheck": "bun turbo typecheck", diff --git a/packages/stats/core/src/honeycomb-backfill.ts b/packages/stats/core/src/honeycomb-backfill.ts index 2b497bdf9e..93fc237e97 100644 --- a/packages/stats/core/src/honeycomb-backfill.ts +++ b/packages/stats/core/src/honeycomb-backfill.ts @@ -19,11 +19,11 @@ import { statPeriodKey, synthesizeAllTierRows, toStatBaseRow, - UPSERT_CHUNK_SIZE, type StatBaseAggregate, } from "./domain/stat" const DAY_MS = 86_400_000 +const DEFAULT_UPSERT_CHUNK_SIZE = 100 const DEFAULT_TIERS = ["Go", "Free", "Paid"] const FREE_MODELS = new Set(["gpt-5-nano", "grok-code", "big-pickle"]) @@ -44,6 +44,7 @@ type ImportOptions = { directories: string[] dryRun: boolean periodStart: Date | undefined + upsertChunkSize: number files: Partial> } type ModelAggregate = StatBaseAggregate & { provider: string; model: string; provider_model: string } @@ -168,6 +169,7 @@ async function importFiles(args: string[]) { providerRows: providerRows.length, geoRows: geoRows.length, dryRun: opts.dryRun, + upsertChunkSize: opts.upsertChunkSize, }, null, 2, @@ -178,9 +180,9 @@ async function importFiles(args: string[]) { if (!opts.databaseUrl) fail("DATABASE_URL is required unless --dry-run is set") const db = drizzle({ client: new Client({ url: opts.databaseUrl }) }) - await upsertModelRows(db, modelRows) - await upsertProviderRows(db, providerRows) - await upsertGeoRows(db, geoRows) + await upsertModelRows(db, modelRows, opts.upsertChunkSize) + await upsertProviderRows(db, providerRows, opts.upsertChunkSize) + await upsertGeoRows(db, geoRows, opts.upsertChunkSize) } function buildQueries(limit: number, tiers: string[]): QuerySpec[] { @@ -783,125 +785,125 @@ function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null && !Array.isArray(value) } -async function upsertModelRows(db: ReturnType, rows: ModelStatRow[]) { - await Promise.all( - chunks(rows, UPSERT_CHUNK_SIZE).map((chunk) => - db - .insert(modelStat) - .values(chunk) - .onDuplicateKeyUpdate({ - set: { - provider_model: inserted("provider_model"), - sessions: inserted("sessions"), - requests: inserted("requests"), - input_tokens: inserted("input_tokens"), - output_tokens: inserted("output_tokens"), - reasoning_tokens: inserted("reasoning_tokens"), - cache_read_tokens: inserted("cache_read_tokens"), - total_tokens: inserted("total_tokens"), - input_cost_microcents: inserted("input_cost_microcents"), - output_cost_microcents: inserted("output_cost_microcents"), - total_cost_microcents: inserted("total_cost_microcents"), - avg_duration_ms: inserted("avg_duration_ms"), - p50_duration_ms: inserted("p50_duration_ms"), - p95_duration_ms: inserted("p95_duration_ms"), - avg_ttfb_ms: inserted("avg_ttfb_ms"), - p50_ttfb_ms: inserted("p50_ttfb_ms"), - p95_ttfb_ms: inserted("p95_ttfb_ms"), - avg_output_tps: inserted("avg_output_tps"), - success_count: inserted("success_count"), - error_count: inserted("error_count"), - sample_count: inserted("sample_count"), - rank_by_tokens: inserted("rank_by_tokens"), - rank_by_requests: inserted("rank_by_requests"), - rank_by_cost: inserted("rank_by_cost"), - }, - }), - ), - ) +async function upsertModelRows(db: ReturnType, rows: ModelStatRow[], chunkSize: number) { + const batches = chunks(rows, chunkSize) + console.log(JSON.stringify({ table: "model_stat", batches: batches.length, chunkSize })) + for (const chunk of batches) { + await db + .insert(modelStat) + .values(chunk) + .onDuplicateKeyUpdate({ + set: { + provider_model: inserted("provider_model"), + sessions: inserted("sessions"), + requests: inserted("requests"), + input_tokens: inserted("input_tokens"), + output_tokens: inserted("output_tokens"), + reasoning_tokens: inserted("reasoning_tokens"), + cache_read_tokens: inserted("cache_read_tokens"), + total_tokens: inserted("total_tokens"), + input_cost_microcents: inserted("input_cost_microcents"), + output_cost_microcents: inserted("output_cost_microcents"), + total_cost_microcents: inserted("total_cost_microcents"), + avg_duration_ms: inserted("avg_duration_ms"), + p50_duration_ms: inserted("p50_duration_ms"), + p95_duration_ms: inserted("p95_duration_ms"), + avg_ttfb_ms: inserted("avg_ttfb_ms"), + p50_ttfb_ms: inserted("p50_ttfb_ms"), + p95_ttfb_ms: inserted("p95_ttfb_ms"), + avg_output_tps: inserted("avg_output_tps"), + success_count: inserted("success_count"), + error_count: inserted("error_count"), + sample_count: inserted("sample_count"), + rank_by_tokens: inserted("rank_by_tokens"), + rank_by_requests: inserted("rank_by_requests"), + rank_by_cost: inserted("rank_by_cost"), + }, + }) + } } -async function upsertProviderRows(db: ReturnType, rows: ProviderStatRow[]) { - await Promise.all( - chunks(rows, UPSERT_CHUNK_SIZE).map((chunk) => - db - .insert(providerStat) - .values(chunk) - .onDuplicateKeyUpdate({ - set: { - sessions: inserted("sessions"), - requests: inserted("requests"), - input_tokens: inserted("input_tokens"), - output_tokens: inserted("output_tokens"), - reasoning_tokens: inserted("reasoning_tokens"), - cache_read_tokens: inserted("cache_read_tokens"), - total_tokens: inserted("total_tokens"), - input_cost_microcents: inserted("input_cost_microcents"), - output_cost_microcents: inserted("output_cost_microcents"), - total_cost_microcents: inserted("total_cost_microcents"), - avg_duration_ms: inserted("avg_duration_ms"), - p50_duration_ms: inserted("p50_duration_ms"), - p95_duration_ms: inserted("p95_duration_ms"), - avg_ttfb_ms: inserted("avg_ttfb_ms"), - p50_ttfb_ms: inserted("p50_ttfb_ms"), - p95_ttfb_ms: inserted("p95_ttfb_ms"), - avg_output_tps: inserted("avg_output_tps"), - success_count: inserted("success_count"), - error_count: inserted("error_count"), - sample_count: inserted("sample_count"), - market_share_tokens: inserted("market_share_tokens"), - market_share_requests: inserted("market_share_requests"), - market_share_sessions: inserted("market_share_sessions"), - rank_by_tokens: inserted("rank_by_tokens"), - rank_by_requests: inserted("rank_by_requests"), - rank_by_sessions: inserted("rank_by_sessions"), - rank_by_cost: inserted("rank_by_cost"), - }, - }), - ), - ) +async function upsertProviderRows(db: ReturnType, rows: ProviderStatRow[], chunkSize: number) { + const batches = chunks(rows, chunkSize) + console.log(JSON.stringify({ table: "provider_stat", batches: batches.length, chunkSize })) + for (const chunk of batches) { + await db + .insert(providerStat) + .values(chunk) + .onDuplicateKeyUpdate({ + set: { + sessions: inserted("sessions"), + requests: inserted("requests"), + input_tokens: inserted("input_tokens"), + output_tokens: inserted("output_tokens"), + reasoning_tokens: inserted("reasoning_tokens"), + cache_read_tokens: inserted("cache_read_tokens"), + total_tokens: inserted("total_tokens"), + input_cost_microcents: inserted("input_cost_microcents"), + output_cost_microcents: inserted("output_cost_microcents"), + total_cost_microcents: inserted("total_cost_microcents"), + avg_duration_ms: inserted("avg_duration_ms"), + p50_duration_ms: inserted("p50_duration_ms"), + p95_duration_ms: inserted("p95_duration_ms"), + avg_ttfb_ms: inserted("avg_ttfb_ms"), + p50_ttfb_ms: inserted("p50_ttfb_ms"), + p95_ttfb_ms: inserted("p95_ttfb_ms"), + avg_output_tps: inserted("avg_output_tps"), + success_count: inserted("success_count"), + error_count: inserted("error_count"), + sample_count: inserted("sample_count"), + market_share_tokens: inserted("market_share_tokens"), + market_share_requests: inserted("market_share_requests"), + market_share_sessions: inserted("market_share_sessions"), + rank_by_tokens: inserted("rank_by_tokens"), + rank_by_requests: inserted("rank_by_requests"), + rank_by_sessions: inserted("rank_by_sessions"), + rank_by_cost: inserted("rank_by_cost"), + }, + }) + } } -async function upsertGeoRows(db: ReturnType, rows: GeoStatRow[]) { - await Promise.all( - chunks(rows, UPSERT_CHUNK_SIZE).map((chunk) => - db - .insert(geoStat) - .values(chunk) - .onDuplicateKeyUpdate({ - set: { - continent: inserted("continent"), - sessions: inserted("sessions"), - requests: inserted("requests"), - input_tokens: inserted("input_tokens"), - output_tokens: inserted("output_tokens"), - reasoning_tokens: inserted("reasoning_tokens"), - cache_read_tokens: inserted("cache_read_tokens"), - total_tokens: inserted("total_tokens"), - input_cost_microcents: inserted("input_cost_microcents"), - output_cost_microcents: inserted("output_cost_microcents"), - total_cost_microcents: inserted("total_cost_microcents"), - avg_duration_ms: inserted("avg_duration_ms"), - p50_duration_ms: inserted("p50_duration_ms"), - p95_duration_ms: inserted("p95_duration_ms"), - avg_ttfb_ms: inserted("avg_ttfb_ms"), - p50_ttfb_ms: inserted("p50_ttfb_ms"), - p95_ttfb_ms: inserted("p95_ttfb_ms"), - avg_output_tps: inserted("avg_output_tps"), - success_count: inserted("success_count"), - error_count: inserted("error_count"), - sample_count: inserted("sample_count"), - market_share_tokens: inserted("market_share_tokens"), - market_share_requests: inserted("market_share_requests"), - market_share_sessions: inserted("market_share_sessions"), - rank_by_tokens: inserted("rank_by_tokens"), - rank_by_requests: inserted("rank_by_requests"), - rank_by_sessions: inserted("rank_by_sessions"), - rank_by_cost: inserted("rank_by_cost"), - }, - }), - ), - ) +async function upsertGeoRows(db: ReturnType, rows: GeoStatRow[], chunkSize: number) { + const batches = chunks(rows, chunkSize) + console.log(JSON.stringify({ table: "geo_stat", batches: batches.length, chunkSize })) + for (const chunk of batches) { + await db + .insert(geoStat) + .values(chunk) + .onDuplicateKeyUpdate({ + set: { + continent: inserted("continent"), + sessions: inserted("sessions"), + requests: inserted("requests"), + input_tokens: inserted("input_tokens"), + output_tokens: inserted("output_tokens"), + reasoning_tokens: inserted("reasoning_tokens"), + cache_read_tokens: inserted("cache_read_tokens"), + total_tokens: inserted("total_tokens"), + input_cost_microcents: inserted("input_cost_microcents"), + output_cost_microcents: inserted("output_cost_microcents"), + total_cost_microcents: inserted("total_cost_microcents"), + avg_duration_ms: inserted("avg_duration_ms"), + p50_duration_ms: inserted("p50_duration_ms"), + p95_duration_ms: inserted("p95_duration_ms"), + avg_ttfb_ms: inserted("avg_ttfb_ms"), + p50_ttfb_ms: inserted("p50_ttfb_ms"), + p95_ttfb_ms: inserted("p95_ttfb_ms"), + avg_output_tps: inserted("avg_output_tps"), + success_count: inserted("success_count"), + error_count: inserted("error_count"), + sample_count: inserted("sample_count"), + market_share_tokens: inserted("market_share_tokens"), + market_share_requests: inserted("market_share_requests"), + market_share_sessions: inserted("market_share_sessions"), + rank_by_tokens: inserted("rank_by_tokens"), + rank_by_requests: inserted("rank_by_requests"), + rank_by_sessions: inserted("rank_by_sessions"), + rank_by_cost: inserted("rank_by_cost"), + }, + }) + } } function parseImportOptions(args: string[]): ImportOptions { @@ -917,6 +919,7 @@ function parseImportOptions(args: string[]): ImportOptions { directories: flags.get("dir") ?? flags.get("directory") ?? [], dryRun: flags.has("dry-run"), periodStart: parseDateFlag(flags, "period-start"), + upsertChunkSize: parseIntegerFlag(flags, "upsert-chunk-size") ?? DEFAULT_UPSERT_CHUNK_SIZE, files, } } @@ -969,8 +972,8 @@ function parseListFlag(flags: Map, name: string) { function usage(): never { fail(`Usage: bun src/honeycomb-backfill.ts queries [--tiers Go,Free,Paid] [--limit 1000] - bun src/honeycomb-backfill.ts import [--dry-run] [--database-url URL] --dir downloads - bun src/honeycomb-backfill.ts import [--dry-run] [--database-url URL] --model-day file.csv [--model-day more.csv] ...`) + bun src/honeycomb-backfill.ts import [--dry-run] [--upsert-chunk-size 100] [--database-url URL] --dir downloads + bun src/honeycomb-backfill.ts import [--dry-run] [--upsert-chunk-size 100] [--database-url URL] --model-day file.csv [--model-day more.csv] ...`) } function fail(message: string): never {