Thread per-tenant AI narrators into pulse_summarize via chat session

v1 of pulse_summarize (1fe5d6853) shipped with heuristic narrative
only. The follow-up wiring promised in that commit now lands: the
chat session carries optional report-narration providers that the
tool's handler reads when building requests, so AI-narrated synthesis
flows into chat using the same provider, sanitizer, model selection,
cost ledger, and budget gate the report PDF endpoint already uses.

Pipeline:
- pkg/reporting Narrator / FleetNarrator / FindingsProvider interfaces
  are already implemented by internal/ai.Service. No new
  implementations.
- tools.ExecutorConfig + PulseToolExecutor gain three optional fields
  (ReportNarrator, ReportFleetNarrator, ReportFindingsProvider).
  Clone() copies them so per-session executors inherit the wiring.
- chat.Config gains the same three fields; NewService threads them
  into ExecutorConfig.
- tools_summarize.go reads e.reportNarrator/FleetNarrator/
  FindingsProvider and populates MetricReportRequest /
  MultiReportRequest. The engine already accepts these on the request
  and falls back to heuristic when they are nil — no engine changes
  needed.
- AIHandler gains SetReportNarratorResolver(ctx -> narrators); both
  per-tenant and default chat.Config construction sites invoke the
  resolver. Router wires the resolver to AISettingsHandler.GetAIService
  with the same Enabled-gate the reporting handler uses.

Unconfigured tenants are unchanged: the resolver returns nil, the
tool returns heuristic narrative — identical to today. Configured
tenants get AI synthesis in chat that matches what their report PDF
already carries, billed and budget-gated the same way.
This commit is contained in:
rcourtman 2026-05-10 22:50:17 +01:00
parent 7a7b3c9d30
commit 03463c1bfe
6 changed files with 260 additions and 8 deletions

View file

@ -19,6 +19,7 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/config" "github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/models" "github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources" "github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources"
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -78,6 +79,14 @@ type Config struct {
// Stored config may say autonomous, but runtime execution must use the // Stored config may say autonomous, but runtime execution must use the
// entitlement-clamped level. // entitlement-clamped level.
ControlLevelResolver func(*config.AIConfig) string ControlLevelResolver func(*config.AIConfig) string
// Optional report-narration providers for the pulse_summarize tool.
// When the per-tenant AI service is configured the API layer passes
// it here for all three roles; when unconfigured the tool returns
// heuristic narrative.
ReportNarrator reporting.Narrator
ReportFleetNarrator reporting.FleetNarrator
ReportFindingsProvider reporting.FindingsProvider
} }
// Service provides direct AI chat without external sidecar // Service provides direct AI chat without external sidecar
@ -138,6 +147,9 @@ func NewService(cfg Config) *Service {
AgentServer: agentServer, AgentServer: agentServer,
RecoveryPointsProvider: cfg.RecoveryPointsProvider, RecoveryPointsProvider: cfg.RecoveryPointsProvider,
OrgID: cfg.OrgID, OrgID: cfg.OrgID,
ReportNarrator: cfg.ReportNarrator,
ReportFleetNarrator: cfg.ReportFleetNarrator,
ReportFindingsProvider: cfg.ReportFindingsProvider,
} }
if cfg.AIConfig != nil { if cfg.AIConfig != nil {

View file

@ -10,6 +10,7 @@ import (
"github.com/rcourtman/pulse-go-rewrite/internal/models" "github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rcourtman/pulse-go-rewrite/internal/recovery" "github.com/rcourtman/pulse-go-rewrite/internal/recovery"
"github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources" "github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources"
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -486,6 +487,19 @@ type ExecutorConfig struct {
ControlLevel ControlLevel ControlLevel ControlLevel
ProtectedGuests []string // VMIDs that AI cannot control ProtectedGuests []string // VMIDs that AI cannot control
OrgID string // Tenant/org scope for approval records OrgID string // Tenant/org scope for approval records
// Optional report-narration providers, used by the pulse_summarize
// tool to produce AI-narrated synthesis in chat. When the per-tenant
// AI service is configured these are the same interfaces it exposes
// to the reporting handler for PDF generation. Absent values cause
// the tool to return heuristic narrative instead — identical to
// what the report PDF carries when no AI is configured. Defined
// here (rather than as separate setters) so chat.Config can wire
// them at session construction time alongside the rest of the
// executor's providers.
ReportNarrator reporting.Narrator
ReportFleetNarrator reporting.FleetNarrator
ReportFindingsProvider reporting.FindingsProvider
} }
// PulseToolExecutor implements ToolExecutor for Pulse-specific tools // PulseToolExecutor implements ToolExecutor for Pulse-specific tools
@ -566,6 +580,14 @@ type PulseToolExecutor struct {
patrolFindingCreatorMu sync.RWMutex patrolFindingCreatorMu sync.RWMutex
patrolFindingCreator PatrolFindingCreator patrolFindingCreator PatrolFindingCreator
// Report-narration providers, used by pulse_summarize when the
// per-tenant AI service is configured. Absent values cause the tool
// to fall back to heuristic narrative — identical to the report PDF
// behaviour when AI is unconfigured.
reportNarrator reporting.Narrator
reportFleetNarrator reporting.FleetNarrator
reportFindingsProvider reporting.FindingsProvider
// Tool registry // Tool registry
registry *ToolRegistry registry *ToolRegistry
} }
@ -621,6 +643,9 @@ func NewPulseToolExecutor(cfg ExecutorConfig) *PulseToolExecutor {
controlLevel: cfg.ControlLevel, controlLevel: cfg.ControlLevel,
protectedGuests: cfg.ProtectedGuests, protectedGuests: cfg.ProtectedGuests,
orgID: normalizeExecutorOrgID(cfg.OrgID), orgID: normalizeExecutorOrgID(cfg.OrgID),
reportNarrator: cfg.ReportNarrator,
reportFleetNarrator: cfg.ReportFleetNarrator,
reportFindingsProvider: cfg.ReportFindingsProvider,
registry: NewToolRegistry(), registry: NewToolRegistry(),
} }
@ -698,6 +723,9 @@ func (e *PulseToolExecutor) Clone() *PulseToolExecutor {
isAutonomous: e.isAutonomous, isAutonomous: e.isAutonomous,
orgID: e.orgID, orgID: e.orgID,
telemetryCallback: e.telemetryCallback, telemetryCallback: e.telemetryCallback,
reportNarrator: e.reportNarrator,
reportFleetNarrator: e.reportFleetNarrator,
reportFindingsProvider: e.reportFindingsProvider,
registry: e.registry, registry: e.registry,
} }
clone.patrolFindingCreator = e.GetPatrolFindingCreator() clone.patrolFindingCreator = e.GetPatrolFindingCreator()

