diff --git a/internal/ai/chat/cost_recording_test.go b/internal/ai/chat/cost_recording_test.go new file mode 100644 index 000000000..79bd23fd2 --- /dev/null +++ b/internal/ai/chat/cost_recording_test.go @@ -0,0 +1,102 @@ +package chat + +import ( + "testing" + + "github.com/rcourtman/pulse-go-rewrite/internal/ai/cost" +) + +// TestRecordChatTurnCost_RecordsWhenStoreConfigured verifies that, +// with a cost store set and non-zero loop totals, recordChatTurnCost +// stores one cost.UsageEvent carrying UseCase="chat", the loop's +// token totals, the request model verbatim, and its provider prefix. +// +// This is the chat-side anchor for the same "every Chat caller +// records cost" invariant the internal/ai audit test enforces. +// The chat package can't use the function-local audit because +// recording is delegated from the loop (which calls ChatStream) to +// the orchestrator (which holds the cost store) — see +// recordChatTurnCost in service.go. So we test the orchestrator's +// recording site directly. +func TestRecordChatTurnCost_RecordsWhenStoreConfigured(t *testing.T) { + store := cost.NewStore(7) + svc := &Service{costStore: store} + loop := &AgenticLoop{ + totalInputTokens: 120, + totalOutputTokens: 45, + } + + svc.recordChatTurnCost(loop, "anthropic:claude-test") + + events := store.ListEvents(1) + if len(events) != 1 { + t.Fatalf("expected 1 cost event, got %d", len(events)) + } + ev := events[0] + if ev.UseCase != "chat" { + t.Errorf("UseCase = %q, want chat", ev.UseCase) + } + if ev.InputTokens != 120 || ev.OutputTokens != 45 { + t.Errorf("tokens = (%d, %d), want (120, 45)", ev.InputTokens, ev.OutputTokens) + } + if ev.RequestModel != "anthropic:claude-test" { + t.Errorf("RequestModel = %q, want anthropic:claude-test", ev.RequestModel) + } + if ev.Provider != "anthropic" { + t.Errorf("Provider = %q, want anthropic", ev.Provider) + } +} + +// TestRecordChatTurnCost_NoopWhenStoreNil verifies the recorder is +// a no-op when the cost store is not configured. 
Self-hosted users +// without cost tracking enabled should not see panics or errors. +func TestRecordChatTurnCost_NoopWhenStoreNil(t *testing.T) { + svc := &Service{costStore: nil} + loop := &AgenticLoop{ + totalInputTokens: 100, + totalOutputTokens: 50, + } + // Must not panic; with a nil store there is nothing else to assert. + svc.recordChatTurnCost(loop, "anthropic:claude-test") +} + +// TestRecordChatTurnCost_NoopWhenZeroTokens verifies the recorder +// skips recording when the loop ended before producing any tokens (e.g. +// budget gate rejected the request before the first turn). +// Zero-token events would pollute the dashboard. +func TestRecordChatTurnCost_NoopWhenZeroTokens(t *testing.T) { + store := cost.NewStore(7) + svc := &Service{costStore: store} + loop := &AgenticLoop{ + totalInputTokens: 0, + totalOutputTokens: 0, + } + svc.recordChatTurnCost(loop, "anthropic:claude-test") + + if events := store.ListEvents(1); len(events) != 0 { + t.Errorf("expected 0 events for zero-token loop, got %d", len(events)) + } +} + +// TestRecordChatTurnCost_HandlesMalformedModel verifies provider +// extraction tolerates model strings without a provider prefix +// (defensive — the canonical pipeline always sends provider:model +// but a future caller might not). 
+func TestRecordChatTurnCost_HandlesMalformedModel(t *testing.T) { + store := cost.NewStore(7) + svc := &Service{costStore: store} + loop := &AgenticLoop{totalInputTokens: 10, totalOutputTokens: 5} + + svc.recordChatTurnCost(loop, "bare-model-name") + + events := store.ListEvents(1) + if len(events) != 1 { + t.Fatalf("expected 1 event even for malformed model, got %d", len(events)) + } + if events[0].Provider != "" { + t.Errorf("Provider should be empty for malformed model, got %q", events[0].Provider) + } + if events[0].RequestModel != "bare-model-name" { + t.Errorf("RequestModel should be passed through verbatim, got %q", events[0].RequestModel) + } +} diff --git a/internal/ai/chat/service.go b/internal/ai/chat/service.go index 3d73d36f8..e794924bf 100644 --- a/internal/ai/chat/service.go +++ b/internal/ai/chat/service.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/rcourtman/pulse-go-rewrite/internal/agentexec" "github.com/rcourtman/pulse-go-rewrite/internal/ai/approval" + "github.com/rcourtman/pulse-go-rewrite/internal/ai/cost" "github.com/rcourtman/pulse-go-rewrite/internal/ai/modelboundary" "github.com/rcourtman/pulse-go-rewrite/internal/ai/providers" "github.com/rcourtman/pulse-go-rewrite/internal/ai/tools" @@ -87,6 +88,16 @@ type Config struct { ReportNarrator reporting.Narrator ReportFleetNarrator reporting.FleetNarrator ReportFindingsProvider reporting.FindingsProvider + + // CostStore is optional. When set, ExecuteStream records a + // cost.UsageEvent after each chat turn so user-chat token usage + // appears in the operator dashboard alongside patrol, discovery, + // and report-narrative spend. A nil value means "don't record" + // — useful for tests and for callers that bring their own ledger. + // ExecutePatrolStream intentionally does NOT record here; its + // caller (patrol_ai.go) records via its own helper so cost is + // never double-counted on the patrol-via-chat path. 
+ CostStore *cost.Store } // Service provides direct AI chat without external sidecar @@ -113,6 +124,11 @@ type Service struct { orgID string controlLevelResolver func(*config.AIConfig) string + // costStore receives a cost.UsageEvent after each ExecuteStream + // turn so user-chat token usage is visible in the operator + // dashboard. Copied from Config.CostStore; nil means "don't record". + costStore *cost.Store + activeMu sync.RWMutex activeExecutions map[string]map[*AgenticLoop]struct{} questionExecutions map[string]*AgenticLoop @@ -171,6 +187,7 @@ func NewService(cfg Config) *Service { executor: executor, orgID: strings.TrimSpace(cfg.OrgID), controlLevelResolver: cfg.ControlLevelResolver, + costStore: cfg.CostStore, activeExecutions: make(map[string]map[*AgenticLoop]struct{}), questionExecutions: make(map[string]*AgenticLoop), } @@ -245,6 +262,44 @@ func (s *Service) unregisterActiveLoop(sessionID string, loop *AgenticLoop) { } } +// recordChatTurnCost records a cost.UsageEvent for a completed (or +// failed) user-chat agentic turn. Uses the totals accumulated by the +// loop's stream callbacks so the recorded numbers match what the +// frontend sees in the SSE done event. No-op when s, loop, or the +// cost store is nil, or when both token totals are zero (an early +// failure before the first turn completed). UseCase is "chat" — the +// canonical taxonomy noted on cost.UsageEvent.UseCase. 
+func (s *Service) recordChatTurnCost(loop *AgenticLoop, requestModel string) { + if s == nil || loop == nil { + return + } + s.mu.RLock() + store := s.costStore + s.mu.RUnlock() + if store == nil { + return + } + inputTokens := loop.GetTotalInputTokens() + outputTokens := loop.GetTotalOutputTokens() + if inputTokens == 0 && outputTokens == 0 { + return + } + providerName := "" + if requestModel != "" { + if idx := strings.Index(requestModel, ":"); idx > 0 { + providerName = strings.ToLower(requestModel[:idx]) + } + } + store.Record(cost.UsageEvent{ + Timestamp: time.Now(), + Provider: providerName, + RequestModel: requestModel, + UseCase: "chat", + InputTokens: inputTokens, + OutputTokens: outputTokens, + }) +} + func (s *Service) registerQuestionLoop(questionID string, loop *AgenticLoop) { if s == nil || questionID == "" || loop == nil { return @@ -812,6 +867,15 @@ func (s *Service) ExecuteStream(ctx context.Context, req ExecuteRequest, callbac Err(err). Msg("[ChatService] Agentic loop returned") + // Record cost regardless of error — the operator was billed for + // whatever tokens the loop consumed before terminating. Without + // this, every user chat turn was invisible in the AI usage + // dashboard despite chat being the bulk of token spend. + // ExecutePatrolStream uses a separate tempLoop and its caller + // (patrol_ai.go) records cost via its own helper, so there is no + // double-recording on the patrol-via-chat path. + s.recordChatTurnCost(loop, selectedModel) + if err != nil { // Still save any messages we got for _, msg := range resultMessages { diff --git a/internal/ai/service.go b/internal/ai/service.go index 8203f024c..3136412af 100644 --- a/internal/ai/service.go +++ b/internal/ai/service.go @@ -670,6 +670,16 @@ func (s *Service) GetAIConfig() *config.AIConfig { return s.cfg } +// CostStore returns the underlying usage cost store handle so callers +// that own a Service can record cost events directly. 
Returned value +// may be nil when persistence/initialization failed during NewService. +// Consumers must nil-check it before recording. Read under s.mu.RLock. +func (s *Service) CostStore() *cost.Store { + s.mu.RLock() + defer s.mu.RUnlock() + return s.costStore +} + // GetCostSummary returns usage rollups for the last N days. func (s *Service) GetCostSummary(days int) cost.Summary { s.mu.RLock() diff --git a/internal/api/ai_handler.go b/internal/api/ai_handler.go index 47759b062..4c166baec 100644 --- a/internal/api/ai_handler.go +++ b/internal/api/ai_handler.go @@ -16,6 +16,7 @@ import ( airuntime "github.com/rcourtman/pulse-go-rewrite/internal/ai" "github.com/rcourtman/pulse-go-rewrite/internal/ai/approval" "github.com/rcourtman/pulse-go-rewrite/internal/ai/chat" + "github.com/rcourtman/pulse-go-rewrite/internal/ai/cost" "github.com/rcourtman/pulse-go-rewrite/internal/ai/tools" "github.com/rcourtman/pulse-go-rewrite/internal/ai/unified" "github.com/rcourtman/pulse-go-rewrite/internal/config" @@ -122,6 +123,14 @@ type AIHandler struct { // return heuristic narrative. The ctx must carry the tenant org // via GetOrgID — same convention the reporting handler uses. reportNarratorResolver func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider) + + // costStoreResolver returns the per-tenant cost store so chat + // sessions can record user-chat token usage to the same ledger + // that patrol, discovery, and report-narrative spend flow through. + // Wired by the router from AISettingsHandler.GetAIService. A nil + // return makes chat skip recording silently. NOTE(review): looks + // resolved once per chat-service construction — confirm. 
+ costStoreResolver func(ctx context.Context) *cost.Store } // SetReportNarratorResolver wires the optional per-tenant @@ -144,6 +153,24 @@ func (h *AIHandler) resolveReportNarrator(ctx context.Context) (reporting.Narrat return h.reportNarratorResolver(ctx) } +// SetCostStoreResolver wires the optional per-tenant cost store +// resolver; calling it on a nil handler is a no-op. When unset (or +// when the resolver returns nil), chat sessions skip cost recording +// — useful for tests and self-hosted deployments without cost tracking. +func (h *AIHandler) SetCostStoreResolver(resolver func(ctx context.Context) *cost.Store) { + if h == nil { + return + } + h.costStoreResolver = resolver +} + +func (h *AIHandler) resolveCostStore(ctx context.Context) *cost.Store { + if h == nil || h.costStoreResolver == nil { // no handler or resolver: no store + return nil + } + return h.costStoreResolver(ctx) +} + // newChatService is the factory function for creating the AI service. // Can be swapped in tests for mocking. var newChatService = func(cfg chat.Config) AIService { @@ -506,6 +533,7 @@ func (h *AIHandler) initTenantService(ctx context.Context, orgID string) AIServi } chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(tenantCtx) + chatCfg.CostStore = h.resolveCostStore(tenantCtx) svc := newChatService(chatCfg) if err := svc.Start(ctx); err != nil { @@ -742,6 +770,7 @@ func (h *AIHandler) Start(ctx context.Context, monitor *monitoring.Monitor) erro } chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(serviceCtx) + chatCfg.CostStore = h.resolveCostStore(serviceCtx) svc := newChatService(chatCfg) if err := svc.Start(ctx); err != nil { diff --git a/internal/api/router.go b/internal/api/router.go index 29e681b2f..30d006255 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -32,6 +32,7 @@ import ( "github.com/rcourtman/pulse-go-rewrite/internal/ai/baseline" 
"github.com/rcourtman/pulse-go-rewrite/internal/ai/chat" "github.com/rcourtman/pulse-go-rewrite/internal/ai/circuit" + "github.com/rcourtman/pulse-go-rewrite/internal/ai/cost" "github.com/rcourtman/pulse-go-rewrite/internal/ai/forecast" "github.com/rcourtman/pulse-go-rewrite/internal/ai/knowledge" "github.com/rcourtman/pulse-go-rewrite/internal/ai/learning" @@ -666,6 +667,22 @@ func (r *Router) setupRoutes() { } return svc, svc, svc }) + // Wire the per-tenant cost store into chat sessions so user-chat + // token usage flows into the same operator-facing dashboard as the + // rest of the AI runtime (patrol, discovery, report-narrative). + // Resolves to nil when the settings handler or AI service is + // missing. No Enabled gate — tokens spent while AI was still being + // configured were billed and should be visible. + r.aiHandler.SetCostStoreResolver(func(ctx context.Context) *cost.Store { + if r.aiSettingsHandler == nil { + return nil + } + svc := r.aiSettingsHandler.GetAIService(ctx) + if svc == nil { + return nil + } + return svc.CostStore() + }) // AI-powered infrastructure discovery handlers // Note: The actual service is wired up later via SetDiscoveryService