Record user-chat token usage to the cost ledger

chat.Service.ExecuteStream was a long-standing cost-ledger gap: the
agentic loop accumulated token counts via stream callbacks (see
GetTotalInputTokens / GetTotalOutputTokens in agentic_control.go)
and surfaced them in the SSE done envelope to the frontend, but
nothing on the server side recorded a cost.UsageEvent. Patrol,
discovery, QuickAnalysis, and the report narrators all record; only
chat — the bulk of AI token spend — did not. The operator's AI
usage dashboard was therefore understating cost dramatically.

Found while extending the cost-recording mindset across subpackages
after fixing QuickAnalysis (08491b9f4). Initially spawned as a
separate task but the right shape and scope became clear, so landing
it directly here.

Pipeline:
- Service.CostStore() exposes the per-tenant cost store handle.
- chat.Config gains optional CostStore *cost.Store field, threaded
  into chat.Service.costStore at NewService time.
- chat.Service.recordChatTurnCost records a UsageEvent with
  UseCase="chat" after every loop.ExecuteWithTools return (success
  OR error — operator was billed regardless of clean response).
  Skips when costStore is nil or zero tokens accumulated.
- ai_handler.go's two chatCfg construction sites populate CostStore
  via h.resolveCostStore(ctx).
- router wires the resolver to AISettingsHandler.GetAIService(ctx).CostStore()
  with no Enabled gate — even brief chat usage while AI was being
  configured should appear in the dashboard.

ExecutePatrolStream is deliberately not changed. It creates a
separate tempLoop and its caller (patrol_ai.go) records cost via
its own helper at line 887. Recording in ExecuteStream only avoids
double-counting on the patrol-via-chat path.

Tests in chat/cost_recording_test.go cover: recording when store
configured, no-op when store nil, no-op on zero tokens (early
failures), graceful handling of model strings missing the
provider prefix.
This commit is contained in:
rcourtman 2026-05-10 23:15:53 +01:00
parent e8049e894f
commit a0b3bc7ed3
5 changed files with 222 additions and 0 deletions

View file

@ -0,0 +1,102 @@
package chat
import (
"testing"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/cost"
)
// TestRecordChatTurnCost_RecordsWhenStoreConfigured verifies the
// canonical cost-recording site for user chat turns: when the
// service has a cost store and the loop accumulated tokens, a
// cost.UsageEvent lands in the store with UseCase="chat".
//
// This is the chat-side anchor for the same "every Chat caller
// records cost" invariant the internal/ai audit test enforces.
// The chat package can't use the function-local audit because
// recording is delegated from the loop (which calls ChatStream) to
// the orchestrator (which holds the cost store) — see
// recordChatTurnCost in service.go. So we test the orchestrator's
// recording site directly.
func TestRecordChatTurnCost_RecordsWhenStoreConfigured(t *testing.T) {
store := cost.NewStore(7)
svc := &Service{costStore: store}
loop := &AgenticLoop{
totalInputTokens: 120,
totalOutputTokens: 45,
}
svc.recordChatTurnCost(loop, "anthropic:claude-test")
events := store.ListEvents(1)
if len(events) != 1 {
t.Fatalf("expected 1 cost event, got %d", len(events))
}
ev := events[0]
if ev.UseCase != "chat" {
t.Errorf("UseCase = %q, want chat", ev.UseCase)
}
if ev.InputTokens != 120 || ev.OutputTokens != 45 {
t.Errorf("tokens = (%d, %d), want (120, 45)", ev.InputTokens, ev.OutputTokens)
}
if ev.RequestModel != "anthropic:claude-test" {
t.Errorf("RequestModel = %q, want anthropic:claude-test", ev.RequestModel)
}
if ev.Provider != "anthropic" {
t.Errorf("Provider = %q, want anthropic", ev.Provider)
}
}
// TestRecordChatTurnCost_NoopWhenStoreNil verifies the recorder is
// a no-op when the cost store is not configured. Self-hosted users
// without cost tracking enabled should not see panics or errors.
func TestRecordChatTurnCost_NoopWhenStoreNil(t *testing.T) {
svc := &Service{costStore: nil}
loop := &AgenticLoop{
totalInputTokens: 100,
totalOutputTokens: 50,
}
// Must not panic.
svc.recordChatTurnCost(loop, "anthropic:claude-test")
}
// TestRecordChatTurnCost_NoopWhenZeroTokens verifies the recorder
// skips when the loop terminated before producing any tokens (e.g.
// budget gate rejected the request before the first turn). Zero-
// token events would pollute the dashboard.
func TestRecordChatTurnCost_NoopWhenZeroTokens(t *testing.T) {
store := cost.NewStore(7)
svc := &Service{costStore: store}
loop := &AgenticLoop{
totalInputTokens: 0,
totalOutputTokens: 0,
}
svc.recordChatTurnCost(loop, "anthropic:claude-test")
if events := store.ListEvents(1); len(events) != 0 {
t.Errorf("expected 0 events for zero-token loop, got %d", len(events))
}
}
// TestRecordChatTurnCost_HandlesMalformedModel verifies provider
// extraction tolerates model strings without a provider prefix
// (defensive — the canonical pipeline always sends provider:model
// but a future caller might not).
func TestRecordChatTurnCost_HandlesMalformedModel(t *testing.T) {
store := cost.NewStore(7)
svc := &Service{costStore: store}
loop := &AgenticLoop{totalInputTokens: 10, totalOutputTokens: 5}
svc.recordChatTurnCost(loop, "bare-model-name")
events := store.ListEvents(1)
if len(events) != 1 {
t.Fatalf("expected 1 event even for malformed model, got %d", len(events))
}
if events[0].Provider != "" {
t.Errorf("Provider should be empty for malformed model, got %q", events[0].Provider)
}
if events[0].RequestModel != "bare-model-name" {
t.Errorf("RequestModel should be passed through verbatim, got %q", events[0].RequestModel)
}
}