View file

@ -155,10 +155,12 @@ func (e *PulseToolExecutor) summarizeResource(
} }
req := reporting.MetricReportRequest{ req := reporting.MetricReportRequest{
ResourceType: canonicalType, ResourceType: canonicalType,
ResourceID: resourceID, ResourceID: resourceID,
Start: start, Start: start,
End: end, End: end,
Narrator: e.reportNarrator,
FindingsProvider: e.reportFindingsProvider,
} }
narrative, err := engine.NarrativeFor(req) narrative, err := engine.NarrativeFor(req)
if err != nil { if err != nil {
@ -255,10 +257,13 @@ func (e *PulseToolExecutor) summarizeFleet(
} }
req := reporting.MultiReportRequest{ req := reporting.MultiReportRequest{
Title: "Fleet summary", Title: "Fleet summary",
Start: start, Start: start,
End: end, End: end,
Resources: resources, Resources: resources,
FleetNarrator: e.reportFleetNarrator,
Narrator: e.reportNarrator,
FindingsProvider: e.reportFindingsProvider,
} }
narrative, err := engine.FleetNarrativeFor(req) narrative, err := engine.FleetNarrativeFor(req)
if err != nil { if err != nil {

View file

@ -230,6 +230,158 @@ func TestSummarizeTool_FleetEnforcesMaxResources(t *testing.T) {
} }
} }
// stubReportNarrator implements reporting.Narrator with a recorded
// invocation so the test can assert the tool delegates to it.
type stubReportNarrator struct {
called bool
seen reporting.NarrativeInput
response reporting.Narrative
}
func (s *stubReportNarrator) Narrate(_ context.Context, in reporting.NarrativeInput) (reporting.Narrative, error) {
s.called = true
s.seen = in
return s.response, nil
}
type stubFleetReportNarrator struct {
called bool
seen reporting.FleetNarrativeInput
response reporting.FleetNarrative
}
func (s *stubFleetReportNarrator) NarrateFleet(_ context.Context, in reporting.FleetNarrativeInput) (reporting.FleetNarrative, error) {
s.called = true
s.seen = in
return s.response, nil
}
func TestSummarizeTool_UsesReportNarratorWhenConfigured(t *testing.T) {
dir := t.TempDir()
store, err := metrics.NewStore(metrics.StoreConfig{
DBPath: filepath.Join(dir, "metrics.db"),
WriteBufferSize: 10,
FlushInterval: 50 * time.Millisecond,
RetentionRaw: 24 * time.Hour,
RetentionMinute: 7 * 24 * time.Hour,
RetentionHourly: 30 * 24 * time.Hour,
RetentionDaily: 90 * 24 * time.Hour,
})
if err != nil {
t.Fatalf("metrics store: %v", err)
}
defer store.Close()
engine := reporting.NewReportEngine(reporting.EngineConfig{MetricsStore: store})
prev := reporting.GetEngine()
reporting.SetEngine(engine)
defer reporting.SetEngine(prev)
narrator := &stubReportNarrator{
response: reporting.Narrative{
Source: reporting.NarrativeSourceAI,
HealthStatus: "HEALTHY",
HealthMessage: "AI says fine",
Observations: []reporting.NarrativeBullet{
{Text: "AI bullet", Severity: reporting.NarrativeSeverityOK},
},
Recommendations: []string{"Continue monitoring"},
},
}
exec := NewPulseToolExecutor(ExecutorConfig{
ReportNarrator: narrator,
})
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
"action": "resource",
"resource_type": "node",
"resource_id": "node-with-narrator",
})
if err != nil {
t.Fatalf("executeSummarize: %v", err)
}
if res.IsError {
t.Fatalf("unexpected error: %+v", res.Content)
}
if !narrator.called {
t.Fatal("expected narrator to be invoked")
}
var parsed summarizeResourceResponse
if err := json.Unmarshal([]byte(res.Content[0].Text), &parsed); err != nil {
t.Fatalf("decode response: %v", err)
}
if parsed.NarrativeSource != reporting.NarrativeSourceAI {
t.Errorf("NarrativeSource = %q, want ai", parsed.NarrativeSource)
}
if parsed.HealthMessage != "AI says fine" {
t.Errorf("HealthMessage = %q, want AI says fine", parsed.HealthMessage)
}
if len(parsed.Observations) != 1 || parsed.Observations[0].Text != "AI bullet" {
t.Errorf("Observations = %#v", parsed.Observations)
}
}
func TestSummarizeTool_FleetUsesFleetNarratorWhenConfigured(t *testing.T) {
dir := t.TempDir()
store, err := metrics.NewStore(metrics.StoreConfig{
DBPath: filepath.Join(dir, "metrics.db"),
WriteBufferSize: 10,
FlushInterval: 50 * time.Millisecond,
RetentionRaw: 24 * time.Hour,
RetentionMinute: 7 * 24 * time.Hour,
RetentionHourly: 30 * 24 * time.Hour,
RetentionDaily: 90 * 24 * time.Hour,
})
if err != nil {
t.Fatalf("metrics store: %v", err)
}
defer store.Close()
engine := reporting.NewReportEngine(reporting.EngineConfig{MetricsStore: store})
prev := reporting.GetEngine()
reporting.SetEngine(engine)
defer reporting.SetEngine(prev)
fleet := &stubFleetReportNarrator{
response: reporting.FleetNarrative{
Source: reporting.NarrativeSourceAI,
HealthStatus: "WARNING",
HealthMessage: "Memory creeping up",
ExecutiveSummary: "AI fleet summary text",
Outliers: []reporting.FleetOutlier{
{ResourceID: "node-a", ResourceName: "alpha", Reason: "Memory at 92%", Severity: reporting.NarrativeSeverityWarning},
},
Recommendations: []string{"Review memory allocation"},
},
}
exec := NewPulseToolExecutor(ExecutorConfig{
ReportFleetNarrator: fleet,
})
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
"action": "fleet",
"resource_type": "node",
"resource_ids": "node-a,node-b",
})
if err != nil {
t.Fatalf("executeSummarize: %v", err)
}
if res.IsError {
t.Fatalf("unexpected error: %+v", res.Content)
}
if !fleet.called {
t.Fatal("expected fleet narrator to be invoked")
}
var parsed summarizeFleetResponse
if err := json.Unmarshal([]byte(res.Content[0].Text), &parsed); err != nil {
t.Fatalf("decode response: %v", err)
}
if parsed.NarrativeSource != reporting.NarrativeSourceAI {
t.Errorf("NarrativeSource = %q, want ai", parsed.NarrativeSource)
}
if len(parsed.Outliers) != 1 || parsed.Outliers[0].ResourceName != "alpha" {
t.Errorf("Outliers = %#v", parsed.Outliers)
}
}
func TestSummarizeRangeWindow(t *testing.T) { func TestSummarizeRangeWindow(t *testing.T) {
cases := map[string]time.Duration{ cases := map[string]time.Duration{
"24h": 24 * time.Hour, "24h": 24 * time.Hour,

View file

@ -23,6 +23,7 @@ import (
recoverymanager "github.com/rcourtman/pulse-go-rewrite/internal/recovery/manager" recoverymanager "github.com/rcourtman/pulse-go-rewrite/internal/recovery/manager"
"github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources" "github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources"
"github.com/rcourtman/pulse-go-rewrite/pkg/aicontracts" "github.com/rcourtman/pulse-go-rewrite/pkg/aicontracts"
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -111,6 +112,36 @@ type AIHandler struct {
approvalCreatedCallback func(*approval.ApprovalRequest) approvalCreatedCallback func(*approval.ApprovalRequest)
controlLevelResolver func(context.Context, *config.AIConfig) string controlLevelResolver func(context.Context, *config.AIConfig) string
patrolRunProvider patrolRunHandoffProvider patrolRunProvider patrolRunHandoffProvider
// reportNarratorResolver returns the per-tenant report-narration
// interfaces (single-resource, fleet, findings) for chat sessions
// so the pulse_summarize tool can produce AI-narrated synthesis
// using the same provider, sanitizer, budget gate, and cost ledger
// the report PDF endpoint already uses. Wired by the router from
// AISettingsHandler.GetAIService; absent values cause the tool to
// return heuristic narrative. The ctx must carry the tenant org
// via GetOrgID — same convention the reporting handler uses.
reportNarratorResolver func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider)
}
// SetReportNarratorResolver wires the optional per-tenant
// report-narrator resolver. When unset (or when the resolver returns
// nil), chat sessions construct their tool executor without report
// narrators and pulse_summarize falls back to heuristic narrative.
func (h *AIHandler) SetReportNarratorResolver(
resolver func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider),
) {
if h == nil {
return
}
h.reportNarratorResolver = resolver
}
func (h *AIHandler) resolveReportNarrator(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider) {
if h == nil || h.reportNarratorResolver == nil {
return nil, nil, nil
}
return h.reportNarratorResolver(ctx)
} }
// newChatService is the factory function for creating the AI service. // newChatService is the factory function for creating the AI service.
@ -474,6 +505,8 @@ func (h *AIHandler) initTenantService(ctx context.Context, orgID string) AIServi
} }
} }
chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(tenantCtx)
svc := newChatService(chatCfg) svc := newChatService(chatCfg)
if err := svc.Start(ctx); err != nil { if err := svc.Start(ctx); err != nil {
log.Error().Str("orgID", orgID).Err(err).Msg("Failed to start AI service for tenant") log.Error().Str("orgID", orgID).Err(err).Msg("Failed to start AI service for tenant")
@ -708,6 +741,8 @@ func (h *AIHandler) Start(ctx context.Context, monitor *monitoring.Monitor) erro
chatCfg.RecoveryPointsProvider = tools.NewRecoveryPointsMCPAdapter(recoveryManager, orgID) chatCfg.RecoveryPointsProvider = tools.NewRecoveryPointsMCPAdapter(recoveryManager, orgID)
} }
chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(serviceCtx)
svc := newChatService(chatCfg) svc := newChatService(chatCfg)
if err := svc.Start(ctx); err != nil { if err := svc.Start(ctx); err != nil {
return fmt.Errorf("start AI chat service: %w", err) return fmt.Errorf("start AI chat service: %w", err)

View file

@ -646,6 +646,26 @@ func (r *Router) setupRoutes() {
r.aiHandler.SetServiceInitializer(func(ctx context.Context, service AIService) { r.aiHandler.SetServiceInitializer(func(ctx context.Context, service AIService) {
r.wireAIChatDependenciesForService(ctx, service) r.wireAIChatDependenciesForService(ctx, service)
}) })
// Wire the per-tenant report-narrator resolver into the chat
// handler so the pulse_summarize tool can produce AI-narrated
// synthesis using the same Service that powers the report PDF
// endpoint. The AI service implements all three interfaces; an
// unconfigured tenant returns nil and the tool falls back to
// heuristic narrative.
r.aiHandler.SetReportNarratorResolver(func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider) {
if r.aiSettingsHandler == nil {
return nil, nil, nil
}
svc := r.aiSettingsHandler.GetAIService(ctx)
if svc == nil {
return nil, nil, nil
}
cfg := svc.GetAIConfig()
if cfg == nil || !cfg.Enabled {
return nil, nil, nil
}
return svc, svc, svc
})
// AI-powered infrastructure discovery handlers // AI-powered infrastructure discovery handlers
// Note: The actual service is wired up later via SetDiscoveryService // Note: The actual service is wired up later via SetDiscoveryService