fix(stats): batch honeycomb backfill

This commit is contained in:
Adam 2026-05-28 08:37:43 -05:00
parent 9ac0f3e9ac
commit 7504daa602
No known key found for this signature in database
GPG key ID: 9CB48779AF150E75
2 changed files with 125 additions and 122 deletions

View file

@ -10,7 +10,7 @@
"dev:desktop": "bun --cwd packages/desktop dev",
"dev:web": "bun --cwd packages/app dev",
"dev:console": "ulimit -n 10240 2>/dev/null; bun run --cwd packages/console/app dev",
"dev:stats": "bun sst shell --stage=production -- bun run --cwd packages/stats/app dev",
"dev:stats": "bun sst shell --stage=dev -- bun run --cwd packages/stats/app dev",
"dev:storybook": "bun --cwd packages/storybook storybook",
"lint": "oxlint",
"typecheck": "bun turbo typecheck",

View file

@ -19,11 +19,11 @@ import {
statPeriodKey,
synthesizeAllTierRows,
toStatBaseRow,
UPSERT_CHUNK_SIZE,
type StatBaseAggregate,
} from "./domain/stat"
const DAY_MS = 86_400_000
const DEFAULT_UPSERT_CHUNK_SIZE = 100
const DEFAULT_TIERS = ["Go", "Free", "Paid"]
const FREE_MODELS = new Set(["gpt-5-nano", "grok-code", "big-pickle"])
@ -44,6 +44,7 @@ type ImportOptions = {
directories: string[]
dryRun: boolean
periodStart: Date | undefined
upsertChunkSize: number
files: Partial<Record<ImportKey, string[]>>
}
type ModelAggregate = StatBaseAggregate & { provider: string; model: string; provider_model: string }
@ -168,6 +169,7 @@ async function importFiles(args: string[]) {
providerRows: providerRows.length,
geoRows: geoRows.length,
dryRun: opts.dryRun,
upsertChunkSize: opts.upsertChunkSize,
},
null,
2,
@ -178,9 +180,9 @@ async function importFiles(args: string[]) {
if (!opts.databaseUrl) fail("DATABASE_URL is required unless --dry-run is set")
const db = drizzle({ client: new Client({ url: opts.databaseUrl }) })
await upsertModelRows(db, modelRows)
await upsertProviderRows(db, providerRows)
await upsertGeoRows(db, geoRows)
await upsertModelRows(db, modelRows, opts.upsertChunkSize)
await upsertProviderRows(db, providerRows, opts.upsertChunkSize)
await upsertGeoRows(db, geoRows, opts.upsertChunkSize)
}
function buildQueries(limit: number, tiers: string[]): QuerySpec[] {
@ -783,125 +785,125 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value)
}
async function upsertModelRows(db: ReturnType<typeof drizzle>, rows: ModelStatRow[]) {
await Promise.all(
chunks(rows, UPSERT_CHUNK_SIZE).map((chunk) =>
db
.insert(modelStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
provider_model: inserted("provider_model"),
sessions: inserted("sessions"),
requests: inserted("requests"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_cost: inserted("rank_by_cost"),
},
}),
),
)
async function upsertModelRows(db: ReturnType<typeof drizzle>, rows: ModelStatRow[], chunkSize: number) {
const batches = chunks(rows, chunkSize)
console.log(JSON.stringify({ table: "model_stat", batches: batches.length, chunkSize }))
for (const chunk of batches) {
await db
.insert(modelStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
provider_model: inserted("provider_model"),
sessions: inserted("sessions"),
requests: inserted("requests"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_cost: inserted("rank_by_cost"),
},
})
}
}
async function upsertProviderRows(db: ReturnType<typeof drizzle>, rows: ProviderStatRow[]) {
await Promise.all(
chunks(rows, UPSERT_CHUNK_SIZE).map((chunk) =>
db
.insert(providerStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
sessions: inserted("sessions"),
requests: inserted("requests"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
market_share_tokens: inserted("market_share_tokens"),
market_share_requests: inserted("market_share_requests"),
market_share_sessions: inserted("market_share_sessions"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_sessions: inserted("rank_by_sessions"),
rank_by_cost: inserted("rank_by_cost"),
},
}),
),
)
async function upsertProviderRows(db: ReturnType<typeof drizzle>, rows: ProviderStatRow[], chunkSize: number) {
const batches = chunks(rows, chunkSize)
console.log(JSON.stringify({ table: "provider_stat", batches: batches.length, chunkSize }))
for (const chunk of batches) {
await db
.insert(providerStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
sessions: inserted("sessions"),
requests: inserted("requests"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
market_share_tokens: inserted("market_share_tokens"),
market_share_requests: inserted("market_share_requests"),
market_share_sessions: inserted("market_share_sessions"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_sessions: inserted("rank_by_sessions"),
rank_by_cost: inserted("rank_by_cost"),
},
})
}
}
async function upsertGeoRows(db: ReturnType<typeof drizzle>, rows: GeoStatRow[]) {
await Promise.all(
chunks(rows, UPSERT_CHUNK_SIZE).map((chunk) =>
db
.insert(geoStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
continent: inserted("continent"),
sessions: inserted("sessions"),
requests: inserted("requests"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
market_share_tokens: inserted("market_share_tokens"),
market_share_requests: inserted("market_share_requests"),
market_share_sessions: inserted("market_share_sessions"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_sessions: inserted("rank_by_sessions"),
rank_by_cost: inserted("rank_by_cost"),
},
}),
),
)
async function upsertGeoRows(db: ReturnType<typeof drizzle>, rows: GeoStatRow[], chunkSize: number) {
const batches = chunks(rows, chunkSize)
console.log(JSON.stringify({ table: "geo_stat", batches: batches.length, chunkSize }))
for (const chunk of batches) {
await db
.insert(geoStat)
.values(chunk)
.onDuplicateKeyUpdate({
set: {
continent: inserted("continent"),
sessions: inserted("sessions"),
requests: inserted("requests"),
input_tokens: inserted("input_tokens"),
output_tokens: inserted("output_tokens"),
reasoning_tokens: inserted("reasoning_tokens"),
cache_read_tokens: inserted("cache_read_tokens"),
total_tokens: inserted("total_tokens"),
input_cost_microcents: inserted("input_cost_microcents"),
output_cost_microcents: inserted("output_cost_microcents"),
total_cost_microcents: inserted("total_cost_microcents"),
avg_duration_ms: inserted("avg_duration_ms"),
p50_duration_ms: inserted("p50_duration_ms"),
p95_duration_ms: inserted("p95_duration_ms"),
avg_ttfb_ms: inserted("avg_ttfb_ms"),
p50_ttfb_ms: inserted("p50_ttfb_ms"),
p95_ttfb_ms: inserted("p95_ttfb_ms"),
avg_output_tps: inserted("avg_output_tps"),
success_count: inserted("success_count"),
error_count: inserted("error_count"),
sample_count: inserted("sample_count"),
market_share_tokens: inserted("market_share_tokens"),
market_share_requests: inserted("market_share_requests"),
market_share_sessions: inserted("market_share_sessions"),
rank_by_tokens: inserted("rank_by_tokens"),
rank_by_requests: inserted("rank_by_requests"),
rank_by_sessions: inserted("rank_by_sessions"),
rank_by_cost: inserted("rank_by_cost"),
},
})
}
}
function parseImportOptions(args: string[]): ImportOptions {
@ -917,6 +919,7 @@ function parseImportOptions(args: string[]): ImportOptions {
directories: flags.get("dir") ?? flags.get("directory") ?? [],
dryRun: flags.has("dry-run"),
periodStart: parseDateFlag(flags, "period-start"),
upsertChunkSize: parseIntegerFlag(flags, "upsert-chunk-size") ?? DEFAULT_UPSERT_CHUNK_SIZE,
files,
}
}
@ -969,8 +972,8 @@ function parseListFlag(flags: Map<string, string[]>, name: string) {
function usage(): never {
fail(`Usage:
bun src/honeycomb-backfill.ts queries [--tiers Go,Free,Paid] [--limit 1000]
bun src/honeycomb-backfill.ts import [--dry-run] [--database-url URL] --dir downloads
bun src/honeycomb-backfill.ts import [--dry-run] [--database-url URL] --model-day file.csv [--model-day more.csv] ...`)
bun src/honeycomb-backfill.ts import [--dry-run] [--upsert-chunk-size 100] [--database-url URL] --dir downloads
bun src/honeycomb-backfill.ts import [--dry-run] [--upsert-chunk-size 100] [--database-url URL] --model-day file.csv [--model-day more.csv] ...`)
}
function fail(message: string): never {