mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-05-22 03:02:35 +00:00
fix(ai-chat): Display messages chronologically in AI chatbot
- Add 'content' type to StreamDisplayEvent for tracking text chunks - Track content events in streamEvents array for chronological display - Update render to use Switch/Match for cleaner conditional rendering - Interleave thinking, tool calls, and content as they stream in - Add fallback for old messages without streamEvents for backwards compat Previously, tool/command outputs stayed at top while AI text responses accumulated at the bottom. Now all events appear in order like a normal chatbot.
This commit is contained in:
parent
37c6654daa
commit
fa13919987
4 changed files with 343 additions and 225 deletions
|
|
@ -1,4 +1,4 @@
|
|||
import { Component, Show, createSignal, For, createEffect, createMemo, onMount } from 'solid-js';
|
||||
import { Component, Show, createSignal, For, createEffect, createMemo, onMount, Switch, Match } from 'solid-js';
|
||||
import { marked } from 'marked';
|
||||
import { AIAPI } from '@/api/ai';
|
||||
import { notificationStore } from '@/stores/notifications';
|
||||
|
|
@ -116,9 +116,10 @@ interface PendingApproval {
|
|||
|
||||
// Unified event type for chronological display
|
||||
interface StreamDisplayEvent {
|
||||
type: 'thinking' | 'tool';
|
||||
type: 'thinking' | 'tool' | 'content';
|
||||
thinking?: string;
|
||||
tool?: AIToolExecution;
|
||||
content?: string; // Text content chunk for chronological display
|
||||
}
|
||||
|
||||
interface Message {
|
||||
|
|
@ -548,13 +549,16 @@ export const AIChat: Component<AIChatProps> = (props) => {
|
|||
}
|
||||
case 'content': {
|
||||
const content = event.data as string;
|
||||
// Append content rather than replace - this allows intermediate AI responses
|
||||
// during tool execution to accumulate, showing the user the full conversation flow
|
||||
if (!content.trim()) return msg; // Skip empty content
|
||||
// Track content in streamEvents for chronological display
|
||||
const events = msg.streamEvents || [];
|
||||
// Also accumulate in content for backwards compatibility / summary
|
||||
const existingContent = msg.content || '';
|
||||
const separator = existingContent && !existingContent.endsWith('\n') ? '\n\n' : '';
|
||||
return {
|
||||
...msg,
|
||||
content: existingContent + separator + content,
|
||||
streamEvents: [...events, { type: 'content' as const, content: content.trim() }],
|
||||
};
|
||||
}
|
||||
case 'complete': {
|
||||
|
|
@ -1089,38 +1093,45 @@ export const AIChat: Component<AIChatProps> = (props) => {
|
|||
: 'bg-gray-100 dark:bg-gray-800 text-gray-900 dark:text-gray-100'
|
||||
}`}
|
||||
>
|
||||
{/* Render all events in chronological order - thinking and tools interleaved */}
|
||||
{/* Render all events in chronological order - thinking, tools, and content interleaved */}
|
||||
<Show when={message.role === 'assistant' && message.streamEvents && message.streamEvents.length > 0}>
|
||||
<div class="mb-3 space-y-2">
|
||||
<div class="space-y-2">
|
||||
<For each={message.streamEvents}>
|
||||
{(evt) => (
|
||||
<Show
|
||||
when={evt.type === 'tool' && evt.tool}
|
||||
fallback={
|
||||
// Thinking chunk
|
||||
<Switch>
|
||||
<Match when={evt.type === 'tool' && evt.tool}>
|
||||
{/* Tool call */}
|
||||
<div class="rounded border border-gray-300 dark:border-gray-600 overflow-hidden">
|
||||
<div class={`px-2 py-1 text-xs font-medium flex items-center gap-2 ${evt.tool!.success
|
||||
? 'bg-green-100 dark:bg-green-900/30 text-green-800 dark:text-green-200'
|
||||
: 'bg-red-100 dark:bg-red-900/30 text-red-800 dark:text-red-200'
|
||||
}`}>
|
||||
<svg class="w-3 h-3" fill="none" stroke="currentColor" viewBox="0 0 24 24">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8 9l3 3-3 3m5 0h3M5 20h14a2 2 0 002-2V6a2 2 0 00-2-2H5a2 2 0 00-2 2v12a2 2 0 002 2z" />
|
||||
</svg>
|
||||
<code class="font-mono">{evt.tool!.input}</code>
|
||||
</div>
|
||||
<Show when={evt.tool!.output}>
|
||||
<pre class="px-2 py-1 text-xs font-mono bg-gray-50 dark:bg-gray-900 text-gray-700 dark:text-gray-300 overflow-x-auto max-h-32 overflow-y-auto whitespace-pre-wrap break-words">
|
||||
{evt.tool!.output.length > 500 ? evt.tool!.output.substring(0, 500) + '...' : evt.tool!.output}
|
||||
</pre>
|
||||
</Show>
|
||||
</div>
|
||||
</Match>
|
||||
<Match when={evt.type === 'thinking' && evt.thinking}>
|
||||
{/* Thinking chunk */}
|
||||
<div class="px-2 py-1.5 text-xs bg-blue-50 dark:bg-blue-900/20 text-gray-700 dark:text-gray-300 rounded border-l-2 border-blue-400 whitespace-pre-wrap">
|
||||
{sanitizeThinking(evt.thinking && evt.thinking.length > 500 ? evt.thinking.substring(0, 500) + '...' : evt.thinking || '')}
|
||||
{sanitizeThinking(evt.thinking!.length > 500 ? evt.thinking!.substring(0, 500) + '...' : evt.thinking!)}
|
||||
</div>
|
||||
}
|
||||
>
|
||||
{/* Tool call */}
|
||||
<div class="rounded border border-gray-300 dark:border-gray-600 overflow-hidden">
|
||||
<div class={`px-2 py-1 text-xs font-medium flex items-center gap-2 ${evt.tool!.success
|
||||
? 'bg-green-100 dark:bg-green-900/30 text-green-800 dark:text-green-200'
|
||||
: 'bg-red-100 dark:bg-red-900/30 text-red-800 dark:text-red-200'
|
||||
}`}>
|
||||
<svg class="w-3 h-3" fill="none" stroke="currentColor" viewBox="0 0 24 24">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8 9l3 3-3 3m5 0h3M5 20h14a2 2 0 002-2V6a2 2 0 00-2-2H5a2 2 0 00-2 2v12a2 2 0 002 2z" />
|
||||
</svg>
|
||||
<code class="font-mono">{evt.tool!.input}</code>
|
||||
</div>
|
||||
<Show when={evt.tool!.output}>
|
||||
<pre class="px-2 py-1 text-xs font-mono bg-gray-50 dark:bg-gray-900 text-gray-700 dark:text-gray-300 overflow-x-auto max-h-32 overflow-y-auto whitespace-pre-wrap break-words">
|
||||
{evt.tool!.output.length > 500 ? evt.tool!.output.substring(0, 500) + '...' : evt.tool!.output}
|
||||
</pre>
|
||||
</Show>
|
||||
</div>
|
||||
</Show>
|
||||
</Match>
|
||||
<Match when={evt.type === 'content' && evt.content}>
|
||||
{/* Content chunk - rendered as markdown */}
|
||||
<div
|
||||
class="text-sm prose prose-sm dark:prose-invert max-w-none prose-pre:bg-gray-800 prose-pre:text-gray-100 prose-code:text-purple-600 dark:prose-code:text-purple-400 prose-code:before:content-none prose-code:after:content-none"
|
||||
innerHTML={renderMarkdown(evt.content!)}
|
||||
/>
|
||||
</Match>
|
||||
</Switch>
|
||||
)}
|
||||
</For>
|
||||
</div>
|
||||
|
|
@ -1146,8 +1157,8 @@ export const AIChat: Component<AIChatProps> = (props) => {
|
|||
</div>
|
||||
</Show>
|
||||
|
||||
{/* Show AI's response text AFTER tool calls */}
|
||||
<Show when={message.content}>
|
||||
{/* Show AI's response text - only if no streamEvents (fallback for old messages or messages without streaming) */}
|
||||
<Show when={message.content && (!message.streamEvents || message.streamEvents.length === 0)}>
|
||||
<div
|
||||
class="text-sm prose prose-sm dark:prose-invert max-w-none prose-pre:bg-gray-800 prose-pre:text-gray-100 prose-code:text-purple-600 dark:prose-code:text-purple-400 prose-code:before:content-none prose-code:after:content-none"
|
||||
innerHTML={renderMarkdown(message.content)}
|
||||
|
|
|
|||
|
|
@ -4797,57 +4797,142 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie
|
|||
// Update state first so we have nodes available
|
||||
m.state.UpdateNodesForInstance(instanceName, modelNodes)
|
||||
|
||||
// Now get storage data to use as fallback for disk metrics if needed
|
||||
// Storage fallback is used to provide disk metrics when rootfs is not available.
|
||||
// We run this asynchronously with a short timeout so it doesn't block VM/container polling.
|
||||
// This addresses the issue where slow storage APIs (e.g., NFS mounts) can cause the entire
|
||||
// polling task to timeout before reaching VM/container polling.
|
||||
storageByNode := make(map[string]models.Disk)
|
||||
var storageByNodeMu sync.Mutex
|
||||
storageFallbackDone := make(chan struct{})
|
||||
|
||||
if instanceCfg.MonitorStorage {
|
||||
_, err := client.GetAllStorage(ctx)
|
||||
if err == nil {
|
||||
go func() {
|
||||
defer close(storageFallbackDone)
|
||||
|
||||
// Use a short timeout for storage fallback - it's an optimization, not critical
|
||||
storageFallbackTimeout := 10 * time.Second
|
||||
storageCtx, storageCancel := context.WithTimeout(context.Background(), storageFallbackTimeout)
|
||||
defer storageCancel()
|
||||
|
||||
_, err := client.GetAllStorage(storageCtx)
|
||||
if err != nil {
|
||||
if storageCtx.Err() != nil {
|
||||
log.Debug().
|
||||
Str("instance", instanceName).
|
||||
Dur("timeout", storageFallbackTimeout).
|
||||
Msg("Storage fallback timed out - continuing without disk fallback data")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
// Check if context was cancelled
|
||||
select {
|
||||
case <-storageCtx.Done():
|
||||
log.Debug().
|
||||
Str("instance", instanceName).
|
||||
Msg("Storage fallback cancelled - partial data collected")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Skip offline nodes to avoid 595 errors
|
||||
if nodeEffectiveStatus[node.Node] != "online" {
|
||||
continue
|
||||
}
|
||||
|
||||
nodeStorages, err := client.GetStorage(ctx, node.Node)
|
||||
if err == nil {
|
||||
// Look for local or local-lvm storage as most stable disk metric
|
||||
for _, storage := range nodeStorages {
|
||||
if reason, skip := readOnlyFilesystemReason(storage.Type, storage.Total, storage.Used); skip {
|
||||
nodeStorages, err := client.GetStorage(storageCtx, node.Node)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Look for local or local-lvm storage as most stable disk metric
|
||||
for _, storage := range nodeStorages {
|
||||
if reason, skip := readOnlyFilesystemReason(storage.Type, storage.Total, storage.Used); skip {
|
||||
log.Debug().
|
||||
Str("node", node.Node).
|
||||
Str("storage", storage.Storage).
|
||||
Str("type", storage.Type).
|
||||
Str("skipReason", reason).
|
||||
Uint64("total", storage.Total).
|
||||
Uint64("used", storage.Used).
|
||||
Msg("Skipping read-only storage while building disk fallback")
|
||||
continue
|
||||
}
|
||||
if storage.Storage == "local" || storage.Storage == "local-lvm" {
|
||||
disk := models.Disk{
|
||||
Total: int64(storage.Total),
|
||||
Used: int64(storage.Used),
|
||||
Free: int64(storage.Available),
|
||||
Usage: safePercentage(float64(storage.Used), float64(storage.Total)),
|
||||
}
|
||||
// Prefer "local" over "local-lvm"
|
||||
storageByNodeMu.Lock()
|
||||
if _, exists := storageByNode[node.Node]; !exists || storage.Storage == "local" {
|
||||
storageByNode[node.Node] = disk
|
||||
log.Debug().
|
||||
Str("node", node.Node).
|
||||
Str("storage", storage.Storage).
|
||||
Str("type", storage.Type).
|
||||
Str("skipReason", reason).
|
||||
Uint64("total", storage.Total).
|
||||
Uint64("used", storage.Used).
|
||||
Msg("Skipping read-only storage while building disk fallback")
|
||||
continue
|
||||
}
|
||||
if storage.Storage == "local" || storage.Storage == "local-lvm" {
|
||||
disk := models.Disk{
|
||||
Total: int64(storage.Total),
|
||||
Used: int64(storage.Used),
|
||||
Free: int64(storage.Available),
|
||||
Usage: safePercentage(float64(storage.Used), float64(storage.Total)),
|
||||
}
|
||||
// Prefer "local" over "local-lvm"
|
||||
if _, exists := storageByNode[node.Node]; !exists || storage.Storage == "local" {
|
||||
storageByNode[node.Node] = disk
|
||||
log.Debug().
|
||||
Str("node", node.Node).
|
||||
Str("storage", storage.Storage).
|
||||
Float64("usage", disk.Usage).
|
||||
Msg("Using storage for disk metrics fallback")
|
||||
}
|
||||
Float64("usage", disk.Usage).
|
||||
Msg("Using storage for disk metrics fallback")
|
||||
}
|
||||
storageByNodeMu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
// No storage monitoring, close channel immediately
|
||||
close(storageFallbackDone)
|
||||
}
|
||||
|
||||
// Poll VMs and containers FIRST - this is the most critical data.
|
||||
// This happens immediately after starting the storage fallback goroutine,
|
||||
// so VM/container polling runs in parallel with (and is not blocked by) storage operations.
|
||||
if instanceCfg.MonitorVMs || instanceCfg.MonitorContainers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
pollErr = ctx.Err()
|
||||
return
|
||||
default:
|
||||
// Always try the efficient cluster/resources endpoint first
|
||||
// This endpoint works on both clustered and standalone nodes
|
||||
// Testing confirmed it works on standalone nodes like pimox
|
||||
useClusterEndpoint := m.pollVMsAndContainersEfficient(ctx, instanceName, client, nodeEffectiveStatus)
|
||||
|
||||
if !useClusterEndpoint {
|
||||
// Fall back to traditional polling only if cluster/resources not available
|
||||
// This should be rare - only for very old Proxmox versions
|
||||
log.Debug().
|
||||
Str("instance", instanceName).
|
||||
Msg("cluster/resources endpoint not available, using traditional polling")
|
||||
|
||||
// Check if configuration needs updating
|
||||
if instanceCfg.IsCluster {
|
||||
isActuallyCluster, checkErr := client.IsClusterMember(ctx)
|
||||
if checkErr == nil && !isActuallyCluster {
|
||||
log.Warn().
|
||||
Str("instance", instanceName).
|
||||
Msg("Instance marked as cluster but is actually standalone - consider updating configuration")
|
||||
instanceCfg.IsCluster = false
|
||||
}
|
||||
}
|
||||
|
||||
// Use optimized parallel polling for better performance
|
||||
if instanceCfg.MonitorVMs {
|
||||
m.pollVMsWithNodes(ctx, instanceName, client, nodes, nodeEffectiveStatus)
|
||||
}
|
||||
if instanceCfg.MonitorContainers {
|
||||
m.pollContainersWithNodes(ctx, instanceName, client, nodes, nodeEffectiveStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poll physical disks for health monitoring (enabled by default unless explicitly disabled)
|
||||
// Skip if MonitorPhysicalDisks is explicitly set to false
|
||||
// Physical disk polling runs in a background goroutine since GetDisks can be slow
|
||||
// and we don't want it to cause task timeouts. It has its own 5-minute interval anyway.
|
||||
if instanceCfg.MonitorPhysicalDisks != nil && !*instanceCfg.MonitorPhysicalDisks {
|
||||
log.Debug().Str("instance", instanceName).Msg("Physical disk monitoring explicitly disabled")
|
||||
// Keep any existing disk data visible (don't clear it)
|
||||
|
|
@ -4887,152 +4972,192 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie
|
|||
m.state.UpdatePhysicalDisks(instanceName, updated)
|
||||
}
|
||||
} else {
|
||||
log.Debug().
|
||||
Int("nodeCount", len(nodes)).
|
||||
Dur("interval", pollingInterval).
|
||||
Msg("Starting disk health polling")
|
||||
|
||||
// Get existing disks from state to preserve data for offline nodes
|
||||
currentState := m.state.GetSnapshot()
|
||||
existingDisksMap := make(map[string]models.PhysicalDisk)
|
||||
for _, disk := range currentState.PhysicalDisks {
|
||||
if disk.Instance == instanceName {
|
||||
existingDisksMap[disk.ID] = disk
|
||||
}
|
||||
}
|
||||
|
||||
var allDisks []models.PhysicalDisk
|
||||
polledNodes := make(map[string]bool) // Track which nodes we successfully polled
|
||||
|
||||
for _, node := range nodes {
|
||||
// Skip offline nodes but preserve their existing disk data
|
||||
if nodeEffectiveStatus[node.Node] != "online" {
|
||||
log.Debug().Str("node", node.Node).Msg("Skipping disk poll for offline node - preserving existing data")
|
||||
continue
|
||||
}
|
||||
|
||||
// Get disk list for this node
|
||||
log.Debug().Str("node", node.Node).Msg("Getting disk list for node")
|
||||
disks, err := client.GetDisks(ctx, node.Node)
|
||||
if err != nil {
|
||||
// Check if it's a permission error or if the endpoint doesn't exist
|
||||
errStr := err.Error()
|
||||
if strings.Contains(errStr, "401") || strings.Contains(errStr, "403") {
|
||||
log.Warn().
|
||||
Str("node", node.Node).
|
||||
Err(err).
|
||||
Msg("Insufficient permissions to access disk information - check API token permissions")
|
||||
} else if strings.Contains(errStr, "404") || strings.Contains(errStr, "501") {
|
||||
log.Info().
|
||||
Str("node", node.Node).
|
||||
Msg("Disk monitoring not available on this node (may be using non-standard storage)")
|
||||
} else {
|
||||
log.Warn().
|
||||
Str("node", node.Node).
|
||||
Err(err).
|
||||
Msg("Failed to get disk list")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Run physical disk polling in background to avoid blocking the main task
|
||||
go func(inst string, pveClient PVEClientInterface, nodeList []proxmox.Node, nodeStatus map[string]string, modelNodesCopy []models.Node) {
|
||||
defer recoverFromPanic(fmt.Sprintf("pollPhysicalDisks-%s", inst))
|
||||
|
||||
// Use a generous timeout for disk polling
|
||||
diskTimeout := 60 * time.Second
|
||||
diskCtx, diskCancel := context.WithTimeout(context.Background(), diskTimeout)
|
||||
defer diskCancel()
|
||||
|
||||
log.Debug().
|
||||
Str("node", node.Node).
|
||||
Int("diskCount", len(disks)).
|
||||
Msg("Got disk list for node")
|
||||
Int("nodeCount", len(nodeList)).
|
||||
Dur("interval", pollingInterval).
|
||||
Msg("Starting disk health polling")
|
||||
|
||||
// Mark this node as successfully polled
|
||||
polledNodes[node.Node] = true
|
||||
// Get existing disks from state to preserve data for offline nodes
|
||||
currentState := m.state.GetSnapshot()
|
||||
existingDisksMap := make(map[string]models.PhysicalDisk)
|
||||
for _, disk := range currentState.PhysicalDisks {
|
||||
if disk.Instance == inst {
|
||||
existingDisksMap[disk.ID] = disk
|
||||
}
|
||||
}
|
||||
|
||||
// Check each disk for health issues and add to state
|
||||
for _, disk := range disks {
|
||||
// Create PhysicalDisk model
|
||||
diskID := fmt.Sprintf("%s-%s-%s", instanceName, node.Node, strings.ReplaceAll(disk.DevPath, "/", "-"))
|
||||
physicalDisk := models.PhysicalDisk{
|
||||
ID: diskID,
|
||||
Node: node.Node,
|
||||
Instance: instanceName,
|
||||
DevPath: disk.DevPath,
|
||||
Model: disk.Model,
|
||||
Serial: disk.Serial,
|
||||
WWN: disk.WWN,
|
||||
Type: disk.Type,
|
||||
Size: disk.Size,
|
||||
Health: disk.Health,
|
||||
Wearout: disk.Wearout,
|
||||
RPM: disk.RPM,
|
||||
Used: disk.Used,
|
||||
LastChecked: time.Now(),
|
||||
var allDisks []models.PhysicalDisk
|
||||
polledNodes := make(map[string]bool) // Track which nodes we successfully polled
|
||||
|
||||
for _, node := range nodeList {
|
||||
// Check if context timed out
|
||||
select {
|
||||
case <-diskCtx.Done():
|
||||
log.Debug().
|
||||
Str("instance", inst).
|
||||
Msg("Physical disk polling timed out - preserving existing data")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Skip offline nodes but preserve their existing disk data
|
||||
if nodeStatus[node.Node] != "online" {
|
||||
log.Debug().Str("node", node.Node).Msg("Skipping disk poll for offline node - preserving existing data")
|
||||
continue
|
||||
}
|
||||
|
||||
allDisks = append(allDisks, physicalDisk)
|
||||
// Get disk list for this node
|
||||
log.Debug().Str("node", node.Node).Msg("Getting disk list for node")
|
||||
disks, err := pveClient.GetDisks(diskCtx, node.Node)
|
||||
if err != nil {
|
||||
// Check if it's a permission error or if the endpoint doesn't exist
|
||||
errStr := err.Error()
|
||||
if strings.Contains(errStr, "401") || strings.Contains(errStr, "403") {
|
||||
log.Warn().
|
||||
Str("node", node.Node).
|
||||
Err(err).
|
||||
Msg("Insufficient permissions to access disk information - check API token permissions")
|
||||
} else if strings.Contains(errStr, "404") || strings.Contains(errStr, "501") {
|
||||
log.Info().
|
||||
Str("node", node.Node).
|
||||
Msg("Disk monitoring not available on this node (may be using non-standard storage)")
|
||||
} else {
|
||||
log.Warn().
|
||||
Str("node", node.Node).
|
||||
Err(err).
|
||||
Msg("Failed to get disk list")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug().
|
||||
Str("node", node.Node).
|
||||
Str("disk", disk.DevPath).
|
||||
Str("model", disk.Model).
|
||||
Str("health", disk.Health).
|
||||
Int("wearout", disk.Wearout).
|
||||
Msg("Checking disk health")
|
||||
Int("diskCount", len(disks)).
|
||||
Msg("Got disk list for node")
|
||||
|
||||
normalizedHealth := strings.ToUpper(strings.TrimSpace(disk.Health))
|
||||
if normalizedHealth != "" && normalizedHealth != "UNKNOWN" && normalizedHealth != "PASSED" && normalizedHealth != "OK" {
|
||||
// Disk has failed or is failing - alert manager will handle this
|
||||
log.Warn().
|
||||
// Mark this node as successfully polled
|
||||
polledNodes[node.Node] = true
|
||||
|
||||
// Check each disk for health issues and add to state
|
||||
for _, disk := range disks {
|
||||
// Create PhysicalDisk model
|
||||
diskID := fmt.Sprintf("%s-%s-%s", inst, node.Node, strings.ReplaceAll(disk.DevPath, "/", "-"))
|
||||
physicalDisk := models.PhysicalDisk{
|
||||
ID: diskID,
|
||||
Node: node.Node,
|
||||
Instance: inst,
|
||||
DevPath: disk.DevPath,
|
||||
Model: disk.Model,
|
||||
Serial: disk.Serial,
|
||||
WWN: disk.WWN,
|
||||
Type: disk.Type,
|
||||
Size: disk.Size,
|
||||
Health: disk.Health,
|
||||
Wearout: disk.Wearout,
|
||||
RPM: disk.RPM,
|
||||
Used: disk.Used,
|
||||
LastChecked: time.Now(),
|
||||
}
|
||||
|
||||
allDisks = append(allDisks, physicalDisk)
|
||||
|
||||
log.Debug().
|
||||
Str("node", node.Node).
|
||||
Str("disk", disk.DevPath).
|
||||
Str("model", disk.Model).
|
||||
Str("health", disk.Health).
|
||||
Int("wearout", disk.Wearout).
|
||||
Msg("Disk health issue detected")
|
||||
Msg("Checking disk health")
|
||||
|
||||
// Pass disk info to alert manager
|
||||
m.alertManager.CheckDiskHealth(instanceName, node.Node, disk)
|
||||
} else if disk.Wearout > 0 && disk.Wearout < 10 {
|
||||
// Low wearout warning (less than 10% life remaining)
|
||||
log.Warn().
|
||||
Str("node", node.Node).
|
||||
Str("disk", disk.DevPath).
|
||||
Str("model", disk.Model).
|
||||
Int("wearout", disk.Wearout).
|
||||
Msg("SSD wearout critical - less than 10% life remaining")
|
||||
normalizedHealth := strings.ToUpper(strings.TrimSpace(disk.Health))
|
||||
if normalizedHealth != "" && normalizedHealth != "UNKNOWN" && normalizedHealth != "PASSED" && normalizedHealth != "OK" {
|
||||
// Disk has failed or is failing - alert manager will handle this
|
||||
log.Warn().
|
||||
Str("node", node.Node).
|
||||
Str("disk", disk.DevPath).
|
||||
Str("model", disk.Model).
|
||||
Str("health", disk.Health).
|
||||
Int("wearout", disk.Wearout).
|
||||
Msg("Disk health issue detected")
|
||||
|
||||
// Pass to alert manager for wearout alert
|
||||
m.alertManager.CheckDiskHealth(instanceName, node.Node, disk)
|
||||
// Pass disk info to alert manager
|
||||
m.alertManager.CheckDiskHealth(inst, node.Node, disk)
|
||||
} else if disk.Wearout > 0 && disk.Wearout < 10 {
|
||||
// Low wearout warning (less than 10% life remaining)
|
||||
log.Warn().
|
||||
Str("node", node.Node).
|
||||
Str("disk", disk.DevPath).
|
||||
Str("model", disk.Model).
|
||||
Int("wearout", disk.Wearout).
|
||||
Msg("SSD wearout critical - less than 10% life remaining")
|
||||
|
||||
// Pass to alert manager for wearout alert
|
||||
m.alertManager.CheckDiskHealth(inst, node.Node, disk)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve existing disk data for nodes that weren't polled (offline or error)
|
||||
for _, existingDisk := range existingDisksMap {
|
||||
// Only preserve if we didn't poll this node
|
||||
if !polledNodes[existingDisk.Node] {
|
||||
// Keep the existing disk data but update the LastChecked to indicate it's stale
|
||||
allDisks = append(allDisks, existingDisk)
|
||||
log.Debug().
|
||||
Str("node", existingDisk.Node).
|
||||
Str("disk", existingDisk.DevPath).
|
||||
Msg("Preserving existing disk data for unpolled node")
|
||||
// Preserve existing disk data for nodes that weren't polled (offline or error)
|
||||
for _, existingDisk := range existingDisksMap {
|
||||
// Only preserve if we didn't poll this node
|
||||
if !polledNodes[existingDisk.Node] {
|
||||
// Keep the existing disk data but update the LastChecked to indicate it's stale
|
||||
allDisks = append(allDisks, existingDisk)
|
||||
log.Debug().
|
||||
Str("node", existingDisk.Node).
|
||||
Str("disk", existingDisk.DevPath).
|
||||
Msg("Preserving existing disk data for unpolled node")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
allDisks = mergeNVMeTempsIntoDisks(allDisks, modelNodes)
|
||||
allDisks = mergeNVMeTempsIntoDisks(allDisks, modelNodesCopy)
|
||||
|
||||
// Update physical disks in state
|
||||
log.Debug().
|
||||
Str("instance", instanceName).
|
||||
Int("diskCount", len(allDisks)).
|
||||
Int("preservedCount", len(existingDisksMap)-len(polledNodes)).
|
||||
Msg("Updating physical disks in state")
|
||||
m.state.UpdatePhysicalDisks(instanceName, allDisks)
|
||||
// Update physical disks in state
|
||||
log.Debug().
|
||||
Str("instance", inst).
|
||||
Int("diskCount", len(allDisks)).
|
||||
Int("preservedCount", len(existingDisksMap)-len(polledNodes)).
|
||||
Msg("Updating physical disks in state")
|
||||
m.state.UpdatePhysicalDisks(inst, allDisks)
|
||||
}(instanceName, client, nodes, nodeEffectiveStatus, modelNodes)
|
||||
}
|
||||
}
|
||||
// Note: Physical disk monitoring is now enabled by default with a 5-minute polling interval.
|
||||
// Users can explicitly disable it in node settings. Disk data is preserved between polls.
|
||||
|
||||
// Wait for storage fallback to complete (with a short timeout) before using the data.
|
||||
// This is non-blocking in the sense that VM/container polling has already completed by now.
|
||||
// We give the storage fallback goroutine up to 2 additional seconds to finish if it's still running.
|
||||
select {
|
||||
case <-storageFallbackDone:
|
||||
// Storage fallback completed normally
|
||||
case <-time.After(2 * time.Second):
|
||||
log.Debug().
|
||||
Str("instance", instanceName).
|
||||
Msg("Storage fallback still running - proceeding without waiting (disk fallback may be unavailable)")
|
||||
}
|
||||
|
||||
// Update nodes with storage fallback if rootfs was not available
|
||||
// Copy storageByNode under lock, then release to avoid holding during metric updates
|
||||
storageByNodeMu.Lock()
|
||||
localStorageByNode := make(map[string]models.Disk, len(storageByNode))
|
||||
for k, v := range storageByNode {
|
||||
localStorageByNode[k] = v
|
||||
}
|
||||
storageByNodeMu.Unlock()
|
||||
|
||||
for i := range modelNodes {
|
||||
if modelNodes[i].Disk.Total == 0 {
|
||||
if disk, exists := storageByNode[modelNodes[i].Name]; exists {
|
||||
if disk, exists := localStorageByNode[modelNodes[i].Name]; exists {
|
||||
modelNodes[i].Disk = disk
|
||||
log.Debug().
|
||||
Str("node", modelNodes[i].Name).
|
||||
|
|
@ -5159,55 +5284,25 @@ func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, clie
|
|||
}
|
||||
}
|
||||
|
||||
// Poll VMs and containers together using cluster/resources for efficiency
|
||||
if instanceCfg.MonitorVMs || instanceCfg.MonitorContainers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
pollErr = ctx.Err()
|
||||
return
|
||||
default:
|
||||
// Always try the efficient cluster/resources endpoint first
|
||||
// This endpoint works on both clustered and standalone nodes
|
||||
// Testing confirmed it works on standalone nodes like pimox
|
||||
useClusterEndpoint := m.pollVMsAndContainersEfficient(ctx, instanceName, client, nodeEffectiveStatus)
|
||||
|
||||
if !useClusterEndpoint {
|
||||
// Fall back to traditional polling only if cluster/resources not available
|
||||
// This should be rare - only for very old Proxmox versions
|
||||
log.Debug().
|
||||
Str("instance", instanceName).
|
||||
Msg("cluster/resources endpoint not available, using traditional polling")
|
||||
|
||||
// Check if configuration needs updating
|
||||
if instanceCfg.IsCluster {
|
||||
isActuallyCluster, checkErr := client.IsClusterMember(ctx)
|
||||
if checkErr == nil && !isActuallyCluster {
|
||||
log.Warn().
|
||||
Str("instance", instanceName).
|
||||
Msg("Instance marked as cluster but is actually standalone - consider updating configuration")
|
||||
instanceCfg.IsCluster = false
|
||||
}
|
||||
}
|
||||
|
||||
// Use optimized parallel polling for better performance
|
||||
if instanceCfg.MonitorVMs {
|
||||
m.pollVMsWithNodes(ctx, instanceName, client, nodes, nodeEffectiveStatus)
|
||||
}
|
||||
if instanceCfg.MonitorContainers {
|
||||
m.pollContainersWithNodes(ctx, instanceName, client, nodes, nodeEffectiveStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poll storage if enabled
|
||||
// Poll storage in background if enabled - storage APIs can be slow (NFS mounts, etc.)
|
||||
// so we run this asynchronously to prevent it from causing task timeouts.
|
||||
// This is similar to how backup polling runs in the background.
|
||||
if instanceCfg.MonitorStorage {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
pollErr = ctx.Err()
|
||||
return
|
||||
default:
|
||||
m.pollStorageWithNodes(ctx, instanceName, client, nodes)
|
||||
go func(inst string, pveClient PVEClientInterface, nodeList []proxmox.Node) {
|
||||
defer recoverFromPanic(fmt.Sprintf("pollStorageWithNodes-%s", inst))
|
||||
|
||||
// Use a generous timeout for storage polling - it's not blocking the main task
|
||||
storageTimeout := 60 * time.Second
|
||||
storageCtx, storageCancel := context.WithTimeout(context.Background(), storageTimeout)
|
||||
defer storageCancel()
|
||||
|
||||
m.pollStorageWithNodes(storageCtx, inst, pveClient, nodeList)
|
||||
}(instanceName, client, nodes)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2143,7 +2143,10 @@ func (m *Monitor) pollPVENode(
|
|||
var proxyTemp *models.Temperature
|
||||
var err error
|
||||
if m.tempCollector != nil {
|
||||
tempCtx, tempCancel := context.WithTimeout(ctx, 30*time.Second) // Increased to accommodate SSH operations via proxy
|
||||
// Temperature collection is best-effort - use a short timeout to avoid blocking node polling
|
||||
// Use context.Background() so the timeout is truly independent of the parent polling context
|
||||
// If SSH is slow or unresponsive, we'll preserve previous temperature data
|
||||
tempCtx, tempCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer tempCancel()
|
||||
|
||||
// Determine SSH hostname to use (most robust approach):
|
||||
|
|
|
|||
|
|
@ -632,6 +632,8 @@ func (cc *ClusterClient) executeWithFailover(ctx context.Context, fn func(*Clien
|
|||
// Error 400 with "ds" parameter error means Proxmox 9.x doesn't support RRD data source filtering
|
||||
// JSON unmarshal errors are data format issues, not connectivity problems
|
||||
// Context deadline/timeout errors on storage endpoints mean storage issues, not node unreachability
|
||||
// PBS (Proxmox Backup Server) errors are upstream storage issues, not node connectivity problems
|
||||
// RRD data timeouts are secondary data fetch failures, not node unreachability
|
||||
if isVMSpecificError(errStr) ||
|
||||
strings.Contains(errStr, "595") ||
|
||||
(strings.Contains(errStr, "500") && strings.Contains(errStr, "hostname lookup")) ||
|
||||
|
|
@ -643,7 +645,14 @@ func (cc *ClusterClient) executeWithFailover(ctx context.Context, fn func(*Clien
|
|||
(strings.Contains(errStr, "storage '") && strings.Contains(errStr, "is not available on node")) ||
|
||||
strings.Contains(errStr, "unexpected response format") ||
|
||||
(strings.Contains(errStr, "context deadline exceeded") && strings.Contains(errStr, "/storage")) ||
|
||||
(strings.Contains(errStr, "Client.Timeout exceeded") && strings.Contains(errStr, "/storage")) {
|
||||
(strings.Contains(errStr, "Client.Timeout exceeded") && strings.Contains(errStr, "/storage")) ||
|
||||
// PBS storage errors - Proxmox can't reach PBS, but node is still reachable
|
||||
(strings.Contains(errStr, "500") && strings.Contains(errStr, "pbs-") && strings.Contains(errStr, "error fetching datastores")) ||
|
||||
(strings.Contains(errStr, "500") && strings.Contains(errStr, "Can't connect to") && strings.Contains(errStr, ":8007")) ||
|
||||
// RRD data timeouts - secondary metric fetch failures, node is still working
|
||||
(strings.Contains(errStr, "context deadline exceeded") && strings.Contains(errStr, "/rrddata")) ||
|
||||
(strings.Contains(errStr, "context deadline exceeded") && strings.Contains(errStr, "/lxc/") && strings.Contains(errStr, "rrd")) ||
|
||||
(strings.Contains(errStr, "context deadline exceeded") && strings.Contains(errStr, "/qemu/") && strings.Contains(errStr, "rrd")) {
|
||||
// This is likely a node-specific failure, not an endpoint failure
|
||||
// Return the error but don't mark the endpoint as unhealthy
|
||||
log.Debug().
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue