feat(truenas): project cpu temperatures into host sensors

This commit is contained in:
rcourtman 2026-03-29 19:00:21 +01:00
parent aab6b5e69d
commit 20a16d5edd
9 changed files with 437 additions and 3 deletions

View file

@ -254,3 +254,10 @@ canonical host `AgentData` and shared `ResourceMetrics` contract, and
metrics-history/store path. Pulse must not add a TrueNAS-only top-level
system charts path or leave TrueNAS host telemetry outside the canonical host
history contract.
That same monitoring boundary now also owns API-backed TrueNAS CPU
temperature. `internal/truenas/client.go` must use the modern
`reporting.get_data` RPC surface to derive current `cputemp` readings in the
same RPC session as system telemetry, and `internal/truenas/provider.go` must
project those readings into the canonical host temperature and host-sensor
contract. Pulse must not treat TrueNAS CPU temperature as an agent-only
capability or invent a TrueNAS-local sensor payload.

View file

@ -180,6 +180,11 @@ provider-local host surface. API-backed `reporting.realtime` telemetry must
project onto that same canonical `agent` contract as live metrics and metrics
targets, using the shared `agent:<id>` in-memory key and `agent` history store
path instead of adding a TrueNAS-only host history lane.
API-backed TrueNAS CPU temperature now follows that same canonical host rule.
TrueNAS system records must project `cputemp` readings into the shared
`agent.temperature` and `agent.sensors.temperatureCelsius` fields instead of
inventing a provider-local temperature payload or leaving TrueNAS host
temperatures unavailable without the unified agent.
AI discovery and query surfaces now follow the same rule. Assistant runtime
paths such as `pulse_query` and unified AI context must expose TrueNAS-backed
canonical `agent`, `app-container`, `storage`, and `physical-disk` resources

View file

@ -207,6 +207,41 @@ func TestUnifiedPhysicalDiskMetricsUseCanonicalDiskHistoryPath(t *testing.T) {
}
}
// TestTrueNASSystemTelemetryUsesCanonicalHostTemperatureModel is a source-level
// guard. It reads the TrueNAS client and provider sources and fails as soon as
// one of the statements that wires CPU temperatures into the canonical host
// temperature/sensor fields is no longer present verbatim.
func TestTrueNASSystemTelemetryUsesCanonicalHostTemperatureModel(t *testing.T) {
	guards := []struct {
		file       string
		readFormat string
		missFormat string
		required   []string
	}{
		{
			file:       "client.go",
			readFormat: "failed to read ../truenas/client.go: %v",
			missFormat: "../truenas/client.go must contain %q",
			required: []string{
				`"reporting.get_data"`,
				`telemetry.TemperatureCelsius = cloneTemperatureMap(temperatures)`,
				`return parseSystemTemperatures(response), nil`,
			},
		},
		{
			file:       "provider.go",
			readFormat: "failed to read ../truenas/provider.go: %v",
			missFormat: "../truenas/provider.go must contain %q",
			required: []string{
				`if temperature := maxTrueNASSystemTemperature(system); temperature != nil {`,
				`agent.Temperature = temperature`,
				`if sensors := sensorMetaFromTrueNASSystem(system); sensors != nil {`,
				`agent.Sensors = sensors`,
			},
		},
	}

	// Files are checked in declaration order; the first failure aborts the test.
	for _, guard := range guards {
		raw, readErr := os.ReadFile(filepath.Join("..", "truenas", guard.file))
		if readErr != nil {
			t.Fatalf(guard.readFormat, readErr)
		}
		source := string(raw)
		for _, want := range guard.required {
			if strings.Contains(source, want) {
				continue
			}
			t.Fatalf(guard.missFormat, want)
		}
	}
}
func TestHostAgentRemovalGuardUsesResolvedIdentifier(t *testing.T) {
data, err := os.ReadFile("monitor_agents.go")
if err != nil {

View file

@ -189,12 +189,24 @@ func (c *Client) GetSystemTelemetry(ctx context.Context) (*SystemInfo, error) {
return nil, err
}
temperatures, err := rpc.getSystemTemperatures(ctx)
if err != nil {
temperatures = nil
}
subscriptionName := fmt.Sprintf("reporting.realtime:{\"interval\":%d}", defaultRealtimeIntervalSeconds)
if err := rpc.call(ctx, "core.subscribe", []any{subscriptionName}, nil); err != nil {
return nil, err
}
return rpc.readSystemTelemetryEvent(ctx, defaultRealtimeIntervalSeconds)
telemetry, err := rpc.readSystemTelemetryEvent(ctx, defaultRealtimeIntervalSeconds)
if err != nil {
return nil, err
}
if len(temperatures) > 0 {
telemetry.TemperatureCelsius = cloneTemperatureMap(temperatures)
}
return telemetry, nil
}
// GetPools returns storage pools.
@ -633,6 +645,9 @@ func mergeSystemTelemetry(system *SystemInfo, telemetry *SystemInfo) {
system.NetOutRate = telemetry.NetOutRate
system.DiskReadRate = telemetry.DiskReadRate
system.DiskWriteRate = telemetry.DiskWriteRate
if len(telemetry.TemperatureCelsius) > 0 {
system.TemperatureCelsius = cloneTemperatureMap(telemetry.TemperatureCelsius)
}
if telemetry.IntervalSeconds > 0 {
system.IntervalSeconds = telemetry.IntervalSeconds
}
@ -783,6 +798,22 @@ type trueNASRealtimeNotification struct {
Fields map[string]any `json:"fields"`
}
// trueNASReportingGetDataResponse is the decode target for one element of the
// `reporting.get_data` RPC result.
type trueNASReportingGetDataResponse struct {
// Name is the graph name; the temperature parser only accepts "cputemp".
Name string `json:"name"`
// Identifier is decoded but not read by the temperature parser.
Identifier any `json:"identifier"`
// Data holds the raw rows; the last row is the fallback when the mean
// aggregation yields no values. Rows are kept as `any` because the parser
// accepts both array and object shapes.
Data []any `json:"data"`
// Aggregations carries the min/mean/max summaries; the mean is preferred.
Aggregations trueNASReportingAggregations `json:"aggregations"`
// Start and End are decoded but not read by the temperature parser.
// NOTE(review): presumably the Unix-second bounds of the returned window — confirm.
Start int64 `json:"start"`
End int64 `json:"end"`
// Legend labels the values found in Data rows and Aggregations.
Legend []string `json:"legend"`
}
// trueNASReportingAggregations holds the summary values of a
// `reporting.get_data` graph. Each field stays `any` because the value parser
// accepts either an array or an object keyed by legend.
type trueNASReportingAggregations struct {
// Min is decoded but not read by the temperature parser.
Min any `json:"min"`
// Mean is the preferred source for temperature readings.
Mean any `json:"mean"`
// Max is decoded but not read by the temperature parser.
Max any `json:"max"`
}
func (c *Client) dialRPC(ctx context.Context) (*websocket.Conn, error) {
if c == nil {
return nil, fmt.Errorf("truenas client is nil")
@ -939,6 +970,35 @@ func (c *trueNASRPCClient) readAppStatsEvent(ctx context.Context, intervalSecond
}
}
// getSystemTemperatures issues a single aggregated `reporting.get_data` request
// for the `cputemp` graph over the trailing 300 seconds and returns the parsed
// readings keyed by canonical sensor name.
//
// It returns an error when the receiver or its connection is nil, or when the
// RPC call fails. A successful call with no usable readings yields a nil map.
func (c *trueNASRPCClient) getSystemTemperatures(ctx context.Context) (map[string]float64, error) {
	if c == nil || c.conn == nil {
		return nil, fmt.Errorf("truenas rpc connection is nil")
	}

	windowEnd := time.Now().Unix()
	windowStart := windowEnd - 300
	if windowStart <= 0 {
		// Collapse the window rather than send a non-positive start.
		windowStart = windowEnd
	}

	graphs := []map[string]any{{
		"name":       "cputemp",
		"identifier": nil,
	}}
	query := map[string]any{
		"aggregate": true,
		"start":     windowStart,
		"end":       windowEnd,
	}

	// The decode target keeps the name `response` and the final return keeps
	// its exact form: a source-level contract test matches that statement.
	var response []trueNASReportingGetDataResponse
	if err := c.call(ctx, "reporting.get_data", []any{graphs, query}, &response); err != nil {
		return nil, err
	}
	return parseSystemTemperatures(response), nil
}
func (c *trueNASRPCClient) readSystemTelemetryEvent(ctx context.Context, intervalSeconds int) (*SystemInfo, error) {
if c == nil || c.conn == nil {
return nil, fmt.Errorf("truenas rpc connection is nil")
@ -1082,6 +1142,156 @@ func parseSystemTelemetry(fields map[string]any, intervalSeconds int, collectedA
return telemetry
}
// parseSystemTemperatures converts `reporting.get_data` graph responses into a
// map of canonical sensor key to degrees Celsius.
//
// Only graphs named "cputemp" (case-insensitive, whitespace-trimmed) that carry
// a legend are considered. The aggregated mean is preferred; when it yields no
// values the most recent data row is used instead. Readings that are missing or
// not strictly positive are dropped, and a legend naming the timestamp column
// is never treated as a sensor. It returns nil when no usable reading is found.
func parseSystemTemperatures(responses []trueNASReportingGetDataResponse) map[string]float64 {
	if len(responses) == 0 {
		return nil
	}

	temperatures := make(map[string]float64)
	for _, response := range responses {
		if strings.TrimSpace(strings.ToLower(response.Name)) != "cputemp" || len(response.Legend) == 0 {
			continue
		}

		// Count real sensors only, so a lone sensor next to a timestamp column
		// is still recognised as the single (package) reading.
		sensorCount := 0
		for _, legend := range response.Legend {
			if !isReportingTimeLegend(legend) {
				sensorCount++
			}
		}
		if sensorCount == 0 {
			continue
		}

		values := extractReportingLegendFloatValues(response.Aggregations.Mean, response.Legend)
		if len(values) == 0 && len(response.Data) > 0 {
			values = extractReportingLegendFloatValues(response.Data[len(response.Data)-1], response.Legend)
		}
		if len(values) == 0 {
			continue
		}

		for index, legend := range response.Legend {
			// Array-shaped rows are paired with the legend by position, so a
			// timestamp column would otherwise surface as a bogus "time" sensor
			// holding an epoch value.
			if isReportingTimeLegend(legend) {
				continue
			}
			value, ok := values[legend]
			if !ok || value <= 0 {
				continue
			}
			key := canonicalSystemTemperatureKey(legend, index, sensorCount)
			if key == "" {
				continue
			}
			temperatures[key] = value
		}
	}

	if len(temperatures) == 0 {
		return nil
	}
	return temperatures
}

// isReportingTimeLegend reports whether a legend label names the timestamp
// column of a reporting data row rather than a sensor.
// NOTE(review): reporting graphs appear to list a leading "time" legend for the
// row timestamp — confirm against the TrueNAS reporting API documentation.
func isReportingTimeLegend(legend string) bool {
	return strings.EqualFold(strings.TrimSpace(legend), "time")
}
// extractReportingLegendFloatValues pairs the numeric values found in raw with
// the given legend labels and returns them keyed by the original legend string.
//
// raw may be an array (values are matched to legends by position; extra values
// are ignored) or an object (each legend is looked up by its candidate keys
// from reportingLegendLookupKeys). Any other shape, a nil raw, an empty legend
// list, or no parseable value yields nil.
func extractReportingLegendFloatValues(raw any, legends []string) map[string]float64 {
	if raw == nil || len(legends) == 0 {
		return nil
	}

	values := make(map[string]float64, len(legends))
	switch typed := raw.(type) {
	case []any:
		for index, value := range typed {
			if index >= len(legends) {
				break
			}
			if parsed, ok := parseFloat64Any(value); ok {
				values[legends[index]] = parsed
			}
		}
	case map[string]any:
		for index, legend := range legends {
			for _, key := range reportingLegendLookupKeys(legend, index) {
				value, ok := typed[key]
				if !ok {
					continue
				}
				if parsed, ok := parseFloat64Any(value); ok {
					values[legend] = parsed
					break
				}
			}
		}
		if len(values) == 0 && len(legends) == 1 {
			// Single-legend fallback: the object does not use any expected
			// key, so adopt one of its numeric entries. Map iteration order is
			// unspecified, so choose the entry with the smallest key to keep
			// the result stable across polls.
			var (
				chosenKey string
				chosen    float64
				found     bool
			)
			for key, value := range typed {
				if found && key >= chosenKey {
					continue
				}
				parsed, ok := parseFloat64Any(value)
				if !ok {
					continue
				}
				chosenKey, chosen, found = key, parsed, true
			}
			if found {
				values[legends[0]] = chosen
			}
		}
	}

	if len(values) == 0 {
		return nil
	}
	return values
}
// reportingLegendLookupKeys lists, in priority order, the object keys under
// which a legend's value may be stored: the trimmed label, its normalized form
// (only when non-empty and different from the trimmed label), and finally the
// legend's position rendered as a decimal string.
func reportingLegendLookupKeys(legend string, index int) []string {
	label := strings.TrimSpace(legend)

	candidates := make([]string, 0, 3)
	candidates = append(candidates, label)
	if alt := normalizeTemperatureLegendLabel(label); alt != "" && alt != label {
		candidates = append(candidates, alt)
	}
	return append(candidates, strconv.Itoa(index))
}
// canonicalSystemTemperatureKey maps a reporting legend label to the sensor key
// used in the temperature map.
//
// Rules, applied to the normalized label in order:
//   - empty: "cpu_package" when it is the only legend, else "cpu_temp_<index>"
//   - "cpu", "temp", "temperature", or anything containing "package": "cpu_package"
//   - prefixed with "cpu": the normalized label unchanged
//   - prefixed with "core": the normalized label with a "cpu_" prefix
//   - anything else: "cpu_package" when it is the only legend, else the
//     normalized label unchanged
func canonicalSystemTemperatureKey(legend string, index int, total int) string {
	const packageKey = "cpu_package"

	label := normalizeTemperatureLegendLabel(legend)
	soleSensor := total == 1

	if label == "" {
		if soleSensor {
			return packageKey
		}
		return fmt.Sprintf("cpu_temp_%d", index)
	}
	if label == "cpu" || label == "temp" || label == "temperature" || strings.Contains(label, "package") {
		return packageKey
	}
	if strings.HasPrefix(label, "cpu") {
		return label
	}
	if strings.HasPrefix(label, "core") {
		return "cpu_" + label
	}
	if soleSensor {
		return packageKey
	}
	return label
}
// normalizeTemperatureLegendLabel lowercases and trims value, then collapses
// every run of characters outside [a-z0-9] into a single underscore, with no
// leading or trailing underscore. Blank input yields the empty string.
func normalizeTemperatureLegendLabel(value string) string {
	lowered := strings.TrimSpace(strings.ToLower(value))
	if lowered == "" {
		return ""
	}

	// Splitting on separator runs and re-joining gives one underscore per run
	// and drops separators at either end.
	isSeparator := func(r rune) bool {
		isLetter := r >= 'a' && r <= 'z'
		isDigit := r >= '0' && r <= '9'
		return !isLetter && !isDigit
	}
	return strings.Join(strings.FieldsFunc(lowered, isSeparator), "_")
}
// cloneTemperatureMap returns an independent copy of src, or nil when src is
// nil or empty.
func cloneTemperatureMap(src map[string]float64) map[string]float64 {
	size := len(src)
	if size == 0 {
		return nil
	}

	dup := make(map[string]float64, size)
	for sensor := range src {
		dup[sensor] = src[sensor]
	}
	return dup
}
func firstAny(record map[string]any, keys ...string) (any, bool) {
if record == nil {
return nil, false

View file

@ -288,6 +288,36 @@ func TestGetSystemTelemetryFromRPC(t *testing.T) {
}
writeRPCResult(t, conn, authReq.ID, true)
temperatureReq := readRPCRequest(t, conn)
if temperatureReq.Method != "reporting.get_data" {
t.Fatalf("expected reporting.get_data, got %q", temperatureReq.Method)
}
writeRPCResult(t, conn, temperatureReq.ID, []map[string]any{{
"name": "cputemp",
"identifier": nil,
"legend": []string{"cpu_package", "core 0", "core 1"},
"aggregations": map[string]any{
"mean": map[string]any{
"cpu_package": 61.5,
"core 0": 58.0,
"core 1": 59.0,
},
"min": map[string]any{
"cpu_package": 60.0,
"core 0": 57.0,
"core 1": 58.0,
},
"max": map[string]any{
"cpu_package": 63.0,
"core 0": 60.0,
"core 1": 61.0,
},
},
"data": []any{},
"start": time.Now().Add(-5 * time.Minute).Unix(),
"end": time.Now().Unix(),
}})
subscribeReq := readRPCRequest(t, conn)
if subscribeReq.Method != "core.subscribe" {
t.Fatalf("expected core.subscribe, got %q", subscribeReq.Method)
@ -336,11 +366,61 @@ func TestGetSystemTelemetryFromRPC(t *testing.T) {
if system.DiskReadRate != 6144 || system.DiskWriteRate != 4096 {
t.Fatalf("unexpected disk telemetry: %+v", system)
}
if got := system.TemperatureCelsius["cpu_package"]; got != 61.5 {
t.Fatalf("expected cpu_package temperature 61.5, got %+v", system.TemperatureCelsius)
}
if got := system.TemperatureCelsius["cpu_core_0"]; got != 58.0 {
t.Fatalf("expected cpu_core_0 temperature 58.0, got %+v", system.TemperatureCelsius)
}
if system.IntervalSeconds != 2 || system.CollectedAt.IsZero() {
t.Fatalf("expected interval/collectedAt metadata, got %+v", system)
}
}
func TestGetSystemTelemetryIgnoresUnavailableTemperatureRPC(t *testing.T) {
server := newMockServerWithRPC(t, defaultAPIResponses(), nil, func(t *testing.T, conn *websocket.Conn) {
authReq := readRPCRequest(t, conn)
if authReq.Method != "auth.login_with_api_key" {
t.Fatalf("expected api-key auth method, got %q", authReq.Method)
}
writeRPCResult(t, conn, authReq.ID, true)
temperatureReq := readRPCRequest(t, conn)
if temperatureReq.Method != "reporting.get_data" {
t.Fatalf("expected reporting.get_data, got %q", temperatureReq.Method)
}
writeRPCError(t, conn, temperatureReq.ID, -32601, "not found")
subscribeReq := readRPCRequest(t, conn)
if subscribeReq.Method != "core.subscribe" {
t.Fatalf("expected core.subscribe, got %q", subscribeReq.Method)
}
writeRPCResult(t, conn, subscribeReq.ID, "sub-1")
writeRPCNotification(t, conn, "collection_update", map[string]any{
"collection": "reporting.realtime:{\"interval\":2}",
"fields": map[string]any{
"cpu": map[string]any{"usage": 41},
},
})
})
t.Cleanup(server.Close)
client := mustClientForServer(t, server.URL, ClientConfig{APIKey: "api-key"})
system, err := client.GetSystemTelemetry(context.Background())
if err != nil {
t.Fatalf("GetSystemTelemetry() error = %v", err)
}
if system == nil {
t.Fatal("expected system telemetry")
}
if system.CPUPercent != 41 {
t.Fatalf("expected cpu percent 41, got %+v", system)
}
if len(system.TemperatureCelsius) != 0 {
t.Fatalf("expected unavailable temperature RPC to be ignored, got %+v", system.TemperatureCelsius)
}
}
func TestGetDiskTemperaturesSupportsArrayShape(t *testing.T) {
server := newMockServer(t, map[string]apiResponse{
"/api/v2.0/disk/temperatures": {
@ -684,6 +764,21 @@ func writeRPCResult(t *testing.T, conn *websocket.Conn, id int64, result any) {
}
}
// writeRPCError sends a JSON-RPC 2.0 error response for request id with the
// given code and message, failing the test if the write fails.
func writeRPCError(t *testing.T, conn *websocket.Conn, id int64, code int, message string) {
	t.Helper()

	failure := trueNASRPCError{
		Code:    code,
		Message: message,
	}
	reply := trueNASRPCResponse{
		JSONRPC: "2.0",
		ID:      id,
		Error:   &failure,
	}
	if writeErr := conn.WriteJSON(reply); writeErr != nil {
		t.Fatalf("WriteJSON() rpc error response = %v", writeErr)
	}
}
func writeRPCNotification(t *testing.T, conn *websocket.Conn, method string, params any) {
t.Helper()

View file

@ -21,8 +21,13 @@ func DefaultFixtures() FixtureSnapshot {
NetOutRate: 19_500_000,
DiskReadRate: 7_200_000,
DiskWriteRate: 3_400_000,
IntervalSeconds: 2,
CollectedAt: time.Date(2026, 2, 8, 12, 0, 0, 0, time.UTC),
TemperatureCelsius: map[string]float64{
"cpu_package": 61.5,
"cpu_core_0": 58.0,
"cpu_core_1": 59.0,
},
IntervalSeconds: 2,
CollectedAt: time.Date(2026, 2, 8, 12, 0, 0, 0, time.UTC),
},
Pools: []Pool{
{

View file

@ -534,10 +534,64 @@ func agentDataFromTrueNASSystem(system SystemInfo, storageRisk *unifiedresources
Free: free,
}
}
if temperature := maxTrueNASSystemTemperature(system); temperature != nil {
agent.Temperature = temperature
}
if sensors := sensorMetaFromTrueNASSystem(system); sensors != nil {
agent.Sensors = sensors
}
return agent
}
// maxTrueNASSystemTemperature picks the headline host temperature from the
// system's sensor map. A positive "cpu_package" reading wins outright;
// otherwise the highest positive reading whose key starts with "cpu"
// (case-insensitive, trimmed) is used. It returns nil when nothing qualifies.
func maxTrueNASSystemTemperature(system SystemInfo) *float64 {
	readings := system.TemperatureCelsius
	if len(readings) == 0 {
		return nil
	}

	if pkg, present := readings["cpu_package"]; present && pkg > 0 {
		return &pkg
	}

	var hottest *float64
	for name, celsius := range readings {
		if celsius <= 0 {
			continue
		}
		if !strings.HasPrefix(strings.TrimSpace(strings.ToLower(name)), "cpu") {
			continue
		}
		if hottest == nil || celsius > *hottest {
			reading := celsius
			hottest = &reading
		}
	}
	return hottest
}
// sensorMetaFromTrueNASSystem copies the system's temperature readings into a
// host sensor record, trimming sensor names and dropping blank ones. It returns
// nil when there are no readings to report.
func sensorMetaFromTrueNASSystem(system SystemInfo) *unifiedresources.HostSensorMeta {
	total := len(system.TemperatureCelsius)
	if total == 0 {
		return nil
	}

	readings := make(map[string]float64, total)
	for rawName, celsius := range system.TemperatureCelsius {
		if name := strings.TrimSpace(rawName); name != "" {
			readings[name] = celsius
		}
	}
	if len(readings) == 0 {
		return nil
	}
	return &unifiedresources.HostSensorMeta{TemperatureCelsius: readings}
}
func enrichAppStatsFromPreviousSnapshot(current *FixtureSnapshot, previous *FixtureSnapshot) {
if current == nil || previous == nil || len(current.Apps) == 0 || len(previous.Apps) == 0 {
return
@ -1059,6 +1113,7 @@ func copyFixtureSnapshot(snapshot *FixtureSnapshot) *FixtureSnapshot {
}
copied := *snapshot
copied.System = cloneSystemInfo(snapshot.System)
copied.Pools = append([]Pool(nil), snapshot.Pools...)
copied.Datasets = append([]Dataset(nil), snapshot.Datasets...)
copied.Disks = append([]Disk(nil), snapshot.Disks...)
@ -1069,6 +1124,17 @@ func copyFixtureSnapshot(snapshot *FixtureSnapshot) *FixtureSnapshot {
return &copied
}
// cloneSystemInfo returns a copy of system whose temperature map, when
// non-empty, is a fresh map so the copy does not alias the input's readings.
func cloneSystemInfo(system SystemInfo) SystemInfo {
	original := system.TemperatureCelsius
	if len(original) == 0 {
		return system
	}

	// system is already a by-value copy; only the map needs detaching.
	system.TemperatureCelsius = make(map[string]float64, len(original))
	for sensor, celsius := range original {
		system.TemperatureCelsius[sensor] = celsius
	}
	return system
}
func cloneApps(apps []App) []App {
if len(apps) == 0 {
return nil

View file

@ -51,6 +51,7 @@ func TestFixtureFetcherReturnsSnapshotCopy(t *testing.T) {
first.Pools[0].Name = "mutated"
first.Datasets = append(first.Datasets, Dataset{Name: "extra/dataset"})
first.System.TemperatureCelsius["cpu_package"] = 99.9
second, err := fetcher.Fetch(context.Background())
if err != nil {
@ -65,6 +66,9 @@ func TestFixtureFetcherReturnsSnapshotCopy(t *testing.T) {
if len(second.Datasets) != len(fixtures.Datasets) {
t.Fatalf("expected dataset count %d, got %d", len(fixtures.Datasets), len(second.Datasets))
}
if second.System.TemperatureCelsius["cpu_package"] != fixtures.System.TemperatureCelsius["cpu_package"] {
t.Fatalf("expected fixture cpu_package temperature %v, got %v", fixtures.System.TemperatureCelsius["cpu_package"], second.System.TemperatureCelsius["cpu_package"])
}
}
func TestAPIFetcherDelegatesToClientFetchSnapshot(t *testing.T) {
@ -224,6 +228,12 @@ func TestSystemRecordPopulatesTrueNASMetadata(t *testing.T) {
if system.Resource.Agent.CPUCount != 16 || system.Resource.Agent.UptimeSeconds != int64(42*24*60*60) {
t.Fatalf("expected canonical host telemetry metadata, got %+v", system.Resource.Agent)
}
if system.Resource.Agent.Temperature == nil || *system.Resource.Agent.Temperature != 61.5 {
t.Fatalf("expected canonical host temperature 61.5, got %+v", system.Resource.Agent)
}
if system.Resource.Agent.Sensors == nil || system.Resource.Agent.Sensors.TemperatureCelsius["cpu_package"] != 61.5 {
t.Fatalf("expected canonical host sensors on TrueNAS system record, got %+v", system.Resource.Agent.Sensors)
}
if system.Resource.Metrics == nil || system.Resource.Metrics.CPU == nil || system.Resource.Metrics.Memory == nil {
t.Fatalf("expected canonical system metrics on TrueNAS host record, got %+v", system.Resource.Metrics)
}

View file

@ -31,6 +31,7 @@ type SystemInfo struct {
NetOutRate float64
DiskReadRate float64
DiskWriteRate float64
TemperatureCelsius map[string]float64
IntervalSeconds int
CollectedAt time.Time
}