From fa13919987b4e12022f35002e455a790e00614db Mon Sep 17 00:00:00 2001 From: rcourtman Date: Thu, 11 Dec 2025 23:02:59 +0000 Subject: [PATCH] fix(ai-chat): Display messages chronologically in AI chatbot - Add 'content' type to StreamDisplayEvent for tracking text chunks - Track content events in streamEvents array for chronological display - Update render to use Switch/Match for cleaner conditional rendering - Interleave thinking, tool calls, and content as they stream in - Add fallback for old messages without streamEvents for backwards compat Previously, tool/command outputs stayed at top while AI text responses accumulated at the bottom. Now all events appear in order like a normal chatbot. --- frontend-modern/src/components/AI/AIChat.tsx | 77 +-- internal/monitoring/monitor.go | 475 +++++++++++-------- internal/monitoring/monitor_polling.go | 5 +- pkg/proxmox/cluster_client.go | 11 +- 4 files changed, 343 insertions(+), 225 deletions(-) diff --git a/frontend-modern/src/components/AI/AIChat.tsx b/frontend-modern/src/components/AI/AIChat.tsx index 5bab0714d..86537fdc2 100644 --- a/frontend-modern/src/components/AI/AIChat.tsx +++ b/frontend-modern/src/components/AI/AIChat.tsx @@ -1,4 +1,4 @@ -import { Component, Show, createSignal, For, createEffect, createMemo, onMount } from 'solid-js'; +import { Component, Show, createSignal, For, createEffect, createMemo, onMount, Switch, Match } from 'solid-js'; import { marked } from 'marked'; import { AIAPI } from '@/api/ai'; import { notificationStore } from '@/stores/notifications'; @@ -116,9 +116,10 @@ interface PendingApproval { // Unified event type for chronological display interface StreamDisplayEvent { - type: 'thinking' | 'tool'; + type: 'thinking' | 'tool' | 'content'; thinking?: string; tool?: AIToolExecution; + content?: string; // Text content chunk for chronological display } interface Message { @@ -548,13 +549,16 @@ export const AIChat: Component = (props) => { } case 'content': { const content = event.data as string; - // Append content rather than replace - this allows intermediate AI responses - // during tool execution to accumulate, showing the user the full conversation flow + if (!content.trim()) return msg; // Skip empty content + // Track content in streamEvents for chronological display + const events = msg.streamEvents || []; + // Also accumulate in content for backwards compatibility / summary const existingContent = msg.content || ''; const separator = existingContent && !existingContent.endsWith('\n') ? '\n\n' : ''; return { ...msg, content: existingContent + separator + content, + streamEvents: [...events, { type: 'content' as const, content: content.trim() }], }; } case 'complete': { @@ -1089,38 +1093,45 @@ export const AIChat: Component = (props) => { : 'bg-gray-100 dark:bg-gray-800 text-gray-900 dark:text-gray-100' }`} > - {/* Render all events in chronological order - thinking and tools interleaved */} + {/* Render all events in chronological order - thinking, tools, and content interleaved */} 0}> -
+
{(evt) => ( - + + {/* Tool call */} +
+
+ + + + {evt.tool!.input} +
+ +
+                                    {evt.tool!.output.length > 500 ? evt.tool!.output.substring(0, 500) + '...' : evt.tool!.output}
+                                  
+
+
+
+ + {/* Thinking chunk */}
- {sanitizeThinking(evt.thinking && evt.thinking.length > 500 ? evt.thinking.substring(0, 500) + '...' : evt.thinking || '')} + {sanitizeThinking(evt.thinking!.length > 500 ? evt.thinking!.substring(0, 500) + '...' : evt.thinking!)}
- } - > - {/* Tool call */} -
-
- - - - {evt.tool!.input} -
- -
-                                  {evt.tool!.output.length > 500 ? evt.tool!.output.substring(0, 500) + '...' : evt.tool!.output}
-                                
-
-
-
+ + + {/* Content chunk - rendered as markdown */} +
+ + )}
@@ -1146,8 +1157,8 @@ export const AIChat: Component = (props) => {
- {/* Show AI's response text AFTER tool calls */} - + {/* Show AI's response text - only if no streamEvents (fallback for old messages or messages without streaming) */} +
0 && disk.Wearout < 10 { - // Low wearout warning (less than 10% life remaining) - log.Warn(). - Str("node", node.Node). - Str("disk", disk.DevPath). - Str("model", disk.Model). - Int("wearout", disk.Wearout). - Msg("SSD wearout critical - less than 10% life remaining") + normalizedHealth := strings.ToUpper(strings.TrimSpace(disk.Health)) + if normalizedHealth != "" && normalizedHealth != "UNKNOWN" && normalizedHealth != "PASSED" && normalizedHealth != "OK" { + // Disk has failed or is failing - alert manager will handle this + log.Warn(). + Str("node", node.Node). + Str("disk", disk.DevPath). + Str("model", disk.Model). + Str("health", disk.Health). + Int("wearout", disk.Wearout). + Msg("Disk health issue detected") - // Pass to alert manager for wearout alert - m.alertManager.CheckDiskHealth(instanceName, node.Node, disk) + // Pass disk info to alert manager + m.alertManager.CheckDiskHealth(inst, node.Node, disk) + } else if disk.Wearout > 0 && disk.Wearout < 10 { + // Low wearout warning (less than 10% life remaining) + log.Warn(). + Str("node", node.Node). + Str("disk", disk.DevPath). + Str("model", disk.Model). + Int("wearout", disk.Wearout). + Msg("SSD wearout critical - less than 10% life remaining") + + // Pass to alert manager for wearout alert + m.alertManager.CheckDiskHealth(inst, node.Node, disk) + } } } - } - // Preserve existing disk data for nodes that weren't polled (offline or error) - for _, existingDisk := range existingDisksMap { - // Only preserve if we didn't poll this node - if !polledNodes[existingDisk.Node] { - // Keep the existing disk data but update the LastChecked to indicate it's stale - allDisks = append(allDisks, existingDisk) - log.Debug(). - Str("node", existingDisk.Node). - Str("disk", existingDisk.DevPath). - Msg("Preserving existing disk data for unpolled node") + // Preserve existing disk data for nodes that weren't polled (offline or error) + for _, existingDisk := range existingDisksMap { + // Only preserve if we didn't poll this node + if !polledNodes[existingDisk.Node] { + // Keep the existing disk data but update the LastChecked to indicate it's stale + allDisks = append(allDisks, existingDisk) + log.Debug(). + Str("node", existingDisk.Node). + Str("disk", existingDisk.DevPath). + Msg("Preserving existing disk data for unpolled node") + } } - } - allDisks = mergeNVMeTempsIntoDisks(allDisks, modelNodes) + allDisks = mergeNVMeTempsIntoDisks(allDisks, modelNodesCopy) - // Update physical disks in state - log.Debug(). - Str("instance", instanceName). - Int("diskCount", len(allDisks)). - Int("preservedCount", len(existingDisksMap)-len(polledNodes)). - Msg("Updating physical disks in state") - m.state.UpdatePhysicalDisks(instanceName, allDisks) + // Update physical disks in state + log.Debug(). + Str("instance", inst). + Int("diskCount", len(allDisks)). + Int("preservedCount", len(existingDisksMap)-len(polledNodes)). + Msg("Updating physical disks in state") + m.state.UpdatePhysicalDisks(inst, allDisks) + }(instanceName, client, nodes, nodeEffectiveStatus, modelNodes) } } // Note: Physical disk monitoring is now enabled by default with a 5-minute polling interval. // Users can explicitly disable it in node settings. Disk data is preserved between polls. + // Wait for storage fallback to complete (with a short timeout) before using the data. + // This is non-blocking in the sense that VM/container polling has already completed by now. + // We give the storage fallback goroutine up to 2 additional seconds to finish if it's still running. + select { + case <-storageFallbackDone: + // Storage fallback completed normally + case <-time.After(2 * time.Second): + log.Debug(). + Str("instance", instanceName). + Msg("Storage fallback still running - proceeding without waiting (disk fallback may be unavailable)") + } + // Update nodes with storage fallback if rootfs was not available + // Copy storageByNode under lock, then release to avoid holding during metric updates + storageByNodeMu.Lock() + localStorageByNode := make(map[string]models.Disk, len(storageByNode)) + for k, v := range storageByNode { + localStorageByNode[k] = v + } + storageByNodeMu.Unlock() + for i := range modelNodes { if modelNodes[i].Disk.Total == 0 { - if disk, exists := storageByNode[modelNodes[i].Name]; exists { + if disk, exists := localStorageByNode[modelNodes[i].Name]; exists { modelNodes[i].Disk = disk log.Debug(). Str("node", modelNodes[i].Name). @@ -5159,55 +5284,25 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie } } - // Poll VMs and containers together using cluster/resources for efficiency - if instanceCfg.MonitorVMs || instanceCfg.MonitorContainers { - select { - case <-ctx.Done(): - pollErr = ctx.Err() - return - default: - // Always try the efficient cluster/resources endpoint first - // This endpoint works on both clustered and standalone nodes - // Testing confirmed it works on standalone nodes like pimox - useClusterEndpoint := m.pollVMsAndContainersEfficient(ctx, instanceName, client, nodeEffectiveStatus) - - if !useClusterEndpoint { - // Fall back to traditional polling only if cluster/resources not available - // This should be rare - only for very old Proxmox versions - log.Debug(). - Str("instance", instanceName). - Msg("cluster/resources endpoint not available, using traditional polling") - - // Check if configuration needs updating - if instanceCfg.IsCluster { - isActuallyCluster, checkErr := client.IsClusterMember(ctx) - if checkErr == nil && !isActuallyCluster { - log.Warn(). - Str("instance", instanceName). - Msg("Instance marked as cluster but is actually standalone - consider updating configuration") - instanceCfg.IsCluster = false - } - } - - // Use optimized parallel polling for better performance - if instanceCfg.MonitorVMs { - m.pollVMsWithNodes(ctx, instanceName, client, nodes, nodeEffectiveStatus) - } - if instanceCfg.MonitorContainers { - m.pollContainersWithNodes(ctx, instanceName, client, nodes, nodeEffectiveStatus) - } - } - } - } - - // Poll storage if enabled + // Poll storage in background if enabled - storage APIs can be slow (NFS mounts, etc.) + // so we run this asynchronously to prevent it from causing task timeouts. + // This is similar to how backup polling runs in the background. if instanceCfg.MonitorStorage { select { case <-ctx.Done(): pollErr = ctx.Err() return default: - m.pollStorageWithNodes(ctx, instanceName, client, nodes) + go func(inst string, pveClient PVEClientInterface, nodeList []proxmox.Node) { + defer recoverFromPanic(fmt.Sprintf("pollStorageWithNodes-%s", inst)) + + // Use a generous timeout for storage polling - it's not blocking the main task + storageTimeout := 60 * time.Second + storageCtx, storageCancel := context.WithTimeout(context.Background(), storageTimeout) + defer storageCancel() + + m.pollStorageWithNodes(storageCtx, inst, pveClient, nodeList) + }(instanceName, client, nodes) } } diff --git a/internal/monitoring/monitor_polling.go b/internal/monitoring/monitor_polling.go index 412684d79..f1223517f 100644 --- a/internal/monitoring/monitor_polling.go +++ b/internal/monitoring/monitor_polling.go @@ -2143,7 +2143,10 @@ func (m *Monitor) pollPVENode( var proxyTemp *models.Temperature var err error if m.tempCollector != nil { - tempCtx, tempCancel := context.WithTimeout(ctx, 30*time.Second) // Increased to accommodate SSH operations via proxy + // Temperature collection is best-effort - use a short timeout to avoid blocking node polling + // Use context.Background() so the timeout is truly independent of the parent polling context + // If SSH is slow or unresponsive, we'll preserve previous temperature data + tempCtx, tempCancel := context.WithTimeout(context.Background(), 10*time.Second) defer tempCancel() // Determine SSH hostname to use (most robust approach): diff --git a/pkg/proxmox/cluster_client.go b/pkg/proxmox/cluster_client.go index f726e98ef..48a4772ad 100644 --- a/pkg/proxmox/cluster_client.go +++ b/pkg/proxmox/cluster_client.go @@ -632,6 +632,8 @@ func (cc *ClusterClient) executeWithFailover(ctx context.Context, fn func(*Clien // Error 400 with "ds" parameter error means Proxmox 9.x doesn't support RRD data source filtering // JSON unmarshal errors are data format issues, not connectivity problems // Context deadline/timeout errors on storage endpoints mean storage issues, not node unreachability + // PBS (Proxmox Backup Server) errors are upstream storage issues, not node connectivity problems + // RRD data timeouts are secondary data fetch failures, not node unreachability if isVMSpecificError(errStr) || strings.Contains(errStr, "595") || (strings.Contains(errStr, "500") && strings.Contains(errStr, "hostname lookup")) || @@ -643,7 +645,14 @@ func (cc *ClusterClient) executeWithFailover(ctx context.Context, fn func(*Clien (strings.Contains(errStr, "storage '") && strings.Contains(errStr, "is not available on node")) || strings.Contains(errStr, "unexpected response format") || (strings.Contains(errStr, "context deadline exceeded") && strings.Contains(errStr, "/storage")) || - (strings.Contains(errStr, "Client.Timeout exceeded") && strings.Contains(errStr, "/storage")) { + (strings.Contains(errStr, "Client.Timeout exceeded") && strings.Contains(errStr, "/storage")) || + // PBS storage errors - Proxmox can't reach PBS, but node is still reachable + (strings.Contains(errStr, "500") && strings.Contains(errStr, "pbs-") && strings.Contains(errStr, "error fetching datastores")) || + (strings.Contains(errStr, "500") && strings.Contains(errStr, "Can't connect to") && strings.Contains(errStr, ":8007")) || + // RRD data timeouts - secondary metric fetch failures, node is still working + (strings.Contains(errStr, "context deadline exceeded") && strings.Contains(errStr, "/rrddata")) || + (strings.Contains(errStr, "context deadline exceeded") && strings.Contains(errStr, "/lxc/") && strings.Contains(errStr, "rrd")) || + (strings.Contains(errStr, "context deadline exceeded") && strings.Contains(errStr, "/qemu/") && strings.Contains(errStr, "rrd")) { // This is likely a node-specific failure, not an endpoint failure // Return the error but don't mark the endpoint as unhealthy log.Debug().