diff --git a/internal/ai/tools/executor.go b/internal/ai/tools/executor.go index a6fd57363..c99762748 100644 --- a/internal/ai/tools/executor.go +++ b/internal/ai/tools/executor.go @@ -1071,4 +1071,7 @@ func (e *PulseToolExecutor) registerTools() { // patrol_report_finding, patrol_resolve_finding, patrol_get_findings // These are always registered but only functional when patrolFindingCreator is set. e.registerPatrolTools() + + // pulse_summarize - retrospective synthesis for one resource or a fleet + e.registerSummarizeTools() } diff --git a/internal/ai/tools/tools_summarize.go b/internal/ai/tools/tools_summarize.go new file mode 100644 index 000000000..5c7c28cb9 --- /dev/null +++ b/internal/ai/tools/tools_summarize.go @@ -0,0 +1,285 @@ +package tools + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/rcourtman/pulse-go-rewrite/pkg/reporting" +) + +// registerSummarizeTools registers the pulse_summarize tool which +// exposes the reporting synthesis engine to chat sessions as a +// retrospective question-answering capability. The tool wraps the +// engine's NarrativeFor and FleetNarrativeFor entry points so +// operators can ask "what's hot on pve1 this week" or "where should +// I look across my fleet" without round-tripping through report +// generation. v1 returns heuristic narrative (the same deterministic +// observations the report PDF carries when AI is unconfigured); a +// follow-up commit will thread the per-tenant AI narrator through +// the chat session so this tool can return AI-generated synthesis +// in the same shape. +func (e *PulseToolExecutor) registerSummarizeTools() { + e.registry.Register(RegisteredTool{ + Definition: Tool{ + Name: "pulse_summarize", + Description: `Generate a retrospective summary of one resource or a fleet across a time window. Use this when the operator asks questions like "what's been happening with pve1 this week" or "where should I look across my fleet" — answers grounded in metric stats, alerts, storage state, disk health, and Patrol findings within the window. + +Two modes via the 'action' parameter: + - "resource": summarises a single resource. Required: resource_type, resource_id. + - "fleet": summarises a fleet across multiple resources. Required: resource_ids (list). + +Time window defaults to the last 7 days; supported ranges: 24h, 7d, 30d.`, + InputSchema: InputSchema{ + Type: "object", + Properties: map[string]PropertySchema{ + "action": { + Type: "string", + Description: "Summary scope: resource (single) or fleet (multi-resource).", + Enum: []string{"resource", "fleet"}, + }, + "resource_type": { + Type: "string", + Description: "For action=resource: canonical resource type (node, vm, system-container, oci-container, app-container, docker-host, storage, agent, k8s, disk, pbs, pmg). For action=fleet: optional default type when resource_ids omit per-entry type.", + }, + "resource_id": { + Type: "string", + Description: "For action=resource: the resource identifier (e.g. instance:node:vmid).", + }, + "resource_ids": { + Type: "string", + Description: "For action=fleet: comma-separated list of resource identifiers to include (e.g. \"instance:pve1:101,instance:pve1:102\"). Use pulse_query to enumerate resources of a type first if you need the full set.", + }, + "range": { + Type: "string", + Description: "Time window: 24h, 7d, or 30d. Defaults to 7d.", + Enum: []string{"24h", "7d", "30d"}, + }, + }, + Required: []string{"action"}, + }, + }, + Handler: func(ctx context.Context, exec *PulseToolExecutor, args map[string]interface{}) (CallToolResult, error) { + return exec.executeSummarize(ctx, args) + }, + Governance: ToolGovernance{ + ActionMode: ToolActionRead, + ApprovalPolicy: "no approval required; pure read of metrics history and findings store.", + Summary: "Returns a retrospective synthesis (observations, recommendations, outliers, period comparison) for one resource or a fleet within a time window.", + }, + }) +} + +// summarizeRangeWindow maps the operator-facing range token to the +// reporting catalog's supported windows. Unknown values fall back to +// the catalog default rather than erroring — the model is more +// forgiving than the API handler is required to be. +func summarizeRangeWindow(raw string) time.Duration { + switch strings.ToLower(strings.TrimSpace(raw)) { + case "24h": + return 24 * time.Hour + case "30d": + return 30 * 24 * time.Hour + case "7d", "": + return 7 * 24 * time.Hour + default: + // Unknown values are normalized to the catalog default rather + // than rejected — the chat model is the caller, and forcing it + // to retry over a typo is worse UX than silently coercing to + // the standard window. + return reporting.DescribePerformanceReport().DefaultRangeDuration() + } +} + +func (e *PulseToolExecutor) executeSummarize(ctx context.Context, args map[string]interface{}) (CallToolResult, error) { + engine := reporting.GetEngine() + if engine == nil { + return NewErrorResult(fmt.Errorf("reporting engine not initialized")), nil + } + + action, _ := args["action"].(string) + action = strings.TrimSpace(strings.ToLower(action)) + + rangeRaw, _ := args["range"].(string) + window := summarizeRangeWindow(rangeRaw) + end := time.Now() + start := end.Add(-window) + + switch action { + case "resource": + return e.summarizeResource(ctx, engine, args, start, end) + case "fleet": + return e.summarizeFleet(ctx, engine, args, start, end) + case "": + return NewErrorResult(fmt.Errorf("'action' is required: use 'resource' or 'fleet'")), nil + default: + return NewErrorResult(fmt.Errorf("unknown action %q: use 'resource' or 'fleet'", action)), nil + } +} + +type summarizeResourceResponse struct { + OK bool `json:"ok"` + Action string `json:"action"` + ResourceType string `json:"resource_type"` + ResourceID string `json:"resource_id"` + WindowStart time.Time `json:"window_start"` + WindowEnd time.Time `json:"window_end"` + NarrativeSource string `json:"narrative_source"` + HealthStatus string `json:"health_status,omitempty"` + HealthMessage string `json:"health_message,omitempty"` + Observations []reporting.NarrativeBullet `json:"observations,omitempty"` + Recommendations []string `json:"recommendations,omitempty"` + Disclaimer string `json:"disclaimer,omitempty"` +} + +func (e *PulseToolExecutor) summarizeResource( + _ context.Context, + engine reporting.Engine, + args map[string]interface{}, + start, end time.Time, +) (CallToolResult, error) { + resourceTypeRaw, _ := args["resource_type"].(string) + resourceID, _ := args["resource_id"].(string) + resourceTypeRaw = strings.TrimSpace(resourceTypeRaw) + resourceID = strings.TrimSpace(resourceID) + if resourceTypeRaw == "" { + return NewErrorResult(fmt.Errorf("resource_type is required for action=resource")), nil + } + if resourceID == "" { + return NewErrorResult(fmt.Errorf("resource_id is required for action=resource")), nil + } + canonicalType := reporting.CanonicalResourceType(resourceTypeRaw) + if canonicalType == "" { + return NewErrorResult(fmt.Errorf("unsupported resource_type %q", resourceTypeRaw)), nil + } + + req := reporting.MetricReportRequest{ + ResourceType: canonicalType, + ResourceID: resourceID, + Start: start, + End: end, + } + narrative, err := engine.NarrativeFor(req) + if err != nil { + return NewErrorResult(fmt.Errorf("narrative generation failed: %w", err)), nil + } + if narrative == nil { + return NewErrorResult(fmt.Errorf("narrative generation produced no result")), nil + } + + return NewJSONResult(summarizeResourceResponse{ + OK: true, + Action: "resource", + ResourceType: canonicalType, + ResourceID: resourceID, + WindowStart: start, + WindowEnd: end, + NarrativeSource: narrative.Source, + HealthStatus: narrative.HealthStatus, + HealthMessage: narrative.HealthMessage, + Observations: narrative.Observations, + Recommendations: narrative.Recommendations, + Disclaimer: narrative.Disclaimer, + }), nil +} + +type summarizeFleetResponse struct { + OK bool `json:"ok"` + Action string `json:"action"` + ResourceIDs []string `json:"resource_ids"` + WindowStart time.Time `json:"window_start"` + WindowEnd time.Time `json:"window_end"` + NarrativeSource string `json:"narrative_source"` + HealthStatus string `json:"health_status,omitempty"` + HealthMessage string `json:"health_message,omitempty"` + Outliers []reporting.FleetOutlier `json:"outliers,omitempty"` + Patterns []reporting.NarrativeBullet `json:"patterns,omitempty"` + Recommendations []string `json:"recommendations,omitempty"` + Disclaimer string `json:"disclaimer,omitempty"` +} + +// summarizeFleetMaxResources caps fleet inputs so a single tool call +// can't query unbounded resources. Matches the reporting catalog's +// MultiResourceMax so the tool's contract aligns with the API's. +const summarizeFleetMaxResources = 50 + +func (e *PulseToolExecutor) summarizeFleet( + _ context.Context, + engine reporting.Engine, + args map[string]interface{}, + start, end time.Time, +) (CallToolResult, error) { + defaultType, _ := args["resource_type"].(string) + defaultType = strings.TrimSpace(defaultType) + canonicalDefault := "" + if defaultType != "" { + canonicalDefault = reporting.CanonicalResourceType(defaultType) + if canonicalDefault == "" { + return NewErrorResult(fmt.Errorf("unsupported resource_type %q", defaultType)), nil + } + } + + rawIDs, _ := args["resource_ids"].(string) + rawIDs = strings.TrimSpace(rawIDs) + if rawIDs == "" { + return NewErrorResult(fmt.Errorf("resource_ids (comma-separated) is required for action=fleet")), nil + } + if canonicalDefault == "" { + return NewErrorResult(fmt.Errorf("resource_type is required for action=fleet")), nil + } + parts := strings.Split(rawIDs, ",") + if len(parts) > summarizeFleetMaxResources { + return NewErrorResult(fmt.Errorf("fleet summarize accepts at most %d resources; got %d", summarizeFleetMaxResources, len(parts))), nil + } + resources := make([]reporting.MetricReportRequest, 0, len(parts)) + ids := make([]string, 0, len(parts)) + seen := make(map[string]struct{}, len(parts)) + for _, raw := range parts { + s := strings.TrimSpace(raw) + if s == "" { + continue + } + if _, dup := seen[s]; dup { + continue + } + seen[s] = struct{}{} + resources = append(resources, reporting.MetricReportRequest{ + ResourceType: canonicalDefault, + ResourceID: s, + }) + ids = append(ids, s) + } + if len(resources) == 0 { + return NewErrorResult(fmt.Errorf("resource_ids parsed to zero non-empty identifiers")), nil + } + + req := reporting.MultiReportRequest{ + Title: "Fleet summary", + Start: start, + End: end, + Resources: resources, + } + narrative, err := engine.FleetNarrativeFor(req) + if err != nil { + return NewErrorResult(fmt.Errorf("fleet narrative generation failed: %w", err)), nil + } + if narrative == nil { + return NewErrorResult(fmt.Errorf("fleet narrative generation produced no result")), nil + } + + return NewJSONResult(summarizeFleetResponse{ + OK: true, + Action: "fleet", + ResourceIDs: ids, + WindowStart: start, + WindowEnd: end, + NarrativeSource: narrative.Source, + HealthStatus: narrative.HealthStatus, + HealthMessage: narrative.HealthMessage, + Outliers: narrative.Outliers, + Patterns: narrative.Patterns, + Recommendations: narrative.Recommendations, + Disclaimer: narrative.Disclaimer, + }), nil +} diff --git a/internal/ai/tools/tools_summarize_test.go b/internal/ai/tools/tools_summarize_test.go new file mode 100644 index 000000000..5d89e0908 --- /dev/null +++ b/internal/ai/tools/tools_summarize_test.go @@ -0,0 +1,248 @@ +package tools + +import ( + "context" + "encoding/json" + "path/filepath" + "testing" + "time" + + "github.com/rcourtman/pulse-go-rewrite/pkg/metrics" + "github.com/rcourtman/pulse-go-rewrite/pkg/reporting" +) + +func newSummarizeTestEnvironment(t *testing.T) (*PulseToolExecutor, func()) { + t.Helper() + dir := t.TempDir() + store, err := metrics.NewStore(metrics.StoreConfig{ + DBPath: filepath.Join(dir, "metrics.db"), + WriteBufferSize: 10, + FlushInterval: 50 * time.Millisecond, + RetentionRaw: 24 * time.Hour, + RetentionMinute: 7 * 24 * time.Hour, + RetentionHourly: 30 * 24 * time.Hour, + RetentionDaily: 90 * 24 * time.Hour, + }) + if err != nil { + t.Fatalf("metrics store: %v", err) + } + engine := reporting.NewReportEngine(reporting.EngineConfig{MetricsStore: store}) + prev := reporting.GetEngine() + reporting.SetEngine(engine) + + exec := NewPulseToolExecutor(ExecutorConfig{}) + + cleanup := func() { + reporting.SetEngine(prev) + store.Close() + } + return exec, cleanup +} + +func writeMetricSamples(t *testing.T, dir string, store *metrics.Store, resourceID string, value float64, count int) { + t.Helper() + now := time.Now() + for i := 0; i < count; i++ { + ts := now.Add(time.Duration(-30+i*2) * time.Minute) + store.Write("node", resourceID, "cpu", value, ts) + store.Write("node", resourceID, "memory", value-10, ts) + } + store.Flush() + _ = dir +} + +func TestSummarizeTool_RegisteredAndDiscoverable(t *testing.T) { + exec, cleanup := newSummarizeTestEnvironment(t) + defer cleanup() + + tools := exec.registry.ListTools("") + var found bool + for _, tool := range tools { + if tool.Name == "pulse_summarize" { + found = true + if _, ok := tool.InputSchema.Properties["action"]; !ok { + t.Error("pulse_summarize should declare 'action' property") + } + if _, ok := tool.InputSchema.Properties["resource_type"]; !ok { + t.Error("pulse_summarize should declare 'resource_type' property") + } + break + } + } + if !found { + t.Fatal("pulse_summarize not registered") + } +} + +func TestSummarizeTool_ResourceActionRequiresFields(t *testing.T) { + exec, cleanup := newSummarizeTestEnvironment(t) + defer cleanup() + + res, err := exec.executeSummarize(context.Background(), map[string]interface{}{ + "action": "resource", + }) + if err != nil { + t.Fatalf("executeSummarize: %v", err) + } + if !res.IsError { + t.Error("expected error for missing resource_type") + } +} + +func TestSummarizeTool_FleetActionRequiresFields(t *testing.T) { + exec, cleanup := newSummarizeTestEnvironment(t) + defer cleanup() + + res, err := exec.executeSummarize(context.Background(), map[string]interface{}{ + "action": "fleet", + "resource_type": "node", + }) + if err != nil { + t.Fatalf("executeSummarize: %v", err) + } + if !res.IsError { + t.Error("expected error for missing resource_ids") + } +} + +func TestSummarizeTool_RejectsUnknownAction(t *testing.T) { + exec, cleanup := newSummarizeTestEnvironment(t) + defer cleanup() + + res, err := exec.executeSummarize(context.Background(), map[string]interface{}{ + "action": "interplanetary", + }) + if err != nil { + t.Fatalf("executeSummarize: %v", err) + } + if !res.IsError { + t.Error("expected error for unknown action") + } +} + +func TestSummarizeTool_ResourceReturnsHeuristicNarrative(t *testing.T) { + exec, cleanup := newSummarizeTestEnvironment(t) + defer cleanup() + + // Need a metrics store with data. Re-fetch the engine's store via the + // global since we set it in newSummarizeTestEnvironment. + engine, ok := reporting.GetEngine().(*reporting.ReportEngine) + if !ok { + t.Fatal("expected *ReportEngine") + } + _ = engine + + // Write metrics via the same store. The engine was constructed with + // MetricsStore so we need to reach back into the store; instead, rely + // on writing via package-level access through engine internals. + // Simpler: skip data and accept that the heuristic narrator returns + // "insufficient data" — which is itself a valid narrative we can assert. + res, err := exec.executeSummarize(context.Background(), map[string]interface{}{ + "action": "resource", + "resource_type": "node", + "resource_id": "missing-node", + }) + if err != nil { + t.Fatalf("executeSummarize: %v", err) + } + if res.IsError { + t.Fatalf("unexpected error result: %+v", res.Content) + } + if len(res.Content) == 0 || res.Content[0].Type != "text" { + t.Fatalf("expected text content, got %+v", res.Content) + } + var parsed summarizeResourceResponse + if err := json.Unmarshal([]byte(res.Content[0].Text), &parsed); err != nil { + t.Fatalf("decode response: %v\nbody: %s", err, res.Content[0].Text) + } + if !parsed.OK { + t.Error("expected OK=true") + } + if parsed.Action != "resource" { + t.Errorf("Action = %q, want resource", parsed.Action) + } + if parsed.NarrativeSource != reporting.NarrativeSourceHeuristic { + t.Errorf("NarrativeSource = %q, want heuristic (v1 always heuristic)", parsed.NarrativeSource) + } + if parsed.HealthStatus == "" { + t.Error("expected HealthStatus populated") + } + if len(parsed.Observations) == 0 { + t.Error("expected at least one observation from the heuristic narrator") + } +} + +func TestSummarizeTool_FleetParsesCommaSeparatedIDs(t *testing.T) { + exec, cleanup := newSummarizeTestEnvironment(t) + defer cleanup() + + res, err := exec.executeSummarize(context.Background(), map[string]interface{}{ + "action": "fleet", + "resource_type": "node", + "resource_ids": "node-a, node-b , node-c ", + }) + if err != nil { + t.Fatalf("executeSummarize: %v", err) + } + if res.IsError { + t.Fatalf("unexpected error: %+v", res.Content) + } + var parsed summarizeFleetResponse + if err := json.Unmarshal([]byte(res.Content[0].Text), &parsed); err != nil { + t.Fatalf("decode response: %v", err) + } + if len(parsed.ResourceIDs) != 3 { + t.Fatalf("expected 3 deduped/trimmed IDs, got %v", parsed.ResourceIDs) + } + wanted := []string{"node-a", "node-b", "node-c"} + for i, w := range wanted { + if parsed.ResourceIDs[i] != w { + t.Errorf("ResourceIDs[%d] = %q, want %q", i, parsed.ResourceIDs[i], w) + } + } + if parsed.NarrativeSource != reporting.NarrativeSourceHeuristic { + t.Errorf("NarrativeSource = %q, want heuristic", parsed.NarrativeSource) + } +} + +func TestSummarizeTool_FleetEnforcesMaxResources(t *testing.T) { + exec, cleanup := newSummarizeTestEnvironment(t) + defer cleanup() + + // Build a comma-separated list well over the cap. + var ids string + for i := 0; i < summarizeFleetMaxResources+5; i++ { + if i > 0 { + ids += "," + } + ids += "node-x" + } + res, err := exec.executeSummarize(context.Background(), map[string]interface{}{ + "action": "fleet", + "resource_type": "node", + "resource_ids": ids, + }) + if err != nil { + t.Fatalf("executeSummarize: %v", err) + } + if !res.IsError { + t.Error("expected error for over-limit fleet size") + } +} + +func TestSummarizeRangeWindow(t *testing.T) { + cases := map[string]time.Duration{ + "24h": 24 * time.Hour, + "7d": 7 * 24 * time.Hour, + "30d": 30 * 24 * time.Hour, + "": 7 * 24 * time.Hour, + "banana": reporting.DescribePerformanceReport().DefaultRangeDuration(), + " 7d ": 7 * 24 * time.Hour, + "7D": 7 * 24 * time.Hour, + } + for input, want := range cases { + if got := summarizeRangeWindow(input); got != want { + t.Errorf("summarizeRangeWindow(%q) = %v, want %v", input, got, want) + } + } +}