View file

@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/rcourtman/pulse-go-rewrite/internal/agentexec"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/approval"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/cost"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/modelboundary"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/providers"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/tools"
@ -87,6 +88,16 @@ type Config struct {
ReportNarrator reporting.Narrator
ReportFleetNarrator reporting.FleetNarrator
ReportFindingsProvider reporting.FindingsProvider
// Optional cost store. When set, ExecuteStream records a
// cost.UsageEvent after each chat turn so user-chat token usage
// appears in the operator dashboard alongside patrol, discovery,
// and report-narrative spend. Absent value means "don't record"
// — useful for tests and for callers that bring their own ledger.
// ExecutePatrolStream intentionally does NOT record here; its
// caller (patrol_ai.go) records via its own helper so cost is
// never double-counted on the patrol-via-chat path.
CostStore *cost.Store
}
// Service provides direct AI chat without external sidecar
@ -113,6 +124,11 @@ type Service struct {
orgID string
controlLevelResolver func(*config.AIConfig) string
// costStore receives a cost.UsageEvent after each ExecuteStream
// turn so user-chat token usage is visible in the operator
// dashboard. Nil means "don't record".
costStore *cost.Store
activeMu sync.RWMutex
activeExecutions map[string]map[*AgenticLoop]struct{}
questionExecutions map[string]*AgenticLoop
@ -171,6 +187,7 @@ func NewService(cfg Config) *Service {
executor: executor,
orgID: strings.TrimSpace(cfg.OrgID),
controlLevelResolver: cfg.ControlLevelResolver,
costStore: cfg.CostStore,
activeExecutions: make(map[string]map[*AgenticLoop]struct{}),
questionExecutions: make(map[string]*AgenticLoop),
}
@ -245,6 +262,44 @@ func (s *Service) unregisterActiveLoop(sessionID string, loop *AgenticLoop) {
}
}
// recordChatTurnCost records a cost.UsageEvent for a completed (or
// failed) user-chat agentic turn. Uses the totals accumulated by the
// loop's stream callbacks so the recorded numbers match what the
// frontend sees in the SSE done event. No-op when the cost store is
// not configured or when the loop consumed zero tokens (an early
// failure before the first turn completed). UseCase is "chat" — the
// canonical taxonomy noted on cost.UsageEvent.UseCase.
func (s *Service) recordChatTurnCost(loop *AgenticLoop, requestModel string) {
if s == nil || loop == nil {
return
}
s.mu.RLock()
store := s.costStore
s.mu.RUnlock()
if store == nil {
return
}
inputTokens := loop.GetTotalInputTokens()
outputTokens := loop.GetTotalOutputTokens()
if inputTokens == 0 && outputTokens == 0 {
return
}
providerName := ""
if requestModel != "" {
if idx := strings.Index(requestModel, ":"); idx > 0 {
providerName = strings.ToLower(requestModel[:idx])
}
}
store.Record(cost.UsageEvent{
Timestamp: time.Now(),
Provider: providerName,
RequestModel: requestModel,
UseCase: "chat",
InputTokens: inputTokens,
OutputTokens: outputTokens,
})
}
func (s *Service) registerQuestionLoop(questionID string, loop *AgenticLoop) {
if s == nil || questionID == "" || loop == nil {
return
@ -812,6 +867,15 @@ func (s *Service) ExecuteStream(ctx context.Context, req ExecuteRequest, callbac
Err(err).
Msg("[ChatService] Agentic loop returned")
// Record cost regardless of error — the operator was billed for
// whatever tokens the loop consumed before terminating. Without
// this, every user chat turn was invisible in the AI usage
// dashboard despite chat being the bulk of token spend.
// ExecutePatrolStream uses a separate tempLoop and its caller
// (patrol_ai.go) records cost via its own helper, so no double-
// recording on the patrol-via-chat path.
s.recordChatTurnCost(loop, selectedModel)
if err != nil {
// Still save any messages we got
for _, msg := range resultMessages {

View file

@ -670,6 +670,16 @@ func (s *Service) GetAIConfig() *config.AIConfig {
return s.cfg
}
// CostStore returns the underlying usage cost store handle so callers
// that own a Service can record cost events directly. Returned value
// may be nil when persistence/initialization failed during NewService.
// Consumers must nil-check before recording.
func (s *Service) CostStore() *cost.Store {
s.mu.RLock()
defer s.mu.RUnlock()
return s.costStore
}
// GetCostSummary returns usage rollups for the last N days.
func (s *Service) GetCostSummary(days int) cost.Summary {
s.mu.RLock()

View file

@ -16,6 +16,7 @@ import (
airuntime "github.com/rcourtman/pulse-go-rewrite/internal/ai"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/approval"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/chat"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/cost"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/tools"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/unified"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
@ -122,6 +123,14 @@ type AIHandler struct {
// return heuristic narrative. The ctx must carry the tenant org
// via GetOrgID — same convention the reporting handler uses.
reportNarratorResolver func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider)
// costStoreResolver returns the per-tenant cost store so chat
// sessions can record user-chat token usage to the same ledger
// patrol, discovery, and report-narrative spend already flow
// through. Wired by the router from AISettingsHandler.GetAIService.
// A nil return causes chat to skip recording — operator sees no
// chat spend in the dashboard but no error either.
costStoreResolver func(ctx context.Context) *cost.Store
}
// SetReportNarratorResolver wires the optional per-tenant
@ -144,6 +153,24 @@ func (h *AIHandler) resolveReportNarrator(ctx context.Context) (reporting.Narrat
return h.reportNarratorResolver(ctx)
}
// SetCostStoreResolver wires the optional per-tenant cost store
// resolver. When unset (or when the resolver returns nil), chat
// sessions skip cost recording — useful for tests and for self-hosted
// deployments where the operator hasn't enabled cost tracking.
func (h *AIHandler) SetCostStoreResolver(resolver func(ctx context.Context) *cost.Store) {
if h == nil {
return
}
h.costStoreResolver = resolver
}
func (h *AIHandler) resolveCostStore(ctx context.Context) *cost.Store {
if h == nil || h.costStoreResolver == nil {
return nil
}
return h.costStoreResolver(ctx)
}
// newChatService is the factory function for creating the AI service.
// Can be swapped in tests for mocking.
var newChatService = func(cfg chat.Config) AIService {
@ -506,6 +533,7 @@ func (h *AIHandler) initTenantService(ctx context.Context, orgID string) AIServi
}
chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(tenantCtx)
chatCfg.CostStore = h.resolveCostStore(tenantCtx)
svc := newChatService(chatCfg)
if err := svc.Start(ctx); err != nil {
@ -742,6 +770,7 @@ func (h *AIHandler) Start(ctx context.Context, monitor *monitoring.Monitor) erro
}
chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(serviceCtx)
chatCfg.CostStore = h.resolveCostStore(serviceCtx)
svc := newChatService(chatCfg)
if err := svc.Start(ctx); err != nil {

View file

@ -32,6 +32,7 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/ai/baseline"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/chat"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/circuit"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/cost"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/forecast"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/knowledge"
"github.com/rcourtman/pulse-go-rewrite/internal/ai/learning"
@ -666,6 +667,22 @@ func (r *Router) setupRoutes() {
}
return svc, svc, svc
})
// Wire the per-tenant cost store into chat sessions so user-chat
// token usage flows into the same operator-facing dashboard the
// rest of the AI runtime uses (patrol, discovery,
// report-narrative). No Enabled gate — even when chat ran briefly
// while AI was being configured, those tokens were billed and
// should be visible.
r.aiHandler.SetCostStoreResolver(func(ctx context.Context) *cost.Store {
if r.aiSettingsHandler == nil {
return nil
}
svc := r.aiSettingsHandler.GetAIService(ctx)
if svc == nil {
return nil
}
return svc.CostStore()
})
// AI-powered infrastructure discovery handlers
// Note: The actual service is wired up later via SetDiscoveryService