mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-05-20 01:01:20 +00:00
Thread per-tenant AI narrators into pulse_summarize via chat session
v1 of pulse_summarize (1fe5d6853) shipped with heuristic narrative
only. The follow-up wiring promised in that commit now lands: the
chat session carries optional report-narration providers that the
tool's handler reads when building requests, so AI-narrated synthesis
flows into chat using the same provider, sanitizer, model selection,
cost ledger, and budget gate the report PDF endpoint already uses.
Pipeline:
- pkg/reporting Narrator / FleetNarrator / FindingsProvider interfaces
are already implemented by internal/ai.Service. No new
implementations.
- tools.ExecutorConfig + PulseToolExecutor gain three optional fields
(ReportNarrator, ReportFleetNarrator, ReportFindingsProvider).
Clone() copies them so per-session executors inherit the wiring.
- chat.Config gains the same three fields; NewService threads them
into ExecutorConfig.
- tools_summarize.go reads e.reportNarrator/FleetNarrator/
FindingsProvider and populates MetricReportRequest /
MultiReportRequest. The engine already accepts these on the request
and falls back to heuristic when they are nil — no engine changes
needed.
- AIHandler gains SetReportNarratorResolver(ctx -> narrators); both
per-tenant and default chat.Config construction sites invoke the
resolver. Router wires the resolver to AISettingsHandler.GetAIService
with the same Enabled-gate the reporting handler uses.
Unconfigured tenants are unchanged: the resolver returns nil, the
tool returns heuristic narrative — identical to today. Configured
tenants get AI synthesis in chat that matches what their report PDF
already carries, billed and budget-gated the same way.
This commit is contained in:
parent
7a7b3c9d30
commit
03463c1bfe
6 changed files with 260 additions and 8 deletions
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/rcourtman/pulse-go-rewrite/internal/config"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/models"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources"
|
||||
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
|
|
@ -78,6 +79,14 @@ type Config struct {
|
|||
// Stored config may say autonomous, but runtime execution must use the
|
||||
// entitlement-clamped level.
|
||||
ControlLevelResolver func(*config.AIConfig) string
|
||||
|
||||
// Optional report-narration providers for the pulse_summarize tool.
|
||||
// When the per-tenant AI service is configured the API layer passes
|
||||
// it here for all three roles; when unconfigured the tool returns
|
||||
// heuristic narrative.
|
||||
ReportNarrator reporting.Narrator
|
||||
ReportFleetNarrator reporting.FleetNarrator
|
||||
ReportFindingsProvider reporting.FindingsProvider
|
||||
}
|
||||
|
||||
// Service provides direct AI chat without external sidecar
|
||||
|
|
@ -138,6 +147,9 @@ func NewService(cfg Config) *Service {
|
|||
AgentServer: agentServer,
|
||||
RecoveryPointsProvider: cfg.RecoveryPointsProvider,
|
||||
OrgID: cfg.OrgID,
|
||||
ReportNarrator: cfg.ReportNarrator,
|
||||
ReportFleetNarrator: cfg.ReportFleetNarrator,
|
||||
ReportFindingsProvider: cfg.ReportFindingsProvider,
|
||||
}
|
||||
|
||||
if cfg.AIConfig != nil {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/rcourtman/pulse-go-rewrite/internal/models"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/recovery"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources"
|
||||
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
|
|
@ -486,6 +487,19 @@ type ExecutorConfig struct {
|
|||
ControlLevel ControlLevel
|
||||
ProtectedGuests []string // VMIDs that AI cannot control
|
||||
OrgID string // Tenant/org scope for approval records
|
||||
|
||||
// Optional report-narration providers, used by the pulse_summarize
|
||||
// tool to produce AI-narrated synthesis in chat. When the per-tenant
|
||||
// AI service is configured these are the same interfaces it exposes
|
||||
// to the reporting handler for PDF generation. Absent values cause
|
||||
// the tool to return heuristic narrative instead — identical to
|
||||
// what the report PDF carries when no AI is configured. Defined
|
||||
// here (rather than as separate setters) so chat.Config can wire
|
||||
// them at session construction time alongside the rest of the
|
||||
// executor's providers.
|
||||
ReportNarrator reporting.Narrator
|
||||
ReportFleetNarrator reporting.FleetNarrator
|
||||
ReportFindingsProvider reporting.FindingsProvider
|
||||
}
|
||||
|
||||
// PulseToolExecutor implements ToolExecutor for Pulse-specific tools
|
||||
|
|
@ -566,6 +580,14 @@ type PulseToolExecutor struct {
|
|||
patrolFindingCreatorMu sync.RWMutex
|
||||
patrolFindingCreator PatrolFindingCreator
|
||||
|
||||
// Report-narration providers, used by pulse_summarize when the
|
||||
// per-tenant AI service is configured. Absent values cause the tool
|
||||
// to fall back to heuristic narrative — identical to the report PDF
|
||||
// behaviour when AI is unconfigured.
|
||||
reportNarrator reporting.Narrator
|
||||
reportFleetNarrator reporting.FleetNarrator
|
||||
reportFindingsProvider reporting.FindingsProvider
|
||||
|
||||
// Tool registry
|
||||
registry *ToolRegistry
|
||||
}
|
||||
|
|
@ -621,6 +643,9 @@ func NewPulseToolExecutor(cfg ExecutorConfig) *PulseToolExecutor {
|
|||
controlLevel: cfg.ControlLevel,
|
||||
protectedGuests: cfg.ProtectedGuests,
|
||||
orgID: normalizeExecutorOrgID(cfg.OrgID),
|
||||
reportNarrator: cfg.ReportNarrator,
|
||||
reportFleetNarrator: cfg.ReportFleetNarrator,
|
||||
reportFindingsProvider: cfg.ReportFindingsProvider,
|
||||
registry: NewToolRegistry(),
|
||||
}
|
||||
|
||||
|
|
@ -698,6 +723,9 @@ func (e *PulseToolExecutor) Clone() *PulseToolExecutor {
|
|||
isAutonomous: e.isAutonomous,
|
||||
orgID: e.orgID,
|
||||
telemetryCallback: e.telemetryCallback,
|
||||
reportNarrator: e.reportNarrator,
|
||||
reportFleetNarrator: e.reportFleetNarrator,
|
||||
reportFindingsProvider: e.reportFindingsProvider,
|
||||
registry: e.registry,
|
||||
}
|
||||
clone.patrolFindingCreator = e.GetPatrolFindingCreator()
|
||||
|
|
|
|||
|
|
@ -155,10 +155,12 @@ func (e *PulseToolExecutor) summarizeResource(
|
|||
}
|
||||
|
||||
req := reporting.MetricReportRequest{
|
||||
ResourceType: canonicalType,
|
||||
ResourceID: resourceID,
|
||||
Start: start,
|
||||
End: end,
|
||||
ResourceType: canonicalType,
|
||||
ResourceID: resourceID,
|
||||
Start: start,
|
||||
End: end,
|
||||
Narrator: e.reportNarrator,
|
||||
FindingsProvider: e.reportFindingsProvider,
|
||||
}
|
||||
narrative, err := engine.NarrativeFor(req)
|
||||
if err != nil {
|
||||
|
|
@ -255,10 +257,13 @@ func (e *PulseToolExecutor) summarizeFleet(
|
|||
}
|
||||
|
||||
req := reporting.MultiReportRequest{
|
||||
Title: "Fleet summary",
|
||||
Start: start,
|
||||
End: end,
|
||||
Resources: resources,
|
||||
Title: "Fleet summary",
|
||||
Start: start,
|
||||
End: end,
|
||||
Resources: resources,
|
||||
FleetNarrator: e.reportFleetNarrator,
|
||||
Narrator: e.reportNarrator,
|
||||
FindingsProvider: e.reportFindingsProvider,
|
||||
}
|
||||
narrative, err := engine.FleetNarrativeFor(req)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -230,6 +230,158 @@ func TestSummarizeTool_FleetEnforcesMaxResources(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// stubReportNarrator implements reporting.Narrator with a recorded
|
||||
// invocation so the test can assert the tool delegates to it.
|
||||
type stubReportNarrator struct {
|
||||
called bool
|
||||
seen reporting.NarrativeInput
|
||||
response reporting.Narrative
|
||||
}
|
||||
|
||||
func (s *stubReportNarrator) Narrate(_ context.Context, in reporting.NarrativeInput) (reporting.Narrative, error) {
|
||||
s.called = true
|
||||
s.seen = in
|
||||
return s.response, nil
|
||||
}
|
||||
|
||||
type stubFleetReportNarrator struct {
|
||||
called bool
|
||||
seen reporting.FleetNarrativeInput
|
||||
response reporting.FleetNarrative
|
||||
}
|
||||
|
||||
func (s *stubFleetReportNarrator) NarrateFleet(_ context.Context, in reporting.FleetNarrativeInput) (reporting.FleetNarrative, error) {
|
||||
s.called = true
|
||||
s.seen = in
|
||||
return s.response, nil
|
||||
}
|
||||
|
||||
func TestSummarizeTool_UsesReportNarratorWhenConfigured(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := metrics.NewStore(metrics.StoreConfig{
|
||||
DBPath: filepath.Join(dir, "metrics.db"),
|
||||
WriteBufferSize: 10,
|
||||
FlushInterval: 50 * time.Millisecond,
|
||||
RetentionRaw: 24 * time.Hour,
|
||||
RetentionMinute: 7 * 24 * time.Hour,
|
||||
RetentionHourly: 30 * 24 * time.Hour,
|
||||
RetentionDaily: 90 * 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("metrics store: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
engine := reporting.NewReportEngine(reporting.EngineConfig{MetricsStore: store})
|
||||
prev := reporting.GetEngine()
|
||||
reporting.SetEngine(engine)
|
||||
defer reporting.SetEngine(prev)
|
||||
|
||||
narrator := &stubReportNarrator{
|
||||
response: reporting.Narrative{
|
||||
Source: reporting.NarrativeSourceAI,
|
||||
HealthStatus: "HEALTHY",
|
||||
HealthMessage: "AI says fine",
|
||||
Observations: []reporting.NarrativeBullet{
|
||||
{Text: "AI bullet", Severity: reporting.NarrativeSeverityOK},
|
||||
},
|
||||
Recommendations: []string{"Continue monitoring"},
|
||||
},
|
||||
}
|
||||
exec := NewPulseToolExecutor(ExecutorConfig{
|
||||
ReportNarrator: narrator,
|
||||
})
|
||||
|
||||
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
|
||||
"action": "resource",
|
||||
"resource_type": "node",
|
||||
"resource_id": "node-with-narrator",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("executeSummarize: %v", err)
|
||||
}
|
||||
if res.IsError {
|
||||
t.Fatalf("unexpected error: %+v", res.Content)
|
||||
}
|
||||
if !narrator.called {
|
||||
t.Fatal("expected narrator to be invoked")
|
||||
}
|
||||
var parsed summarizeResourceResponse
|
||||
if err := json.Unmarshal([]byte(res.Content[0].Text), &parsed); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
if parsed.NarrativeSource != reporting.NarrativeSourceAI {
|
||||
t.Errorf("NarrativeSource = %q, want ai", parsed.NarrativeSource)
|
||||
}
|
||||
if parsed.HealthMessage != "AI says fine" {
|
||||
t.Errorf("HealthMessage = %q, want AI says fine", parsed.HealthMessage)
|
||||
}
|
||||
if len(parsed.Observations) != 1 || parsed.Observations[0].Text != "AI bullet" {
|
||||
t.Errorf("Observations = %#v", parsed.Observations)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarizeTool_FleetUsesFleetNarratorWhenConfigured(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store, err := metrics.NewStore(metrics.StoreConfig{
|
||||
DBPath: filepath.Join(dir, "metrics.db"),
|
||||
WriteBufferSize: 10,
|
||||
FlushInterval: 50 * time.Millisecond,
|
||||
RetentionRaw: 24 * time.Hour,
|
||||
RetentionMinute: 7 * 24 * time.Hour,
|
||||
RetentionHourly: 30 * 24 * time.Hour,
|
||||
RetentionDaily: 90 * 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("metrics store: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
engine := reporting.NewReportEngine(reporting.EngineConfig{MetricsStore: store})
|
||||
prev := reporting.GetEngine()
|
||||
reporting.SetEngine(engine)
|
||||
defer reporting.SetEngine(prev)
|
||||
|
||||
fleet := &stubFleetReportNarrator{
|
||||
response: reporting.FleetNarrative{
|
||||
Source: reporting.NarrativeSourceAI,
|
||||
HealthStatus: "WARNING",
|
||||
HealthMessage: "Memory creeping up",
|
||||
ExecutiveSummary: "AI fleet summary text",
|
||||
Outliers: []reporting.FleetOutlier{
|
||||
{ResourceID: "node-a", ResourceName: "alpha", Reason: "Memory at 92%", Severity: reporting.NarrativeSeverityWarning},
|
||||
},
|
||||
Recommendations: []string{"Review memory allocation"},
|
||||
},
|
||||
}
|
||||
exec := NewPulseToolExecutor(ExecutorConfig{
|
||||
ReportFleetNarrator: fleet,
|
||||
})
|
||||
|
||||
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
|
||||
"action": "fleet",
|
||||
"resource_type": "node",
|
||||
"resource_ids": "node-a,node-b",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("executeSummarize: %v", err)
|
||||
}
|
||||
if res.IsError {
|
||||
t.Fatalf("unexpected error: %+v", res.Content)
|
||||
}
|
||||
if !fleet.called {
|
||||
t.Fatal("expected fleet narrator to be invoked")
|
||||
}
|
||||
var parsed summarizeFleetResponse
|
||||
if err := json.Unmarshal([]byte(res.Content[0].Text), &parsed); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
if parsed.NarrativeSource != reporting.NarrativeSourceAI {
|
||||
t.Errorf("NarrativeSource = %q, want ai", parsed.NarrativeSource)
|
||||
}
|
||||
if len(parsed.Outliers) != 1 || parsed.Outliers[0].ResourceName != "alpha" {
|
||||
t.Errorf("Outliers = %#v", parsed.Outliers)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarizeRangeWindow(t *testing.T) {
|
||||
cases := map[string]time.Duration{
|
||||
"24h": 24 * time.Hour,
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
recoverymanager "github.com/rcourtman/pulse-go-rewrite/internal/recovery/manager"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/unifiedresources"
|
||||
"github.com/rcourtman/pulse-go-rewrite/pkg/aicontracts"
|
||||
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
|
|
@ -111,6 +112,36 @@ type AIHandler struct {
|
|||
approvalCreatedCallback func(*approval.ApprovalRequest)
|
||||
controlLevelResolver func(context.Context, *config.AIConfig) string
|
||||
patrolRunProvider patrolRunHandoffProvider
|
||||
|
||||
// reportNarratorResolver returns the per-tenant report-narration
|
||||
// interfaces (single-resource, fleet, findings) for chat sessions
|
||||
// so the pulse_summarize tool can produce AI-narrated synthesis
|
||||
// using the same provider, sanitizer, budget gate, and cost ledger
|
||||
// the report PDF endpoint already uses. Wired by the router from
|
||||
// AISettingsHandler.GetAIService; absent values cause the tool to
|
||||
// return heuristic narrative. The ctx must carry the tenant org
|
||||
// via GetOrgID — same convention the reporting handler uses.
|
||||
reportNarratorResolver func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider)
|
||||
}
|
||||
|
||||
// SetReportNarratorResolver wires the optional per-tenant
|
||||
// report-narrator resolver. When unset (or when the resolver returns
|
||||
// nil), chat sessions construct their tool executor without report
|
||||
// narrators and pulse_summarize falls back to heuristic narrative.
|
||||
func (h *AIHandler) SetReportNarratorResolver(
|
||||
resolver func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider),
|
||||
) {
|
||||
if h == nil {
|
||||
return
|
||||
}
|
||||
h.reportNarratorResolver = resolver
|
||||
}
|
||||
|
||||
func (h *AIHandler) resolveReportNarrator(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider) {
|
||||
if h == nil || h.reportNarratorResolver == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
return h.reportNarratorResolver(ctx)
|
||||
}
|
||||
|
||||
// newChatService is the factory function for creating the AI service.
|
||||
|
|
@ -474,6 +505,8 @@ func (h *AIHandler) initTenantService(ctx context.Context, orgID string) AIServi
|
|||
}
|
||||
}
|
||||
|
||||
chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(tenantCtx)
|
||||
|
||||
svc := newChatService(chatCfg)
|
||||
if err := svc.Start(ctx); err != nil {
|
||||
log.Error().Str("orgID", orgID).Err(err).Msg("Failed to start AI service for tenant")
|
||||
|
|
@ -708,6 +741,8 @@ func (h *AIHandler) Start(ctx context.Context, monitor *monitoring.Monitor) erro
|
|||
chatCfg.RecoveryPointsProvider = tools.NewRecoveryPointsMCPAdapter(recoveryManager, orgID)
|
||||
}
|
||||
|
||||
chatCfg.ReportNarrator, chatCfg.ReportFleetNarrator, chatCfg.ReportFindingsProvider = h.resolveReportNarrator(serviceCtx)
|
||||
|
||||
svc := newChatService(chatCfg)
|
||||
if err := svc.Start(ctx); err != nil {
|
||||
return fmt.Errorf("start AI chat service: %w", err)
|
||||
|
|
|
|||
|
|
@ -646,6 +646,26 @@ func (r *Router) setupRoutes() {
|
|||
r.aiHandler.SetServiceInitializer(func(ctx context.Context, service AIService) {
|
||||
r.wireAIChatDependenciesForService(ctx, service)
|
||||
})
|
||||
// Wire the per-tenant report-narrator resolver into the chat
|
||||
// handler so the pulse_summarize tool can produce AI-narrated
|
||||
// synthesis using the same Service that powers the report PDF
|
||||
// endpoint. The AI service implements all three interfaces; an
|
||||
// unconfigured tenant returns nil and the tool falls back to
|
||||
// heuristic narrative.
|
||||
r.aiHandler.SetReportNarratorResolver(func(ctx context.Context) (reporting.Narrator, reporting.FleetNarrator, reporting.FindingsProvider) {
|
||||
if r.aiSettingsHandler == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
svc := r.aiSettingsHandler.GetAIService(ctx)
|
||||
if svc == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
cfg := svc.GetAIConfig()
|
||||
if cfg == nil || !cfg.Enabled {
|
||||
return nil, nil, nil
|
||||
}
|
||||
return svc, svc, svc
|
||||
})
|
||||
|
||||
// AI-powered infrastructure discovery handlers
|
||||
// Note: The actual service is wired up later via SetDiscoveryService
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue