diff --git a/packages/stats/core/src/honeycomb-backfill.ts b/packages/stats/core/src/honeycomb-backfill.ts index dd193c8b9a..507b1dca0b 100644 --- a/packages/stats/core/src/honeycomb-backfill.ts +++ b/packages/stats/core/src/honeycomb-backfill.ts @@ -1,4 +1,6 @@ import { Client } from "@planetscale/database" +import { readdir } from "node:fs/promises" +import path from "node:path" import { drizzle } from "drizzle-orm/planetscale-serverless" import { geoStat, modelStat, providerStat } from "./database/schema" import { @@ -17,23 +19,30 @@ import { } from "./domain/stat" const DAY_MS = 86_400_000 -const DEFAULT_DAYS = 60 +const DEFAULT_TIERS = ["Go", "Free", "Paid"] const FREE_MODELS = new Set(["gpt-5-nano", "grok-code", "big-pickle"]) type Grain = "day" | "week" type MetricDimension = "model" | "provider" | "geo" type LookupDimension = "model-provider-model" | "geo-continent" type ImportKey = `${MetricDimension | LookupDimension}-${Grain}` +type GeneratedImportKey = `${MetricDimension}-${Grain}` +type QuerySpec = { + name: string + importKey: ImportKey + importFlag: `--${ImportKey}` + query: ReturnType +} type RawRow = Record type Period = { start: Date; end: Date } -type Timing = { start_time: number; end_time: number; granularity?: number } type ImportOptions = { dataset: string databaseUrl: string | undefined + directories: string[] dryRun: boolean periodEnd: Date | undefined periodStart: Date | undefined - files: Partial> + files: Partial> } type ModelAggregate = StatBaseAggregate & { provider: string; model: string; provider_model: string } type ProviderAggregate = StatBaseAggregate & { provider: string } @@ -66,34 +75,24 @@ async function main() { function printQueries(args: string[]) { const flags = parseFlags(args) - const periodEnd = parseDateFlag(flags, "period-end") ?? defaultPeriodEnd() - const days = parseIntegerFlag(flags, "days") ?? DEFAULT_DAYS const limit = parseIntegerFlag(flags, "limit") ?? 1000 - const dailyStart = new Date( - Date.UTC(periodEnd.getUTCFullYear(), periodEnd.getUTCMonth(), periodEnd.getUTCDate() - days + 1), - ) - const weekStart = syncWeekStart(periodEnd) + const tiers = parseListFlag(flags, "tiers") ?? DEFAULT_TIERS + const queries = buildQueries(limit, tiers, flags.has("include-weekly")) + const only = flags.get("only")?.[0] + + if (only) { + const item = queries.find((query) => query.name === only) + if (!item) fail(`Unknown --only ${only}. Expected one of: ${queries.map((query) => query.name).join(", ")}`) + console.log(JSON.stringify(item.query, null, 2)) + return + } console.log( JSON.stringify( { - period_end: periodEnd.toISOString(), - import_hint: `bun src/honeycomb-backfill.ts import --period-end ${periodEnd.toISOString()} ...`, - daily: buildQuerySet( - { - start_time: Math.floor(dailyStart.getTime() / 1000), - end_time: Math.floor(periodEnd.getTime() / 1000), - granularity: DAY_MS / 1000, - }, - limit, - ), - week: buildQuerySet( - { - start_time: Math.floor(weekStart.getTime() / 1000), - end_time: Math.floor(periodEnd.getTime() / 1000), - }, - limit, - ), + tiers, + import_hint: "bun src/honeycomb-backfill.ts import --dir downloads", + queries, }, null, 2, @@ -102,7 +101,9 @@ function printQueries(args: string[]) { } async function importFiles(args: string[]) { - const opts = parseImportOptions(args) + const parsed = parseImportOptions(args) + const opts = { ...parsed, files: mergeFiles(parsed.files, await discoverFiles(parsed.directories)) } + if (!inputKeys.some((key) => opts.files[key]?.length)) fail("No CSV or JSON import files were provided or discovered") const providerModelLookup = new Map([ ...(await lookupRows(opts.files["model-provider-model-day"], "day", opts, modelProviderModelLookup)), ...(await lookupRows(opts.files["model-provider-model-week"], "week", opts, modelProviderModelLookup)), @@ -151,6 +152,9 @@ async function importFiles(args: string[]) { console.log( JSON.stringify( { + inputs: Object.fromEntries( + inputKeys.flatMap((key) => (opts.files[key]?.length ? [[key, opts.files[key].length]] : [])), + ), modelRows: modelRows.length, providerRows: providerRows.length, geoRows: geoRows.length, @@ -170,109 +174,94 @@ async function importFiles(args: string[]) { await upsertGeoRows(db, geoRows) } -function buildQuerySet(timing: Timing, limit: number) { +function buildQueries(limit: number, tiers: string[], includeWeekly: boolean): QuerySpec[] { + const daily = tiers.flatMap((tier) => [ + querySpec( + "model-day", + tier, + metricQuery(["date", "tier", "provider.normalized", "model", "provider.model"], limit, tierFilters(tier)), + ), + querySpec( + "provider-day", + tier, + metricQuery(["date", "tier", "provider.normalized"], limit, tierFilters(tier)), + ), + querySpec("geo-day", tier, metricQuery(["date", "tier", "country", "continent"], limit, tierFilters(tier))), + ]) + const weekly = includeWeekly + ? tiers.flatMap((tier) => [ + querySpec( + "model-week", + tier, + metricQuery(["tier", "provider.normalized", "model", "provider.model"], limit, tierFilters(tier)), + ), + querySpec("provider-week", tier, metricQuery(["tier", "provider.normalized"], limit, tierFilters(tier))), + querySpec("geo-week", tier, metricQuery(["tier", "country", "continent"], limit, tierFilters(tier))), + ]) + : [] + + return [...daily, ...weekly] +} + +function querySpec(importKey: GeneratedImportKey, tier: string, query: ReturnType) { return { - model: metricQuery(["stat_tier", "stat_provider", "model"], timing, limit), - model_provider_model: lookupQuery(["stat_tier", "stat_provider", "model", "provider.model"], timing, limit), - provider: metricQuery(["stat_tier", "stat_provider"], timing, limit), - geo: metricQuery(["stat_tier", "stat_country"], timing, limit), - geo_continent: lookupQuery(["stat_tier", "stat_country", "cf.continent"], timing, limit), + name: `${importKey}-${queryNameSegment(tier)}`, + importKey, + importFlag: `--${importKey}` as const, + query, } } -function metricQuery(breakdowns: string[], timing: Timing, limit: number) { +function metricQuery( + breakdowns: string[], + limit: number, + filters: ReturnType = [], +) { return { - ...timing, + granularity: 0, breakdowns, - calculated_fields: [...commonCalculatedFields(), ...metricCalculatedFields()], calculations: [ - { op: "COUNT_DISTINCT", column: "session", name: "sessions" }, - { op: "COUNT", name: "requests" }, - { op: "SUM", column: "tokens.input", name: "input_tokens" }, - { op: "SUM", column: "tokens.output", name: "output_tokens" }, - { op: "SUM", column: "tokens.reasoning", name: "reasoning_tokens" }, - { op: "SUM", column: "tokens.cache_read", name: "cache_read_tokens" }, - { op: "SUM", column: "stat_tokens_total", name: "total_tokens" }, - { op: "SUM", column: "stat_cost_input_microcents", name: "input_cost_microcents" }, - { op: "SUM", column: "stat_cost_output_microcents", name: "output_cost_microcents" }, - { op: "SUM", column: "stat_cost_total_microcents", name: "total_cost_microcents" }, - { op: "AVG", column: "duration", name: "avg_duration_ms" }, - { op: "P50", column: "duration", name: "p50_duration_ms" }, - { op: "P95", column: "duration", name: "p95_duration_ms" }, - { op: "AVG", column: "time_to_first_byte", name: "avg_ttfb_ms" }, - { op: "P50", column: "time_to_first_byte", name: "p50_ttfb_ms" }, - { op: "P95", column: "time_to_first_byte", name: "p95_ttfb_ms" }, - { op: "AVG", column: "stat_output_tps", name: "avg_output_tps" }, - { op: "SUM", column: "stat_success", name: "success_count" }, - { op: "SUM", column: "stat_error", name: "error_count" }, - { op: "COUNT", name: "sample_count" }, + { op: "COUNT_DISTINCT", column: "session" }, + { op: "COUNT" }, + { op: "SUM", column: "tokens.input" }, + { op: "SUM", column: "tokens.output" }, + { op: "SUM", column: "tokens.reasoning" }, + { op: "SUM", column: "tokens.cache_read" }, + { op: "SUM", column: "tokens" }, + { op: "SUM", column: "cost.input.microcents" }, + { op: "SUM", column: "cost.output.microcents" }, + { op: "SUM", column: "cost.total.microcents" }, + { op: "AVG", column: "duration" }, + { op: "P50", column: "duration" }, + { op: "P95", column: "duration" }, + { op: "AVG", column: "time_to_first_byte" }, + { op: "P50", column: "time_to_first_byte" }, + { op: "P95", column: "time_to_first_byte" }, + { op: "AVG", column: "tps.output" }, + { op: "SUM", column: "success" }, + { op: "SUM", column: "error" }, ], - filters: commonFilters(), + filters: [...commonFilters(), ...filters], filter_combination: "AND", - orders: [{ column: "stat_tokens_total", op: "SUM", order: "descending" }], + orders: [{ column: "tokens", op: "SUM", order: "descending" }], + havings: [], limit, + formulas: [], } } -function lookupQuery(breakdowns: string[], timing: Timing, limit: number) { - return { - ...timing, - breakdowns, - calculated_fields: commonCalculatedFields(), - calculations: [{ op: "COUNT", name: "requests" }], - filters: commonFilters(), - filter_combination: "AND", - orders: [{ op: "COUNT", order: "descending" }], - limit, - } +function tierFilters(tier: string) { + if (tier === "all") return [] + return [{ column: "tier", op: "=", value: tier }] } -function commonCalculatedFields() { - return [ - { - name: "stat_included_client", - expression: `IF(OR(CONTAINS(COALESCE($user_agent, ""), "ai-sdk"), CONTAINS(COALESCE($user_agent, ""), "opencode")), 1, 0)`, - }, - { - name: "stat_tier", - expression: `IF(EQUALS(COALESCE($source, ""), "lite"), "Go", OR(EQUALS(COALESCE($model, ""), "gpt-5-nano"), EQUALS(COALESCE($model, ""), "grok-code"), EQUALS(COALESCE($model, ""), "big-pickle"), ENDS_WITH(COALESCE($model, ""), "-free")), "Free", "Zen")`, - }, - { - name: "stat_provider", - expression: - `IF(STARTS_WITH(COALESCE($provider, ""), "minimax-plan"), "minimax-plan", STARTS_WITH(COALESCE($provider, ""), "zai-plan"), "zai-plan", STARTS_WITH(COALESCE($provider, ""), "azure-databricks"), "azure-databricks", REG_MATCH(COALESCE($provider, ""), ` + - "`^azure[0-9]+`" + - `), "azure-openai", COALESCE($provider, "unknown"))`, - }, - { name: "stat_country", expression: `COALESCE($cf.country, "ZZ")` }, - ] -} - -function metricCalculatedFields() { - return [ - { - name: "stat_tokens_total", - expression: `SUM(COALESCE($tokens.cache_read, 0), COALESCE($tokens.cache_write_5m, 0), COALESCE($tokens.input, 0), COALESCE($tokens.output, 0))`, - }, - { - name: "stat_cost_input_microcents", - expression: `COALESCE($cost.input.microcents, MUL($cost.input, 1000000), 0)`, - }, - { - name: "stat_cost_output_microcents", - expression: `COALESCE($cost.output.microcents, MUL($cost.output, 1000000), 0)`, - }, - { - name: "stat_cost_total_microcents", - expression: `COALESCE($cost.total.microcents, MUL($cost.total, 1000000), 0)`, - }, - { - name: "stat_output_tps", - expression: `IF(LT(SUB($timestamp.last_byte, $timestamp.first_byte), 100), null, DIV(MUL($tokens.output, 1000), SUB($timestamp.last_byte, $timestamp.first_byte)))`, - }, - { name: "stat_success", expression: `IF(AND(GTE($status, 200), LT($status, 400)), 1, 0)` }, - { name: "stat_error", expression: `IF(GTE($status, 400), 1, 0)` }, - ] +function queryNameSegment(value: string) { + return ( + value + .toLowerCase() + .replace(/[^a-z0-9]+/g, "-") + .replace(/^-|-$/g, "") || "all" + ) } function commonFilters() { @@ -280,28 +269,27 @@ function commonFilters() { { column: "event_type", op: "=", value: "completions" }, { column: "model", op: "exists" }, { column: "model", op: "!=", value: "" }, - { column: "stat_included_client", op: "=", value: 1 }, ] } function metricRows( - file: string | undefined, + files: string[] | undefined, grain: Grain, opts: ImportOptions, map: (row: RawRow, base: StatBaseAggregate) => T, ) { - if (!file) return Promise.resolve([]) - return readRows(file).then((rows) => rows.map((row) => map(row, baseAggregate(row, grain, opts)))) + if (!files) return Promise.resolve([]) + return readFiles(files).then((rows) => rows.map((row) => map(row, baseAggregate(row, grain, opts)))) } function lookupRows( - file: string | undefined, + files: string[] | undefined, grain: Grain, opts: ImportOptions, map: (row: RawRow, grain: Grain, opts: ImportOptions) => readonly (readonly [string, string])[], ) { - if (!file) return Promise.resolve([]) - return readRows(file).then((rows) => + if (!files) return Promise.resolve([]) + return readFiles(files).then((rows) => Array.from( rows .flatMap((row) => map(row, grain, opts)) @@ -313,6 +301,64 @@ function lookupRows( ) } +async function readFiles(files: string[]) { + return (await Promise.all(files.map(readRows))).flat() +} + +async function discoverFiles(directories: string[]) { + const classified = await Promise.all( + (await Promise.all(directories.map(filesInDirectory))).flat().map(async (file) => ({ + file, + key: classifyRows(file, await readRows(file)), + })), + ) + return classified.reduce>>((result, item) => { + return { ...result, [item.key]: [...(result[item.key] ?? []), item.file] } + }, {}) +} + +async function filesInDirectory(directory: string): Promise { + return ( + await Promise.all( + (await readdir(directory, { withFileTypes: true })).map((entry) => { + const file = path.join(directory, entry.name) + if (entry.isDirectory()) return filesInDirectory(file) + if (entry.isFile() && /\.(csv|json)$/i.test(entry.name)) return Promise.resolve([file]) + return Promise.resolve([]) + }), + ) + ).flat() +} + +function classifyRows(file: string, rows: RawRow[]): ImportKey { + if (rows.length === 0) fail(`Cannot classify empty export: ${file}`) + const headers = new Set(rows.flatMap((row) => Object.keys(row).map(normalizeHeader))) + const grain: Grain = headers.has("date") ? "day" : "week" + if (headers.has("model")) return hasMetricHeaders(headers) ? `model-${grain}` : `model-provider-model-${grain}` + if (hasHeader(headers, ["country", "cf.country"])) + return hasMetricHeaders(headers) ? `geo-${grain}` : `geo-continent-${grain}` + if (hasHeader(headers, ["provider", "provider.normalized"])) return `provider-${grain}` + fail(`Cannot classify export from columns in ${file}`) +} + +function hasMetricHeaders(headers: Set) { + return ["sumtokens", "sumtokensinput", "inputtokens", "totaltokens", "avgduration", "countdistinctsession"].some( + (header) => headers.has(header), + ) +} + +function hasHeader(headers: Set, names: string[]) { + return names.some((name) => headers.has(normalizeHeader(name))) +} + +function mergeFiles(left: Partial>, right: Partial>) { + return inputKeys.reduce>>((result, key) => { + const files = [...(left[key] ?? []), ...(right[key] ?? [])] + if (files.length === 0) return result + return { ...result, [key]: files } + }, {}) +} + function modelProviderModelLookup(row: RawRow, grain: Grain, opts: ImportOptions): [string, string][] { const base = basePeriod(row, grain, opts) const value = providerModel(row) @@ -340,18 +386,27 @@ function baseAggregate(row: RawRow, grain: Grain, opts: ImportOptions): StatBase reasoning_tokens: integer(row, "reasoning_tokens", ["SUM(tokens.reasoning)", "SUM(tokens_reasoning)"]), cache_read_tokens: integer(row, "cache_read_tokens", ["SUM(tokens.cache_read)", "SUM(tokens_cache_read)"]), total_tokens: integer(row, "total_tokens", ["SUM(stat_tokens_total)", "SUM(tokens)", "SUM(tokens_total)"]), - input_cost_microcents: integer(row, "input_cost_microcents", ["SUM(stat_cost_input_microcents)"]), - output_cost_microcents: integer(row, "output_cost_microcents", ["SUM(stat_cost_output_microcents)"]), - total_cost_microcents: integer(row, "total_cost_microcents", ["SUM(stat_cost_total_microcents)"]), + input_cost_microcents: integer(row, "input_cost_microcents", [ + "SUM(cost.input.microcents)", + "SUM(stat_cost_input_microcents)", + ]), + output_cost_microcents: integer(row, "output_cost_microcents", [ + "SUM(cost.output.microcents)", + "SUM(stat_cost_output_microcents)", + ]), + total_cost_microcents: integer(row, "total_cost_microcents", [ + "SUM(cost.total.microcents)", + "SUM(stat_cost_total_microcents)", + ]), avg_duration_ms: nullableNumber(row, "avg_duration_ms", ["AVG(duration)", "AVG(duration_ms)"]), p50_duration_ms: nullableInteger(row, "p50_duration_ms", ["P50(duration)", "P50(duration_ms)"]), p95_duration_ms: nullableInteger(row, "p95_duration_ms", ["P95(duration)", "P95(duration_ms)"]), avg_ttfb_ms: nullableNumber(row, "avg_ttfb_ms", ["AVG(time_to_first_byte)", "AVG(ttfb_ms)"]), p50_ttfb_ms: nullableInteger(row, "p50_ttfb_ms", ["P50(time_to_first_byte)", "P50(ttfb_ms)"]), p95_ttfb_ms: nullableInteger(row, "p95_ttfb_ms", ["P95(time_to_first_byte)", "P95(ttfb_ms)"]), - avg_output_tps: nullableNumber(row, "avg_output_tps", ["AVG(stat_output_tps)", "AVG(tps.output)"]), - success_count: integer(row, "success_count", ["SUM(stat_success)"]), - error_count: integer(row, "error_count", ["SUM(stat_error)"]), + avg_output_tps: nullableNumber(row, "avg_output_tps", ["AVG(tps.output)", "AVG(stat_output_tps)"]), + success_count: integer(row, "success_count", ["SUM(success)", "SUM(is_success)", "SUM(stat_success)"]), + error_count: integer(row, "error_count", ["SUM(error)", "SUM(is_error)", "SUM(stat_error)"]), sample_count: integer(row, "sample_count", ["COUNT", "COUNT()"]), } } @@ -477,7 +532,7 @@ function deriveTier(row: RawRow) { } function provider(row: RawRow) { - return normalizeProvider(cell(row, ["stat_provider", "provider"]) || "unknown") + return normalizeProvider(cell(row, ["provider.normalized", "stat_provider", "provider"]) || "unknown") } function normalizeProvider(value: string) { @@ -544,7 +599,7 @@ function normalizeHeader(value: string) { } function parseTime(row: RawRow) { - const value = cell(row, ["time", "timestamp", "date", "datetime", "bucket"]) + const value = cell(row, ["date", "time", "timestamp", "datetime", "bucket"]) if (!value) return undefined const numeric = Number(value) const date = Number.isFinite(numeric) @@ -562,10 +617,6 @@ function syncWeekStart(periodEnd: Date) { return new Date(Date.UTC(periodEnd.getUTCFullYear(), periodEnd.getUTCMonth(), periodEnd.getUTCDate() - 6)) } -function defaultPeriodEnd() { - return new Date(Math.floor((Date.now() - 5 * 60_000) / 60_000) * 60_000) -} - function sameUtcDay(left: Date, right: Date) { return ( left.getUTCFullYear() === right.getUTCFullYear() && @@ -799,14 +850,15 @@ async function upsertGeoRows(db: ReturnType, rows: GeoStatRow[]) function parseImportOptions(args: string[]): ImportOptions { const flags = parseFlags(args) - const files = inputKeys.reduce>>((result, key) => { - const value = flags.get(key)?.[0] - if (!value) return result - return { ...result, [key]: value } + const files = inputKeys.reduce>>((result, key) => { + const values = flags.get(key) + if (!values) return result + return { ...result, [key]: values } }, {}) return { dataset: flags.get("dataset")?.[0] ?? "zen", databaseUrl: flags.get("database-url")?.[0] ?? process.env.DATABASE_URL, + directories: flags.get("dir") ?? flags.get("directory") ?? [], dryRun: flags.has("dry-run"), periodEnd: parseDateFlag(flags, "period-end"), periodStart: parseDateFlag(flags, "period-start"), @@ -820,14 +872,15 @@ function parseFlags(args: string[]) { const arg = args[index] if (!arg.startsWith("--")) fail(`Unexpected argument: ${arg}`) const name = arg.slice(2) - if (name === "dry-run") { + if (name === "dry-run" || name === "include-weekly") { result.set(name, ["true"]) continue } - const value = args[index + 1] - if (!value || value.startsWith("--")) fail(`Missing value for --${name}`) - result.set(name, [...(result.get(name) ?? []), value]) - index++ + const nextFlag = args.findIndex((value, valueIndex) => valueIndex > index && value.startsWith("--")) + const values = args.slice(index + 1, nextFlag === -1 ? args.length : nextFlag) + if (values.length === 0) fail(`Missing value for --${name}`) + result.set(name, [...(result.get(name) ?? []), ...values]) + index += values.length } return result } @@ -848,10 +901,21 @@ function parseIntegerFlag(flags: Map, name: string) { return parsed } +function parseListFlag(flags: Map, name: string) { + const value = flags.get(name)?.[0] + if (!value) return undefined + if (value === "all") return ["all"] + return value + .split(",") + .map((item) => item.trim()) + .filter(Boolean) +} + function usage(): never { fail(`Usage: - bun src/honeycomb-backfill.ts queries [--period-end ISO] [--days 60] [--limit 1000] - bun src/honeycomb-backfill.ts import --period-end ISO [--dry-run] [--database-url URL] --model-day file.csv ...`) + bun src/honeycomb-backfill.ts queries [--tiers Go,Free,Paid] [--limit 1000] [--include-weekly] + bun src/honeycomb-backfill.ts import [--period-end ISO for weekly files] [--dry-run] [--database-url URL] --dir downloads + bun src/honeycomb-backfill.ts import [--period-end ISO for weekly files] [--dry-run] [--database-url URL] --model-day file.csv [--model-day more.csv] ...`) } function fail(message: string): never {