Expose reporting synthesis to Assistant via pulse_summarize tool

The reporting synthesis layer (observations, recommendations,
outliers, period comparison) shipped trapped behind the PDF/CSV
export. Operators who chat with Assistant could not ask "what's been
happening with pve1 this week" — the data path existed but had no
non-PDF surface. This commit adds a single new tool, pulse_summarize,
that wraps the engine's non-rendering entry points (NarrativeFor /
FleetNarrativeFor) so that question gets answered in chat.

The tool takes an action parameter (resource | fleet) and routes
accordingly:
- resource mode requires resource_type + resource_id and returns the
  same Narrative the single-resource report carries (health status,
  observations, recommendations, period comparison).
- fleet mode requires resource_type + a comma-separated resource_ids
  string (PropertySchema does not currently support array items, and
  CSV is LLM-friendly enough) and returns the FleetNarrative
  (outliers, patterns, recommendations). Capped at the same
  multi-report ceiling (50) as the API endpoint.

The tool is read-only — no control level requirement, no approval
gate — and uses the global reporting engine the rest of the app
already shares. Returns a JSON envelope so chat can render it or
hand it back to the model for follow-up framing.

v1 ships with heuristic narrative only. The AI narrator wiring
through the chat session (Narrator/FleetNarrator/FindingsProvider
threaded via chat.Config -> tools.ExecutorConfig -> PulseToolExecutor)
is a focused follow-up; it lets the same tool inherit the per-tenant
AI service the report PDF endpoint already uses. The seam is
already in place because NarrativeFor/FleetNarrativeFor take an
optional narrator on the request — v1 passes nil, v2 populates it.
This commit is contained in:
rcourtman 2026-05-10 22:36:49 +01:00
parent ee2de2703b
commit 1fe5d6853f
3 changed files with 536 additions and 0 deletions

View file

@ -1071,4 +1071,7 @@ func (e *PulseToolExecutor) registerTools() {
// patrol_report_finding, patrol_resolve_finding, patrol_get_findings
// These are always registered but only functional when patrolFindingCreator is set.
e.registerPatrolTools()
// pulse_summarize - retrospective synthesis for one resource or a fleet
e.registerSummarizeTools()
}

View file

