mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-05-20 01:01:20 +00:00
Record user-chat token usage to the cost ledger
chat.Service.ExecuteStream was a long-standing cost-ledger gap: the
agentic loop accumulated token counts via stream callbacks (see
GetTotalInputTokens / GetTotalOutputTokens in agentic_control.go)
and surfaced them in the SSE done envelope to the frontend, but
nothing on the server side recorded a cost.UsageEvent. Patrol,
discovery, QuickAnalysis, and the report narrators all record; only
chat — the bulk of AI token spend — did not. The operator's AI
usage dashboard was therefore understating cost dramatically.
Found while extending the cost-recording mindset across subpackages
after fixing QuickAnalysis (08491b9f4). Initially spawned as a
separate task but the right shape and scope became clear, so landing
it directly here.
Pipeline:
- Service.CostStore() exposes the per-tenant cost store handle.
- chat.Config gains optional CostStore *cost.Store field, threaded
into chat.Service.costStore at NewService time.
- chat.Service.recordChatTurnCost records a UsageEvent with
UseCase="chat" after every loop.ExecuteWithTools return (success
OR error — operator was billed regardless of clean response).
Skips when costStore is nil or zero tokens accumulated.
- ai_handler.go's two chatCfg construction sites populate CostStore
via h.resolveCostStore(ctx).
- router wires the resolver to AISettingsHandler.GetAIService(ctx).CostStore()
with no Enabled gate — even brief chat usage while AI was being
configured should appear in the dashboard.
ExecutePatrolStream is deliberately not changed. It creates a
separate tempLoop and its caller (patrol_ai.go) records cost via
its own helper at line 887. Recording only in ExecuteStream (and not
in ExecutePatrolStream) avoids double-counting on the patrol-via-chat path.
Tests in chat/cost_recording_test.go cover: recording when store
configured, no-op when store nil, no-op on zero tokens (early
failures), graceful handling of model strings missing the
provider prefix.
This commit is contained in:
parent
e8049e894f
commit
a0b3bc7ed3
5 changed files with 222 additions and 0 deletions
102
internal/ai/chat/cost_recording_test.go
Normal file
102
internal/ai/chat/cost_recording_test.go
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
package chat
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/cost"
|
||||
)
|
||||
|
||||
// TestRecordChatTurnCost_RecordsWhenStoreConfigured verifies the
|
||||
// canonical cost-recording site for user chat turns: when the
|
||||
// service has a cost store and the loop accumulated tokens, a
|
||||
// cost.UsageEvent lands in the store with UseCase="chat".
|
||||
//
|
||||
// This is the chat-side anchor for the same "every Chat caller
|
||||
// records cost" invariant the internal/ai audit test enforces.
|
||||
// The chat package can't use the function-local audit because
|
||||
// recording is delegated from the loop (which calls ChatStream) to
|
||||
// the orchestrator (which holds the cost store) — see
|
||||
// recordChatTurnCost in service.go. So we test the orchestrator's
|
||||
// recording site directly.
|
||||
func TestRecordChatTurnCost_RecordsWhenStoreConfigured(t *testing.T) {
|
||||
store := cost.NewStore(7)
|
||||
svc := &Service{costStore: store}
|
||||
loop := &AgenticLoop{
|
||||
totalInputTokens: 120,
|
||||
totalOutputTokens: 45,
|
||||
}
|
||||
|
||||
svc.recordChatTurnCost(loop, "anthropic:claude-test")
|
||||
|
||||
events := store.ListEvents(1)
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 cost event, got %d", len(events))
|
||||
}
|
||||
ev := events[0]
|
||||
if ev.UseCase != "chat" {
|
||||
t.Errorf("UseCase = %q, want chat", ev.UseCase)
|
||||
}
|
||||
if ev.InputTokens != 120 || ev.OutputTokens != 45 {
|
||||
t.Errorf("tokens = (%d, %d), want (120, 45)", ev.InputTokens, ev.OutputTokens)
|
||||
}
|
||||
if ev.RequestModel != "anthropic:claude-test" {
|
||||
t.Errorf("RequestModel = %q, want anthropic:claude-test", ev.RequestModel)
|
||||
}
|
||||
if ev.Provider != "anthropic" {
|
||||
t.Errorf("Provider = %q, want anthropic", ev.Provider)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRecordChatTurnCost_NoopWhenStoreNil verifies the recorder is
|
||||
// a no-op when the cost store is not configured. Self-hosted users
|
||||
// without cost tracking enabled should not see panics or errors.
|
||||
func TestRecordChatTurnCost_NoopWhenStoreNil(t *testing.T) {
|
||||
svc := &Service{costStore: nil}
|
||||
loop := &AgenticLoop{
|
||||
totalInputTokens: 100,
|
||||
totalOutputTokens: 50,
|
||||
}
|
||||
// Must not panic.
|
||||
svc.recordChatTurnCost(loop, "anthropic:claude-test")
|
||||
}
|
||||
|
||||
// TestRecordChatTurnCost_NoopWhenZeroTokens verifies the recorder
|
||||
// skips when the loop terminated before producing any tokens (e.g.
|
||||
// budget gate rejected the request before the first turn). Zero-
|
||||
// token events would pollute the dashboard.
|
||||
func TestRecordChatTurnCost_NoopWhenZeroTokens(t *testing.T) {
|
||||
store := cost.NewStore(7)
|
||||
svc := &Service{costStore: store}
|
||||
loop := &AgenticLoop{
|
||||
totalInputTokens: 0,
|
||||
totalOutputTokens: 0,
|
||||
}
|
||||
svc.recordChatTurnCost(loop, "anthropic:claude-test")
|
||||
|
||||
if events := store.ListEvents(1); len(events) != 0 {
|
||||
t.Errorf("expected 0 events for zero-token loop, got %d", len(events))
|
||||
}
|
||||
}
|
||||
|
||||
// TestRecordChatTurnCost_HandlesMalformedModel verifies provider
|
||||
// extraction tolerates model strings without a provider prefix
|
||||
// (defensive — the canonical pipeline always sends provider:model
|
||||
// but a future caller might not).
|
||||
func TestRecordChatTurnCost_HandlesMalformedModel(t *testing.T) {
|
||||
store := cost.NewStore(7)
|
||||
svc := &Service{costStore: store}
|
||||
loop := &AgenticLoop{totalInputTokens: 10, totalOutputTokens: 5}
|
||||
|
||||
svc.recordChatTurnCost(loop, "bare-model-name")
|
||||
|
||||
events := store.ListEvents(1)
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 event even for malformed model, got %d", len(events))
|
||||
}
|
||||
if events[0].Provider != "" {
|
||||
t.Errorf("Provider should be empty for malformed model, got %q", events[0].Provider)
|
||||
}
|
||||
if events[0].RequestModel != "bare-model-name" {
|
||||
t.Errorf("RequestModel should be passed through verbatim, got %q", events[0].RequestModel)
|
||||
}
|
||||
}
|
||||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/agentexec"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/approval"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/cost"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/modelboundary"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/providers"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/tools"
|
||||
|
|
@ -87,6 +88,16 @@ type Config struct {
|
|||
ReportNarrator reporting.Narrator
|
||||
ReportFleetNarrator reporting.FleetNarrator
|
||||
ReportFindingsProvider reporting.FindingsProvider
|
||||
|
||||
// Optional cost store. When set, ExecuteStream records a
|
||||
// cost.UsageEvent after each chat turn so user-chat token usage
|
||||
// appears in the operator dashboard alongside patrol, discovery,
|
||||
// and report-narrative spend. Absent value means "don't record"
|
||||
// — useful for tests and for callers that bring their own ledger.
|
||||
// ExecutePatrolStream intentionally does NOT record here; its
|
||||
// caller (patrol_ai.go) records via its own helper so cost is
|
||||
// never double-counted on the patrol-via-chat path.
|
||||
CostStore *cost.Store
|
||||
}
|
||||
|
||||
// Service provides direct AI chat without external sidecar
|
||||
|
|
@ -113,6 +124,11 @@ type Service struct {
|
|||
orgID string
|
||||
controlLevelResolver func(*config.AIConfig) string
|
||||
|
||||
// costStore receives a cost.UsageEvent after each ExecuteStream
|
||||
// turn so user-chat token usage is visible in the operator
|
||||
// dashboard. Nil means "don't record".
|
||||
costStore *cost.Store
|
||||
|
||||
activeMu sync.RWMutex
|
||||
activeExecutions map[string]map[*AgenticLoop]struct{}
|
||||
questionExecutions map[string]*AgenticLoop
|
||||
|
|
@ -171,6 +187,7 @@ func NewService(cfg Config) *Service {
|
|||
executor: executor,
|
||||
orgID: strings.TrimSpace(cfg.OrgID),
|
||||
controlLevelResolver: cfg.ControlLevelResolver,
|
||||
costStore: cfg.CostStore,
|
||||
activeExecutions: make(map[string]map[*AgenticLoop]struct{}),
|
||||
questionExecutions: make(map[string]*AgenticLoop),
|
||||
}
|
||||
|
|
@ -245,6 +262,44 @@ func (s *Service) unregisterActiveLoop(sessionID string, loop *AgenticLoop) {
|
|||
}
|
||||
}
|
||||
|
||||
// recordChatTurnCost records a cost.UsageEvent for a completed (or
|
||||
// failed) user-chat agentic turn. Uses the totals accumulated by the
|
||||
// loop's stream callbacks so the recorded numbers match what the
|
||||
// frontend sees in the SSE done event. No-op when the cost store is
|
||||
// not configured or when the loop consumed zero tokens (an early
|
||||
// failure before the first turn completed). UseCase is "chat" — the
|
||||
// canonical taxonomy noted on cost.UsageEvent.UseCase.
|
||||
func (s *Service) recordChatTurnCost(loop *AgenticLoop, requestModel string) {
|
||||
if s == nil || loop == nil {
|
||||
return
|
||||
}
|
||||
s.mu.RLock()
|
||||
store := s.costStore
|
||||
s.mu.RUnlock()
|
||||
if store == nil {
|
||||
return
|
||||
}
|
||||
inputTokens := loop.GetTotalInputTokens()
|
||||
outputTokens := loop.GetTotalOutputTokens()
|
||||
if inputTokens == 0 && outputTokens == 0 {
|
||||
return
|
||||
}
|
||||
providerName := ""
|
||||
if requestModel != "" {
|
||||
if idx := strings.Index(requestModel, ":"); idx > 0 {
|
||||
providerName = strings.ToLower(requestModel[:idx])
|
||||
}
|
||||
}
|
||||
store.Record(cost.UsageEvent{
|
||||
Timestamp: time.Now(),
|
||||
Provider: providerName,
|
||||
RequestModel: requestModel,
|
||||
UseCase: "chat",
|
||||
InputTokens: inputTokens,
|
||||
OutputTokens: outputTokens,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) registerQuestionLoop(questionID string, loop *AgenticLoop) {
|
||||
if s == nil || questionID == "" || loop == nil {
|
||||
return
|
||||
|
|
@ -812,6 +867,15 @@ func (s *Service) ExecuteStream(ctx context.Context, req ExecuteRequest, callbac
|
|||
Err(err).
|
||||
Msg("[ChatService] Agentic loop returned")
|
||||
|
||||
// Record cost regardless of error — the operator was billed for
|
||||
// whatever tokens the loop consumed before terminating. Without
|
||||
// this, every user chat turn was invisible in the AI usage
|
||||
// dashboard despite chat being the bulk of token spend.
|
||||
// ExecutePatrolStream uses a separate tempLoop and its caller
|
||||
// (patrol_ai.go) records cost via its own helper, so no double-
|
||||
// recording on the patrol-via-chat path.
|
||||
s.recordChatTurnCost(loop, selectedModel)
|
||||
|
||||
if err != nil {
|
||||
// Still save any messages we got
|
||||
for _, msg := range resultMessages {
|
||||
|
|
|
|||
|
|
@ -670,6 +670,16 @@ func (s *Service) GetAIConfig() *config.AIConfig {
|
|||
return s.cfg
|
||||
}
|
||||
|
||||
// CostStore returns the underlying usage cost store handle so callers
|
||||
// that own a Service can record cost events directly. Returned value
|
||||
// may be nil when persistence/initialization failed during NewService.
|
||||
// Consumers must nil-check before recording.
|
||||
func (s *Service) CostStore() *cost.Store {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.costStore
|
||||
}
|
||||
|
||||
// GetCostSummary returns usage rollups for the last N days.
|
||||
func (s *Service) GetCostSummary(days int) cost.Summary {
|
||||
s.mu.RLock()
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import (
|
|||
airuntime "github.com/rcourtman/pulse-go-rewrite/internal/ai"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/approval"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/chat"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/cost"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/tools"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/unified"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/config"
|
||||
|
|
@ -122,6 +123,14 @@ type AIHandler struct {
|
|||
// return heuristic narrative. The ctx must carry the tenant org
|
||||
// via GetOrgID — same convention the reporting handler uses.
|
||||
reportNarratorResolver func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider)
|
||||
|
||||
// costStoreResolver returns the per-tenant cost store so chat
|
||||
// sessions can record user-chat token usage to the same ledger
|
||||
// patrol, discovery, and report-narrative spend already flow
|
||||
// through. Wired by the router from AISettingsHandler.GetAIService.
|
||||
// A nil return causes chat to skip recording — operator sees no
|
||||
// chat spend in the dashboard but no error either.
|
||||
costStoreResolver func(ctx context.Context) *cost.Store
|
||||
}
|
||||
|
||||
// SetReportNarratorResolver wires the optional per-tenant
|
||||
|
|
@ -144,6 +153,24 @@ func (h *AIHandler) resolveReportNarrator(ctx context.Context) (reporting.Narrat
|
|||
return h.reportNarratorResolver(ctx)
|
||||
}
|
||||
|
||||
// SetCostStoreResolver wires the optional per-tenant cost store
|
||||
// resolver. When unset (or when the resolver returns nil), chat
|
||||
// sessions skip cost recording — useful for tests and for self-hosted
|
||||
// deployments where the operator hasn't enabled cost tracking.
|
||||
func (h *AIHandler) SetCostStoreResolver(resolver func(ctx context.Context) *cost.Store) {
|
||||
if h == nil {
|
||||
return
|
||||
}
|
||||
h.costStoreResolver = resolver
|
||||
}
|
||||
|
||||
func (h *AIHandler) resolveCostStore(ctx context.Context) *cost.Store {
|
||||
if h == nil || h.costStoreResolver == nil {
|
||||
return nil
|
||||
}
|
||||
return h.costStoreResolver(ctx)
|
||||
}
|
||||
|
||||
// newChatService is the factory function for creating the AI service.
|
||||
// Can be swapped in tests for mocking.
|
||||
var newChatService = func(cfg chat.Config) AIService {
|
||||
|
|
@ -506,6 +533,7 @@ func (h *AIHandler) initTenantService(ctx context.Context, orgID string) AIServi
|
|||
}
|
||||
|
||||
chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(tenantCtx)
|
||||
chatCfg.CostStore = h.resolveCostStore(tenantCtx)
|
||||
|
||||
svc := newChatService(chatCfg)
|
||||
if err := svc.Start(ctx); err != nil {
|
||||
|
|
@ -742,6 +770,7 @@ func (h *AIHandler) Start(ctx context.Context, monitor *monitoring.Monitor) erro
|
|||
}
|
||||
|
||||
chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(serviceCtx)
|
||||
chatCfg.CostStore = h.resolveCostStore(serviceCtx)
|
||||
|
||||
svc := newChatService(chatCfg)
|
||||
if err := svc.Start(ctx); err != nil {
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/baseline"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/chat"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/circuit"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/cost"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/forecast"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/knowledge"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/ai/learning"
|
||||
|
|
@ -666,6 +667,22 @@ func (r *Router) setupRoutes() {
|
|||
}
|
||||
return svc, svc, svc
|
||||
})
|
||||
// Wire the per-tenant cost store into chat sessions so user-chat
|
||||
// token usage flows into the same operator-facing dashboard the
|
||||
// rest of the AI runtime uses (patrol, discovery,
|
||||
// report-narrative). No Enabled gate — even when chat ran briefly
|
||||
// while AI was being configured, those tokens were billed and
|
||||
// should be visible.
|
||||
r.aiHandler.SetCostStoreResolver(func(ctx context.Context) *cost.Store {
|
||||
if r.aiSettingsHandler == nil {
|
||||
return nil
|
||||
}
|
||||
svc := r.aiSettingsHandler.GetAIService(ctx)
|
||||
if svc == nil {
|
||||
return nil
|
||||
}
|
||||
return svc.CostStore()
|
||||
})
|
||||
|
||||
// AI-powered infrastructure discovery handlers
|
||||
// Note: The actual service is wired up later via SetDiscoveryService
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue