chore: updated backfill

This commit is contained in:
Adam 2026-05-27 14:02:53 -05:00
parent 6883309997
commit cf2cd13fb8
No known key found for this signature in database
GPG key ID: 9CB48779AF150E75

View file

@ -1,4 +1,6 @@
import { Client } from "@planetscale/database"
import { readdir } from "node:fs/promises"
import path from "node:path"
import { drizzle } from "drizzle-orm/planetscale-serverless"
import { geoStat, modelStat, providerStat } from "./database/schema"
import {
@ -17,23 +19,30 @@ import {
} from "./domain/stat"
// Number of milliseconds in one day.
const DAY_MS = 86_400_000
// Look-back window in days used when --days is not supplied.
const DEFAULT_DAYS = 60
// Tiers that queries are generated for when --tiers is not supplied.
const DEFAULT_TIERS = ["Go", "Free", "Paid"]
// NOTE(review): presumably the model ids treated as free-tier; the code that reads this set is not visible here — confirm.
const FREE_MODELS = new Set(["gpt-5-nano", "grok-code", "big-pickle"])
// Time grain of an export.
type Grain = "day" | "week"
// Dimensions that carry metric columns.
type MetricDimension = "model" | "provider" | "geo"
// Dimensions used only as lookup tables.
type LookupDimension = "model-provider-model" | "geo-continent"
// Every accepted import input, e.g. "model-day" or "geo-continent-week".
type ImportKey = `${MetricDimension | LookupDimension}-${Grain}`
// The subset of import keys that query generation produces (metric dimensions only).
type GeneratedImportKey = `${MetricDimension}-${Grain}`
// A named query together with the import key / CLI flag its export should be fed back through.
type QuerySpec = {
name: string
importKey: ImportKey
importFlag: `--${ImportKey}`
query: ReturnType<typeof metricQuery>
}
// One parsed export row, keyed by column header with the cell text as value.
type RawRow = Record<string, string>
type Period = { start: Date; end: Date }
// Query time window; callers fill start_time/end_time with epoch seconds and granularity with seconds.
type Timing = { start_time: number; end_time: number; granularity?: number }
// Parsed options for the import command.
type ImportOptions = {
dataset: string
databaseUrl: string | undefined
// Directories to scan for export files.
directories: string[]
dryRun: boolean
periodEnd: Date | undefined
periodStart: Date | undefined
// NOTE(review): `files` is declared twice below (string vs string[] values). This looks like the
// pre-change and post-change lines of a diff shown together — confirm which one the real file keeps.
files: Partial<Record<ImportKey, string>>
files: Partial<Record<ImportKey, string[]>>
}
type ModelAggregate = StatBaseAggregate & { provider: string; model: string; provider_model: string }
type ProviderAggregate = StatBaseAggregate & { provider: string }
@ -66,34 +75,24 @@ async function main() {
function printQueries(args: string[]) {
const flags = parseFlags(args)
const periodEnd = parseDateFlag(flags, "period-end") ?? defaultPeriodEnd()
const days = parseIntegerFlag(flags, "days") ?? DEFAULT_DAYS
const limit = parseIntegerFlag(flags, "limit") ?? 1000
const dailyStart = new Date(
Date.UTC(periodEnd.getUTCFullYear(), periodEnd.getUTCMonth(), periodEnd.getUTCDate() - days + 1),
)
const weekStart = syncWeekStart(periodEnd)
const tiers = parseListFlag(flags, "tiers") ?? DEFAULT_TIERS
const queries = buildQueries(limit, tiers, flags.has("include-weekly"))
const only = flags.get("only")?.[0]
if (only) {
const item = queries.find((query) => query.name === only)
if (!item) fail(`Unknown --only ${only}. Expected one of: ${queries.map((query) => query.name).join(", ")}`)
console.log(JSON.stringify(item.query, null, 2))
return
}
console.log(
JSON.stringify(
{
period_end: periodEnd.toISOString(),
import_hint: `bun src/honeycomb-backfill.ts import --period-end ${periodEnd.toISOString()} ...`,
daily: buildQuerySet(
{
start_time: Math.floor(dailyStart.getTime() / 1000),
end_time: Math.floor(periodEnd.getTime() / 1000),
granularity: DAY_MS / 1000,
},
limit,
),
week: buildQuerySet(
{
start_time: Math.floor(weekStart.getTime() / 1000),
end_time: Math.floor(periodEnd.getTime() / 1000),
},
limit,
),
tiers,
import_hint: "bun src/honeycomb-backfill.ts import --dir downloads",
queries,
},
null,
2,
@ -102,7 +101,9 @@ function printQueries(args: string[]) {
}
async function importFiles(args: string[]) {
const opts = parseImportOptions(args)
const parsed = parseImportOptions(args)
const opts = { ...parsed, files: mergeFiles(parsed.files, await discoverFiles(parsed.directories)) }
if (!inputKeys.some((key) => opts.files[key]?.length)) fail("No CSV or JSON import files were provided or discovered")
const providerModelLookup = new Map([
...(await lookupRows(opts.files["model-provider-model-day"], "day", opts, modelProviderModelLookup)),
...(await lookupRows(opts.files["model-provider-model-week"], "week", opts, modelProviderModelLookup)),
@ -151,6 +152,9 @@ async function importFiles(args: string[]) {
console.log(
JSON.stringify(
{
inputs: Object.fromEntries(
inputKeys.flatMap((key) => (opts.files[key]?.length ? [[key, opts.files[key].length]] : [])),
),
modelRows: modelRows.length,
providerRows: providerRows.length,
geoRows: geoRows.length,
@ -170,109 +174,94 @@ async function importFiles(args: string[]) {
await upsertGeoRows(db, geoRows)
}
function buildQuerySet(timing: Timing, limit: number) {
/**
 * Builds the query specs to print for the given tiers.
 *
 * Each tier gets three daily specs (model, provider, geo) whose breakdowns start
 * with `date`. When `includeWeekly` is true, each tier also gets the three weekly
 * counterparts, which omit the `date` breakdown.
 *
 * @param limit Row limit passed through to every query.
 * @param tiers Tier labels; each is also turned into a tier filter.
 * @param includeWeekly Whether to append the weekly specs.
 * @returns All daily specs (in tier order) followed by all weekly specs (in tier order).
 */
function buildQueries(limit: number, tiers: string[], includeWeekly: boolean): QuerySpec[] {
  const specs: QuerySpec[] = []

  for (const tier of tiers) {
    specs.push(
      querySpec(
        "model-day",
        tier,
        metricQuery(["date", "tier", "provider.normalized", "model", "provider.model"], limit, tierFilters(tier)),
      ),
      querySpec(
        "provider-day",
        tier,
        metricQuery(["date", "tier", "provider.normalized"], limit, tierFilters(tier)),
      ),
      querySpec("geo-day", tier, metricQuery(["date", "tier", "country", "continent"], limit, tierFilters(tier))),
    )
  }

  if (!includeWeekly) return specs

  // Weekly specs come after every daily spec, not interleaved per tier.
  for (const tier of tiers) {
    specs.push(
      querySpec(
        "model-week",
        tier,
        metricQuery(["tier", "provider.normalized", "model", "provider.model"], limit, tierFilters(tier)),
      ),
      querySpec("provider-week", tier, metricQuery(["tier", "provider.normalized"], limit, tierFilters(tier))),
      querySpec("geo-week", tier, metricQuery(["tier", "country", "continent"], limit, tierFilters(tier))),
    )
  }

  return specs
}
// Wraps a generated query with a display name (import key plus the tier's name segment),
// the import key itself, and the matching `--<importKey>` CLI flag.
//
// NOTE(review): the `model` … `geo_continent` entries below reference `timing`, `limit` and
// `lookupQuery`, none of which are parameters of this function. They look like pre-change
// lines of a diff shown next to the new `name` / `importKey` / `importFlag` / `query`
// entries — confirm against the real file.
function querySpec(importKey: GeneratedImportKey, tier: string, query: ReturnType<typeof metricQuery>) {
return {
model: metricQuery(["stat_tier", "stat_provider", "model"], timing, limit),
model_provider_model: lookupQuery(["stat_tier", "stat_provider", "model", "provider.model"], timing, limit),
provider: metricQuery(["stat_tier", "stat_provider"], timing, limit),
geo: metricQuery(["stat_tier", "stat_country"], timing, limit),
geo_continent: lookupQuery(["stat_tier", "stat_country", "cf.continent"], timing, limit),
name: `${importKey}-${queryNameSegment(tier)}`,
importKey,
importFlag: `--${importKey}` as const,
query,
}
}
// Builds a metric query definition: the given breakdown columns, a fixed list of
// calculations, the shared filters, an ordering by summed tokens, and a row limit.
//
// NOTE(review): this span shows two signatures and two versions of several properties
// (`...timing` vs `granularity: 0`, named vs unnamed calculations, two `filters`, two
// `orders`). It looks like pre-change and post-change diff lines shown together —
// confirm against the real file before relying on either version.
function metricQuery(breakdowns: string[], timing: Timing, limit: number) {
function metricQuery(
breakdowns: string[],
limit: number,
filters: ReturnType<typeof commonFilters> = [],
) {
return {
...timing,
granularity: 0,
breakdowns,
calculated_fields: [...commonCalculatedFields(), ...metricCalculatedFields()],
calculations: [
// Variant that names each calculation and reads the stat_* calculated fields.
{ op: "COUNT_DISTINCT", column: "session", name: "sessions" },
{ op: "COUNT", name: "requests" },
{ op: "SUM", column: "tokens.input", name: "input_tokens" },
{ op: "SUM", column: "tokens.output", name: "output_tokens" },
{ op: "SUM", column: "tokens.reasoning", name: "reasoning_tokens" },
{ op: "SUM", column: "tokens.cache_read", name: "cache_read_tokens" },
{ op: "SUM", column: "stat_tokens_total", name: "total_tokens" },
{ op: "SUM", column: "stat_cost_input_microcents", name: "input_cost_microcents" },
{ op: "SUM", column: "stat_cost_output_microcents", name: "output_cost_microcents" },
{ op: "SUM", column: "stat_cost_total_microcents", name: "total_cost_microcents" },
{ op: "AVG", column: "duration", name: "avg_duration_ms" },
{ op: "P50", column: "duration", name: "p50_duration_ms" },
{ op: "P95", column: "duration", name: "p95_duration_ms" },
{ op: "AVG", column: "time_to_first_byte", name: "avg_ttfb_ms" },
{ op: "P50", column: "time_to_first_byte", name: "p50_ttfb_ms" },
{ op: "P95", column: "time_to_first_byte", name: "p95_ttfb_ms" },
{ op: "AVG", column: "stat_output_tps", name: "avg_output_tps" },
{ op: "SUM", column: "stat_success", name: "success_count" },
{ op: "SUM", column: "stat_error", name: "error_count" },
{ op: "COUNT", name: "sample_count" },
// Variant without names that reads plain event columns directly.
{ op: "COUNT_DISTINCT", column: "session" },
{ op: "COUNT" },
{ op: "SUM", column: "tokens.input" },
{ op: "SUM", column: "tokens.output" },
{ op: "SUM", column: "tokens.reasoning" },
{ op: "SUM", column: "tokens.cache_read" },
{ op: "SUM", column: "tokens" },
{ op: "SUM", column: "cost.input.microcents" },
{ op: "SUM", column: "cost.output.microcents" },
{ op: "SUM", column: "cost.total.microcents" },
{ op: "AVG", column: "duration" },
{ op: "P50", column: "duration" },
{ op: "P95", column: "duration" },
{ op: "AVG", column: "time_to_first_byte" },
{ op: "P50", column: "time_to_first_byte" },
{ op: "P95", column: "time_to_first_byte" },
{ op: "AVG", column: "tps.output" },
{ op: "SUM", column: "success" },
{ op: "SUM", column: "error" },
],
filters: commonFilters(),
// Caller-supplied filters are appended after the shared ones; all are AND-combined.
filters: [...commonFilters(), ...filters],
filter_combination: "AND",
orders: [{ column: "stat_tokens_total", op: "SUM", order: "descending" }],
orders: [{ column: "tokens", op: "SUM", order: "descending" }],
havings: [],
limit,
formulas: [],
}
}
function lookupQuery(breakdowns: string[], timing: Timing, limit: number) {
return {
...timing,
breakdowns,
calculated_fields: commonCalculatedFields(),
calculations: [{ op: "COUNT", name: "requests" }],
filters: commonFilters(),
filter_combination: "AND",
orders: [{ op: "COUNT", order: "descending" }],
limit,
}
/**
 * Returns the extra filter clauses that restrict a query to a single tier.
 *
 * The pseudo-tier "all" (exact, case-sensitive match) means "no restriction"
 * and yields an empty list; any other value yields one `tier = <value>` clause.
 */
function tierFilters(tier: string) {
  return tier === "all" ? [] : [{ column: "tier", op: "=", value: tier }]
}
/**
 * Returns the calculated-field definitions (name plus expression string) for the
 * four derived columns `stat_included_client`, `stat_tier`, `stat_provider` and
 * `stat_country`, in that order.
 *
 * The expression strings are opaque to this script; they are emitted verbatim.
 */
function commonCalculatedFields() {
  // 1 when the user agent mentions "ai-sdk" or "opencode", otherwise 0.
  const includedClientExpr = `IF(OR(CONTAINS(COALESCE($user_agent, ""), "ai-sdk"), CONTAINS(COALESCE($user_agent, ""), "opencode")), 1, 0)`

  // Resolves to "Go", "Free" or "Zen" from the event's source and model.
  const tierExpr = `IF(EQUALS(COALESCE($source, ""), "lite"), "Go", OR(EQUALS(COALESCE($model, ""), "gpt-5-nano"), EQUALS(COALESCE($model, ""), "grok-code"), EQUALS(COALESCE($model, ""), "big-pickle"), ENDS_WITH(COALESCE($model, ""), "-free")), "Free", "Zen")`

  // The regex literal is backtick-quoted inside the expression, so it is kept in a
  // double-quoted piece and the three pieces are joined.
  const providerExpr = [
    `IF(STARTS_WITH(COALESCE($provider, ""), "minimax-plan"), "minimax-plan", STARTS_WITH(COALESCE($provider, ""), "zai-plan"), "zai-plan", STARTS_WITH(COALESCE($provider, ""), "azure-databricks"), "azure-databricks", REG_MATCH(COALESCE($provider, ""), `,
    "`^azure[0-9]+`",
    `), "azure-openai", COALESCE($provider, "unknown"))`,
  ].join("")

  // Missing country falls back to "ZZ".
  const countryExpr = `COALESCE($cf.country, "ZZ")`

  return [
    { name: "stat_included_client", expression: includedClientExpr },
    { name: "stat_tier", expression: tierExpr },
    { name: "stat_provider", expression: providerExpr },
    { name: "stat_country", expression: countryExpr },
  ]
}
function metricCalculatedFields() {
return [
{
name: "stat_tokens_total",
expression: `SUM(COALESCE($tokens.cache_read, 0), COALESCE($tokens.cache_write_5m, 0), COALESCE($tokens.input, 0), COALESCE($tokens.output, 0))`,
},
{
name: "stat_cost_input_microcents",
expression: `COALESCE($cost.input.microcents, MUL($cost.input, 1000000), 0)`,
},
{
name: "stat_cost_output_microcents",
expression: `COALESCE($cost.output.microcents, MUL($cost.output, 1000000), 0)`,
},
{
name: "stat_cost_total_microcents",
expression: `COALESCE($cost.total.microcents, MUL($cost.total, 1000000), 0)`,
},
{
name: "stat_output_tps",
expression: `IF(LT(SUB($timestamp.last_byte, $timestamp.first_byte), 100), null, DIV(MUL($tokens.output, 1000), SUB($timestamp.last_byte, $timestamp.first_byte)))`,
},
{ name: "stat_success", expression: `IF(AND(GTE($status, 200), LT($status, 400)), 1, 0)` },
{ name: "stat_error", expression: `IF(GTE($status, 400), 1, 0)` },
]
/**
 * Turns a label into a lowercase, dash-separated segment for use in query names.
 *
 * Every run of characters outside `[a-z0-9]` (after lowercasing) collapses to a
 * single "-", one leading and one trailing "-" are removed, and a label with
 * nothing left becomes "all".
 */
function queryNameSegment(value: string) {
  const dashed = value.toLowerCase().replace(/[^a-z0-9]+/g, "-")
  const trimmed = dashed.replace(/^-|-$/g, "")
  return trimmed === "" ? "all" : trimmed
}
function commonFilters() {
@ -280,28 +269,27 @@ function commonFilters() {
{ column: "event_type", op: "=", value: "completions" },
{ column: "model", op: "exists" },
{ column: "model", op: "!=", value: "" },
{ column: "stat_included_client", op: "=", value: 1 },
]
}
// Reads export rows and maps each one, together with its base aggregate for the
// given grain/options, through `map`. Resolves to an empty list when no input is given.
//
// NOTE(review): both a single-file (`file`, `readRows`) and a multi-file (`files`,
// `readFiles`) version of the first parameter and of the body appear below. They look
// like pre-change and post-change diff lines shown together — confirm against the real file.
function metricRows<T extends StatBaseAggregate>(
file: string | undefined,
files: string[] | undefined,
grain: Grain,
opts: ImportOptions,
map: (row: RawRow, base: StatBaseAggregate) => T,
) {
if (!file) return Promise.resolve([])
return readRows(file).then((rows) => rows.map((row) => map(row, baseAggregate(row, grain, opts))))
if (!files) return Promise.resolve([])
return readFiles(files).then((rows) => rows.map((row) => map(row, baseAggregate(row, grain, opts))))
}
function lookupRows(
file: string | undefined,
files: string[] | undefined,
grain: Grain,
opts: ImportOptions,
map: (row: RawRow, grain: Grain, opts: ImportOptions) => readonly (readonly [string, string])[],
) {
if (!file) return Promise.resolve([])
return readRows(file).then((rows) =>
if (!files) return Promise.resolve([])
return readFiles(files).then((rows) =>
Array.from(
rows
.flatMap((row) => map(row, grain, opts))
@ -313,6 +301,64 @@ function lookupRows(
)
}
/**
 * Reads every given export file concurrently and returns all rows as one flat
 * list, grouped in the same order as `files`.
 */
async function readFiles(files: string[]) {
  const pendingReads = files.map(readRows)
  const rowsPerFile = await Promise.all(pendingReads)
  return rowsPerFile.flat()
}
/**
 * Finds export files under the given directories and groups them by import key.
 *
 * Each discovered file is read and classified from its rows; files that share a
 * key are listed in discovery order.
 *
 * @param directories Directories to scan recursively.
 * @returns A partial map from import key to the files classified under it.
 */
async function discoverFiles(directories: string[]) {
  const listings = await Promise.all(directories.map(filesInDirectory))

  // Classification needs the rows, so every candidate file is read here.
  const classified = await Promise.all(
    listings.flat().map(async (file) => {
      const rows = await readRows(file)
      return { file, key: classifyRows(file, rows) }
    }),
  )

  const grouped: Partial<Record<ImportKey, string[]>> = {}
  for (const { file, key } of classified) {
    grouped[key] = [...(grouped[key] ?? []), file]
  }
  return grouped
}
/**
 * Recursively lists the `.csv` / `.json` files under `directory`.
 *
 * The extension match is case-insensitive. Entries that are neither directories
 * nor regular files are ignored.
 *
 * Entries are visited in name order (plain code-unit comparison) so the result
 * is identical on every run and platform; `readdir` itself does not promise any
 * particular order, and callers concatenate rows from these files in list order.
 *
 * @param directory Directory to scan.
 * @returns Paths (joined onto `directory`) of all matching files, depth-first.
 * @throws Whatever `readdir` rejects with, e.g. when the directory does not exist.
 */
async function filesInDirectory(directory: string): Promise<string[]> {
  const entries = await readdir(directory, { withFileTypes: true })

  // Copy before sorting; locale-independent comparison keeps the order stable everywhere.
  const ordered = [...entries].sort((left, right) => {
    if (left.name < right.name) return -1
    if (left.name > right.name) return 1
    return 0
  })

  const nested = await Promise.all(
    ordered.map(async (entry): Promise<string[]> => {
      const file = path.join(directory, entry.name)
      if (entry.isDirectory()) return filesInDirectory(file)
      if (entry.isFile() && /\.(csv|json)$/i.test(entry.name)) return [file]
      return []
    }),
  )
  return nested.flat()
}
/**
 * Decides which import key an export belongs to by looking at its column headers.
 *
 * Headers from all rows are normalized and pooled. A `date` header means the
 * "day" grain, otherwise "week". The dimension is then picked in this order:
 * a `model` header (metric export, or the provider-model lookup when no metric
 * headers are present), a country header (metric export, or the continent lookup
 * when no metric headers are present), then a provider header.
 *
 * @param file Path of the export, used only in error messages.
 * @param rows Parsed rows of the export.
 * @returns The import key for the export.
 * Fails via `fail` when `rows` is empty or no known dimension header is found.
 */
function classifyRows(file: string, rows: RawRow[]): ImportKey {
  if (rows.length === 0) fail(`Cannot classify empty export: ${file}`)

  const headers = new Set<string>()
  for (const row of rows) {
    for (const column of Object.keys(row)) headers.add(normalizeHeader(column))
  }

  const grain: Grain = headers.has("date") ? "day" : "week"

  if (headers.has("model")) {
    if (hasMetricHeaders(headers)) return `model-${grain}`
    return `model-provider-model-${grain}`
  }
  if (hasHeader(headers, ["country", "cf.country"])) {
    if (hasMetricHeaders(headers)) return `geo-${grain}`
    return `geo-continent-${grain}`
  }
  if (hasHeader(headers, ["provider", "provider.normalized"])) return `provider-${grain}`

  fail(`Cannot classify export from columns in ${file}`)
}
/**
 * Reports whether a set of already-normalized headers contains at least one of
 * the known metric column names.
 */
function hasMetricHeaders(headers: Set<string>) {
  const metricHeaders = [
    "sumtokens",
    "sumtokensinput",
    "inputtokens",
    "totaltokens",
    "avgduration",
    "countdistinctsession",
  ]
  for (const metric of metricHeaders) {
    if (headers.has(metric)) return true
  }
  return false
}
/**
 * Reports whether any of `names`, once normalized, is present in `headers`.
 * `headers` is expected to hold normalized header names already.
 */
function hasHeader(headers: Set<string>, names: string[]) {
  for (const name of names) {
    if (headers.has(normalizeHeader(name))) return true
  }
  return false
}
/**
 * Combines two file maps key by key.
 *
 * For every known input key, the files from `left` come first, followed by the
 * files from `right`. Keys that end up with no files are left out of the result.
 */
function mergeFiles(left: Partial<Record<ImportKey, string[]>>, right: Partial<Record<ImportKey, string[]>>) {
  const merged: Partial<Record<ImportKey, string[]>> = {}
  for (const key of inputKeys) {
    const files = [...(left[key] ?? []), ...(right[key] ?? [])]
    if (files.length > 0) merged[key] = files
  }
  return merged
}
function modelProviderModelLookup(row: RawRow, grain: Grain, opts: ImportOptions): [string, string][] {
const base = basePeriod(row, grain, opts)
const value = providerModel(row)
@ -340,18 +386,27 @@ function baseAggregate(row: RawRow, grain: Grain, opts: ImportOptions): StatBase
reasoning_tokens: integer(row, "reasoning_tokens", ["SUM(tokens.reasoning)", "SUM(tokens_reasoning)"]),
cache_read_tokens: integer(row, "cache_read_tokens", ["SUM(tokens.cache_read)", "SUM(tokens_cache_read)"]),
total_tokens: integer(row, "total_tokens", ["SUM(stat_tokens_total)", "SUM(tokens)", "SUM(tokens_total)"]),
input_cost_microcents: integer(row, "input_cost_microcents", ["SUM(stat_cost_input_microcents)"]),
output_cost_microcents: integer(row, "output_cost_microcents", ["SUM(stat_cost_output_microcents)"]),
total_cost_microcents: integer(row, "total_cost_microcents", ["SUM(stat_cost_total_microcents)"]),
input_cost_microcents: integer(row, "input_cost_microcents", [
"SUM(cost.input.microcents)",
"SUM(stat_cost_input_microcents)",
]),
output_cost_microcents: integer(row, "output_cost_microcents", [
"SUM(cost.output.microcents)",
"SUM(stat_cost_output_microcents)",
]),
total_cost_microcents: integer(row, "total_cost_microcents", [
"SUM(cost.total.microcents)",
"SUM(stat_cost_total_microcents)",
]),
avg_duration_ms: nullableNumber(row, "avg_duration_ms", ["AVG(duration)", "AVG(duration_ms)"]),
p50_duration_ms: nullableInteger(row, "p50_duration_ms", ["P50(duration)", "P50(duration_ms)"]),
p95_duration_ms: nullableInteger(row, "p95_duration_ms", ["P95(duration)", "P95(duration_ms)"]),
avg_ttfb_ms: nullableNumber(row, "avg_ttfb_ms", ["AVG(time_to_first_byte)", "AVG(ttfb_ms)"]),
p50_ttfb_ms: nullableInteger(row, "p50_ttfb_ms", ["P50(time_to_first_byte)", "P50(ttfb_ms)"]),
p95_ttfb_ms: nullableInteger(row, "p95_ttfb_ms", ["P95(time_to_first_byte)", "P95(ttfb_ms)"]),
avg_output_tps: nullableNumber(row, "avg_output_tps", ["AVG(stat_output_tps)", "AVG(tps.output)"]),
success_count: integer(row, "success_count", ["SUM(stat_success)"]),
error_count: integer(row, "error_count", ["SUM(stat_error)"]),
avg_output_tps: nullableNumber(row, "avg_output_tps", ["AVG(tps.output)", "AVG(stat_output_tps)"]),
success_count: integer(row, "success_count", ["SUM(success)", "SUM(is_success)", "SUM(stat_success)"]),
error_count: integer(row, "error_count", ["SUM(error)", "SUM(is_error)", "SUM(stat_error)"]),
sample_count: integer(row, "sample_count", ["COUNT", "COUNT()"]),
}
}
@ -477,7 +532,7 @@ function deriveTier(row: RawRow) {
}
// Reads the provider cell from a row (trying several column names) and normalizes it,
// falling back to "unknown" when no cell value is found.
//
// NOTE(review): two return statements appear below, differing only in whether
// "provider.normalized" is tried first. They look like the pre-change and post-change
// lines of a diff shown together; as written only the first one would execute — confirm.
function provider(row: RawRow) {
return normalizeProvider(cell(row, ["stat_provider", "provider"]) || "unknown")
return normalizeProvider(cell(row, ["provider.normalized", "stat_provider", "provider"]) || "unknown")
}
function normalizeProvider(value: string) {
@ -544,7 +599,7 @@ function normalizeHeader(value: string) {
}
function parseTime(row: RawRow) {
const value = cell(row, ["time", "timestamp", "date", "datetime", "bucket"])
const value = cell(row, ["date", "time", "timestamp", "datetime", "bucket"])
if (!value) return undefined
const numeric = Number(value)
const date = Number.isFinite(numeric)
@ -562,10 +617,6 @@ function syncWeekStart(periodEnd: Date) {
return new Date(Date.UTC(periodEnd.getUTCFullYear(), periodEnd.getUTCMonth(), periodEnd.getUTCDate() - 6))
}
/**
 * Returns the default period end: the current time minus five minutes, rounded
 * down to the start of that minute.
 */
function defaultPeriodEnd() {
  const minuteMs = 60_000
  const fiveMinutesAgo = Date.now() - 5 * minuteMs
  const wholeMinutes = Math.floor(fiveMinutesAgo / minuteMs)
  return new Date(wholeMinutes * minuteMs)
}
function sameUtcDay(left: Date, right: Date) {
return (
left.getUTCFullYear() === right.getUTCFullYear() &&
@ -799,14 +850,15 @@ async function upsertGeoRows(db: ReturnType<typeof drizzle>, rows: GeoStatRow[])
function parseImportOptions(args: string[]): ImportOptions {
const flags = parseFlags(args)
const files = inputKeys.reduce<Partial<Record<ImportKey, string>>>((result, key) => {
const value = flags.get(key)?.[0]
if (!value) return result
return { ...result, [key]: value }
const files = inputKeys.reduce<Partial<Record<ImportKey, string[]>>>((result, key) => {
const values = flags.get(key)
if (!values) return result
return { ...result, [key]: values }
}, {})
return {
dataset: flags.get("dataset")?.[0] ?? "zen",
databaseUrl: flags.get("database-url")?.[0] ?? process.env.DATABASE_URL,
directories: flags.get("dir") ?? flags.get("directory") ?? [],
dryRun: flags.has("dry-run"),
periodEnd: parseDateFlag(flags, "period-end"),
periodStart: parseDateFlag(flags, "period-start"),
@ -820,14 +872,15 @@ function parseFlags(args: string[]) {
const arg = args[index]
if (!arg.startsWith("--")) fail(`Unexpected argument: ${arg}`)
const name = arg.slice(2)
if (name === "dry-run") {
if (name === "dry-run" || name === "include-weekly") {
result.set(name, ["true"])
continue
}
const value = args[index + 1]
if (!value || value.startsWith("--")) fail(`Missing value for --${name}`)
result.set(name, [...(result.get(name) ?? []), value])
index++
const nextFlag = args.findIndex((value, valueIndex) => valueIndex > index && value.startsWith("--"))
const values = args.slice(index + 1, nextFlag === -1 ? args.length : nextFlag)
if (values.length === 0) fail(`Missing value for --${name}`)
result.set(name, [...(result.get(name) ?? []), ...values])
index += values.length
}
return result
}
@ -848,10 +901,21 @@ function parseIntegerFlag(flags: Map<string, string[]>, name: string) {
return parsed
}
/**
 * Reads a list-valued flag.
 *
 * Every value recorded for the flag is used, so a comma-separated value
 * (`--tiers Go,Free`), space-separated values (`--tiers Go Free`) and a repeated
 * flag (`--tiers Go --tiers Free`) all produce the same list. Items are trimmed
 * and empty items are dropped. The literal `all` passes through as `["all"]`.
 *
 * @param flags Parsed flags, mapping a flag name to every value given for it.
 * @param name Flag name without the leading `--`.
 * @returns The items in the order given, or `undefined` when the flag is absent
 *   or holds no non-empty item, so callers can fall back to a default with `??`.
 */
function parseListFlag(flags: Map<string, string[]>, name: string) {
  const items = (flags.get(name) ?? [])
    .flatMap((value) => value.split(","))
    .map((item) => item.trim())
    .filter(Boolean)
  // An empty list would silently disable the caller's default, so report "not provided" instead.
  return items.length > 0 ? items : undefined
}
// Reports the command-line usage text by passing it to `fail`; never returns.
//
// NOTE(review): two versions of the usage text appear below and the template literal is
// closed twice. This looks like pre-change and post-change diff lines shown together —
// confirm against the real file.
function usage(): never {
fail(`Usage:
bun src/honeycomb-backfill.ts queries [--period-end ISO] [--days 60] [--limit 1000]
bun src/honeycomb-backfill.ts import --period-end ISO [--dry-run] [--database-url URL] --model-day file.csv ...`)
bun src/honeycomb-backfill.ts queries [--tiers Go,Free,Paid] [--limit 1000] [--include-weekly]
bun src/honeycomb-backfill.ts import [--period-end ISO for weekly files] [--dry-run] [--database-url URL] --dir downloads
bun src/honeycomb-backfill.ts import [--period-end ISO for weekly files] [--dry-run] [--database-url URL] --model-day file.csv [--model-day more.csv] ...`)
}
function fail(message: string): never {