@ -0,0 +1,285 @@
package tools
import (
"context"
"fmt"
"strings"
"time"
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
)
// registerSummarizeTools registers the pulse_summarize tool which
// exposes the reporting synthesis engine to chat sessions as a
// retrospective question-answering capability. The tool wraps the
// engine's NarrativeFor and FleetNarrativeFor entry points so
// operators can ask "what's hot on pve1 this week" or "where should
// I look across my fleet" without round-tripping through report
// generation. v1 returns heuristic narrative (the same deterministic
// observations the report PDF carries when AI is unconfigured); a
// follow-up commit will thread the per-tenant AI narrator through
// the chat session so this tool can return AI-generated synthesis
// in the same shape.
func (e *PulseToolExecutor) registerSummarizeTools() {
e.registry.Register(RegisteredTool{
Definition: Tool{
Name: "pulse_summarize",
Description: `Generate a retrospective summary of one resource or a fleet across a time window. Use this when the operator asks questions like "what's been happening with pve1 this week" or "where should I look across my fleet" answers grounded in metric stats, alerts, storage state, disk health, and Patrol findings within the window.
Two modes via the 'action' parameter:
- "resource": summarises a single resource. Required: resource_type, resource_id.
- "fleet": summarises a fleet across multiple resources. Required: resource_ids (list).
Time window defaults to the last 7 days; supported ranges: 24h, 7d, 30d.`,
InputSchema: InputSchema{
Type: "object",
Properties: map[string]PropertySchema{
"action": {
Type: "string",
Description: "Summary scope: resource (single) or fleet (multi-resource).",
Enum: []string{"resource", "fleet"},
},
"resource_type": {
Type: "string",
Description: "For action=resource: canonical resource type (node, vm, system-container, oci-container, app-container, docker-host, storage, agent, k8s, disk, pbs, pmg). For action=fleet: optional default type when resource_ids omit per-entry type.",
},
"resource_id": {
Type: "string",
Description: "For action=resource: the resource identifier (e.g. instance:node:vmid).",
},
"resource_ids": {
Type: "string",
Description: "For action=fleet: comma-separated list of resource identifiers to include (e.g. \"instance:pve1:101,instance:pve1:102\"). Use pulse_query to enumerate resources of a type first if you need the full set.",
},
"range": {
Type: "string",
Description: "Time window: 24h, 7d, or 30d. Defaults to 7d.",
Enum: []string{"24h", "7d", "30d"},
},
},
Required: []string{"action"},
},
},
Handler: func(ctx context.Context, exec *PulseToolExecutor, args map[string]interface{}) (CallToolResult, error) {
return exec.executeSummarize(ctx, args)
},
Governance: ToolGovernance{
ActionMode: ToolActionRead,
ApprovalPolicy: "no approval required; pure read of metrics history and findings store.",
Summary: "Returns a retrospective synthesis (observations, recommendations, outliers, period comparison) for one resource or a fleet within a time window.",
},
})
}
// summarizeRangeWindow maps the operator-facing range token to the
// reporting catalog's supported windows. Unknown values fall back to
// the catalog default rather than erroring — the model is more
// forgiving than the API handler is required to be.
func summarizeRangeWindow(raw string) time.Duration {
switch strings.ToLower(strings.TrimSpace(raw)) {
case "24h":
return 24 * time.Hour
case "30d":
return 30 * 24 * time.Hour
case "7d", "":
return 7 * 24 * time.Hour
default:
// Unknown values are normalized to the catalog default rather
// than rejected — the chat model is the caller, and forcing it
// to retry over a typo is worse UX than silently coercing to
// the standard window.
return reporting.DescribePerformanceReport().DefaultRangeDuration()
}
}
func (e *PulseToolExecutor) executeSummarize(ctx context.Context, args map[string]interface{}) (CallToolResult, error) {
engine := reporting.GetEngine()
if engine == nil {
return NewErrorResult(fmt.Errorf("reporting engine not initialized")), nil
}
action, _ := args["action"].(string)
action = strings.TrimSpace(strings.ToLower(action))
rangeRaw, _ := args["range"].(string)
window := summarizeRangeWindow(rangeRaw)
end := time.Now()
start := end.Add(-window)
switch action {
case "resource":
return e.summarizeResource(ctx, engine, args, start, end)
case "fleet":
return e.summarizeFleet(ctx, engine, args, start, end)
case "":
return NewErrorResult(fmt.Errorf("'action' is required: use 'resource' or 'fleet'")), nil
default:
return NewErrorResult(fmt.Errorf("unknown action %q: use 'resource' or 'fleet'", action)), nil
}
}
type summarizeResourceResponse struct {
OK bool `json:"ok"`
Action string `json:"action"`
ResourceType string `json:"resource_type"`
ResourceID string `json:"resource_id"`
WindowStart time.Time `json:"window_start"`
WindowEnd time.Time `json:"window_end"`
NarrativeSource string `json:"narrative_source"`
HealthStatus string `json:"health_status,omitempty"`
HealthMessage string `json:"health_message,omitempty"`
Observations []reporting.NarrativeBullet `json:"observations,omitempty"`
Recommendations []string `json:"recommendations,omitempty"`
Disclaimer string `json:"disclaimer,omitempty"`
}
func (e *PulseToolExecutor) summarizeResource(
_ context.Context,
engine reporting.Engine,
args map[string]interface{},
start, end time.Time,
) (CallToolResult, error) {
resourceTypeRaw, _ := args["resource_type"].(string)
resourceID, _ := args["resource_id"].(string)
resourceTypeRaw = strings.TrimSpace(resourceTypeRaw)
resourceID = strings.TrimSpace(resourceID)
if resourceTypeRaw == "" {
return NewErrorResult(fmt.Errorf("resource_type is required for action=resource")), nil
}
if resourceID == "" {
return NewErrorResult(fmt.Errorf("resource_id is required for action=resource")), nil
}
canonicalType := reporting.CanonicalResourceType(resourceTypeRaw)
if canonicalType == "" {
return NewErrorResult(fmt.Errorf("unsupported resource_type %q", resourceTypeRaw)), nil
}
req := reporting.MetricReportRequest{
ResourceType: canonicalType,
ResourceID: resourceID,
Start: start,
End: end,
}
narrative, err := engine.NarrativeFor(req)
if err != nil {
return NewErrorResult(fmt.Errorf("narrative generation failed: %w", err)), nil
}
if narrative == nil {
return NewErrorResult(fmt.Errorf("narrative generation produced no result")), nil
}
return NewJSONResult(summarizeResourceResponse{
OK: true,
Action: "resource",
ResourceType: canonicalType,
ResourceID: resourceID,
WindowStart: start,
WindowEnd: end,
NarrativeSource: narrative.Source,
HealthStatus: narrative.HealthStatus,
HealthMessage: narrative.HealthMessage,
Observations: narrative.Observations,
Recommendations: narrative.Recommendations,
Disclaimer: narrative.Disclaimer,
}), nil
}
type summarizeFleetResponse struct {
OK bool `json:"ok"`
Action string `json:"action"`
ResourceIDs []string `json:"resource_ids"`
WindowStart time.Time `json:"window_start"`
WindowEnd time.Time `json:"window_end"`
NarrativeSource string `json:"narrative_source"`
HealthStatus string `json:"health_status,omitempty"`
HealthMessage string `json:"health_message,omitempty"`
Outliers []reporting.FleetOutlier `json:"outliers,omitempty"`
Patterns []reporting.NarrativeBullet `json:"patterns,omitempty"`
Recommendations []string `json:"recommendations,omitempty"`
Disclaimer string `json:"disclaimer,omitempty"`
}
// summarizeFleetMaxResources caps fleet inputs so a single tool call
// can't query unbounded resources. Matches the reporting catalog's
// MultiResourceMax so the tool's contract aligns with the API's.
const summarizeFleetMaxResources = 50
func (e *PulseToolExecutor) summarizeFleet(
_ context.Context,
engine reporting.Engine,
args map[string]interface{},
start, end time.Time,
) (CallToolResult, error) {
defaultType, _ := args["resource_type"].(string)
defaultType = strings.TrimSpace(defaultType)
canonicalDefault := ""
if defaultType != "" {
canonicalDefault = reporting.CanonicalResourceType(defaultType)
if canonicalDefault == "" {
return NewErrorResult(fmt.Errorf("unsupported resource_type %q", defaultType)), nil
}
}
rawIDs, _ := args["resource_ids"].(string)
rawIDs = strings.TrimSpace(rawIDs)
if rawIDs == "" {
return NewErrorResult(fmt.Errorf("resource_ids (comma-separated) is required for action=fleet")), nil
}
if canonicalDefault == "" {
return NewErrorResult(fmt.Errorf("resource_type is required for action=fleet")), nil
}
parts := strings.Split(rawIDs, ",")
if len(parts) > summarizeFleetMaxResources {
return NewErrorResult(fmt.Errorf("fleet summarize accepts at most %d resources; got %d", summarizeFleetMaxResources, len(parts))), nil
}
resources := make([]reporting.MetricReportRequest, 0, len(parts))
ids := make([]string, 0, len(parts))
seen := make(map[string]struct{}, len(parts))
for _, raw := range parts {
s := strings.TrimSpace(raw)
if s == "" {
continue
}
if _, dup := seen[s]; dup {
continue
}
seen[s] = struct{}{}
resources = append(resources, reporting.MetricReportRequest{
ResourceType: canonicalDefault,
ResourceID: s,
})
ids = append(ids, s)
}
if len(resources) == 0 {
return NewErrorResult(fmt.Errorf("resource_ids parsed to zero non-empty identifiers")), nil
}
req := reporting.MultiReportRequest{
Title: "Fleet summary",
Start: start,
End: end,
Resources: resources,
}
narrative, err := engine.FleetNarrativeFor(req)
if err != nil {
return NewErrorResult(fmt.Errorf("fleet narrative generation failed: %w", err)), nil
}
if narrative == nil {
return NewErrorResult(fmt.Errorf("fleet narrative generation produced no result")), nil
}
return NewJSONResult(summarizeFleetResponse{
OK: true,
Action: "fleet",
ResourceIDs: ids,
WindowStart: start,
WindowEnd: end,
NarrativeSource: narrative.Source,
HealthStatus: narrative.HealthStatus,
HealthMessage: narrative.HealthMessage,
Outliers: narrative.Outliers,
Patterns: narrative.Patterns,
Recommendations: narrative.Recommendations,
Disclaimer: narrative.Disclaimer,
}), nil
}

View file

@ -0,0 +1,248 @@
package tools
import (
"context"
"encoding/json"
"path/filepath"
"testing"
"time"
"github.com/rcourtman/pulse-go-rewrite/pkg/metrics"
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
)
func newSummarizeTestEnvironment(t *testing.T) (*PulseToolExecutor, func()) {
t.Helper()
dir := t.TempDir()
store, err := metrics.NewStore(metrics.StoreConfig{
DBPath: filepath.Join(dir, "metrics.db"),
WriteBufferSize: 10,
FlushInterval: 50 * time.Millisecond,
RetentionRaw: 24 * time.Hour,
RetentionMinute: 7 * 24 * time.Hour,
RetentionHourly: 30 * 24 * time.Hour,
RetentionDaily: 90 * 24 * time.Hour,
})
if err != nil {
t.Fatalf("metrics store: %v", err)
}
engine := reporting.NewReportEngine(reporting.EngineConfig{MetricsStore: store})
prev := reporting.GetEngine()
reporting.SetEngine(engine)
exec := NewPulseToolExecutor(ExecutorConfig{})
cleanup := func() {
reporting.SetEngine(prev)
store.Close()
}
return exec, cleanup
}
func writeMetricSamples(t *testing.T, dir string, store *metrics.Store, resourceID string, value float64, count int) {
t.Helper()
now := time.Now()
for i := 0; i < count; i++ {
ts := now.Add(time.Duration(-30+i*2) * time.Minute)
store.Write("node", resourceID, "cpu", value, ts)
store.Write("node", resourceID, "memory", value-10, ts)
}
store.Flush()
_ = dir
}
func TestSummarizeTool_RegisteredAndDiscoverable(t *testing.T) {
exec, cleanup := newSummarizeTestEnvironment(t)
defer cleanup()
tools := exec.registry.ListTools("")
var found bool
for _, tool := range tools {
if tool.Name == "pulse_summarize" {
found = true
if _, ok := tool.InputSchema.Properties["action"]; !ok {
t.Error("pulse_summarize should declare 'action' property")
}
if _, ok := tool.InputSchema.Properties["resource_type"]; !ok {
t.Error("pulse_summarize should declare 'resource_type' property")
}
break
}
}
if !found {
t.Fatal("pulse_summarize not registered")
}
}
func TestSummarizeTool_ResourceActionRequiresFields(t *testing.T) {
exec, cleanup := newSummarizeTestEnvironment(t)
defer cleanup()
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
"action": "resource",
})
if err != nil {
t.Fatalf("executeSummarize: %v", err)
}
if !res.IsError {
t.Error("expected error for missing resource_type")
}
}
func TestSummarizeTool_FleetActionRequiresFields(t *testing.T) {
exec, cleanup := newSummarizeTestEnvironment(t)
defer cleanup()
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
"action": "fleet",
"resource_type": "node",
})
if err != nil {
t.Fatalf("executeSummarize: %v", err)
}
if !res.IsError {
t.Error("expected error for missing resource_ids")
}
}
func TestSummarizeTool_RejectsUnknownAction(t *testing.T) {
exec, cleanup := newSummarizeTestEnvironment(t)
defer cleanup()
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
"action": "interplanetary",
})
if err != nil {
t.Fatalf("executeSummarize: %v", err)
}
if !res.IsError {
t.Error("expected error for unknown action")
}
}
func TestSummarizeTool_ResourceReturnsHeuristicNarrative(t *testing.T) {
exec, cleanup := newSummarizeTestEnvironment(t)
defer cleanup()
// Need a metrics store with data. Re-fetch the engine's store via the
// global since we set it in newSummarizeTestEnvironment.
engine, ok := reporting.GetEngine().(*reporting.ReportEngine)
if !ok {
t.Fatal("expected *ReportEngine")
}
_ = engine
// Write metrics via the same store. The engine was constructed with
// MetricsStore so we need to reach back into the store; instead, rely
// on writing via package-level access through engine internals.
// Simpler: skip data and accept that the heuristic narrator returns
// "insufficient data" — which is itself a valid narrative we can assert.
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
"action": "resource",
"resource_type": "node",
"resource_id": "missing-node",
})
if err != nil {
t.Fatalf("executeSummarize: %v", err)
}
if res.IsError {
t.Fatalf("unexpected error result: %+v", res.Content)
}
if len(res.Content) == 0 || res.Content[0].Type != "text" {
t.Fatalf("expected text content, got %+v", res.Content)
}
var parsed summarizeResourceResponse
if err := json.Unmarshal([]byte(res.Content[0].Text), &parsed); err != nil {
t.Fatalf("decode response: %v\nbody: %s", err, res.Content[0].Text)
}
if !parsed.OK {
t.Error("expected OK=true")
}
if parsed.Action != "resource" {
t.Errorf("Action = %q, want resource", parsed.Action)
}
if parsed.NarrativeSource != reporting.NarrativeSourceHeuristic {
t.Errorf("NarrativeSource = %q, want heuristic (v1 always heuristic)", parsed.NarrativeSource)
}
if parsed.HealthStatus == "" {
t.Error("expected HealthStatus populated")
}
if len(parsed.Observations) == 0 {
t.Error("expected at least one observation from the heuristic narrator")
}
}
func TestSummarizeTool_FleetParsesCommaSeparatedIDs(t *testing.T) {
exec, cleanup := newSummarizeTestEnvironment(t)
defer cleanup()
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
"action": "fleet",
"resource_type": "node",
"resource_ids": "node-a, node-b , node-c ",
})
if err != nil {
t.Fatalf("executeSummarize: %v", err)
}
if res.IsError {
t.Fatalf("unexpected error: %+v", res.Content)
}
var parsed summarizeFleetResponse
if err := json.Unmarshal([]byte(res.Content[0].Text), &parsed); err != nil {
t.Fatalf("decode response: %v", err)
}
if len(parsed.ResourceIDs) != 3 {
t.Fatalf("expected 3 deduped/trimmed IDs, got %v", parsed.ResourceIDs)
}
wanted := []string{"node-a", "node-b", "node-c"}
for i, w := range wanted {
if parsed.ResourceIDs[i] != w {
t.Errorf("ResourceIDs[%d] = %q, want %q", i, parsed.ResourceIDs[i], w)
}
}
if parsed.NarrativeSource != reporting.NarrativeSourceHeuristic {
t.Errorf("NarrativeSource = %q, want heuristic", parsed.NarrativeSource)
}
}
func TestSummarizeTool_FleetEnforcesMaxResources(t *testing.T) {
exec, cleanup := newSummarizeTestEnvironment(t)
defer cleanup()
// Build a comma-separated list well over the cap.
var ids string
for i := 0; i < summarizeFleetMaxResources+5; i++ {
if i > 0 {
ids += ","
}
ids += "node-x"
}
res, err := exec.executeSummarize(context.Background(), map[string]interface{}{
"action": "fleet",
"resource_type": "node",
"resource_ids": ids,
})
if err != nil {
t.Fatalf("executeSummarize: %v", err)
}
if !res.IsError {
t.Error("expected error for over-limit fleet size")
}
}
func TestSummarizeRangeWindow(t *testing.T) {
cases := map[string]time.Duration{
"24h": 24 * time.Hour,
"7d": 7 * 24 * time.Hour,
"30d": 30 * 24 * time.Hour,
"": 7 * 24 * time.Hour,
"banana": reporting.DescribePerformanceReport().DefaultRangeDuration(),
" 7d ": 7 * 24 * time.Hour,
"7D": 7 * 24 * time.Hour,
}
for input, want := range cases {
if got := summarizeRangeWindow(input); got != want {
t.Errorf("summarizeRangeWindow(%q) = %v, want %v", input, got, want)
}
}
}