mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-04-28 03:20:11 +00:00
fix(servicediscovery): run automatic refresh for changed/stale resources (#1225)
This commit is contained in:
parent
1f74c12ef8
commit
0d6fffbb1c
2 changed files with 124 additions and 13 deletions
|
|
@ -7,6 +7,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
|
@ -435,9 +436,8 @@ func (s *Service) IsRunning() bool {
|
|||
return s.running
|
||||
}
|
||||
|
||||
// discoveryLoop runs periodic fingerprint collection (NOT actual discovery).
|
||||
// This is the new fingerprint-based approach: background loop only collects fingerprints
|
||||
// to detect changes. Discovery only runs on-demand when data is actually needed.
|
||||
// discoveryLoop runs periodic fingerprint collection and automatic refreshes.
|
||||
// Fingerprints detect changes cheaply; changed/stale/new resources are then refreshed.
|
||||
func (s *Service) discoveryLoop(ctx context.Context) {
|
||||
delay := s.initialDelay
|
||||
if delay <= 0 {
|
||||
|
|
@ -454,6 +454,7 @@ func (s *Service) discoveryLoop(ctx context.Context) {
|
|||
}
|
||||
|
||||
s.collectFingerprints(ctx)
|
||||
s.runAutomaticDiscoveryRefresh(ctx)
|
||||
|
||||
s.mu.RLock()
|
||||
currentInterval := s.interval
|
||||
|
|
@ -466,6 +467,7 @@ func (s *Service) discoveryLoop(ctx context.Context) {
|
|||
select {
|
||||
case <-ticker.C:
|
||||
s.collectFingerprints(ctx)
|
||||
s.runAutomaticDiscoveryRefresh(ctx)
|
||||
case newInterval := <-s.intervalCh:
|
||||
// Interval changed - reset the ticker
|
||||
ticker.Stop()
|
||||
|
|
@ -481,8 +483,102 @@ func (s *Service) discoveryLoop(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) runAutomaticDiscoveryRefresh(ctx context.Context) {
|
||||
if ctx == nil || ctx.Err() != nil || s.store == nil {
|
||||
return
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
analyzerConfigured := s.aiAnalyzer != nil
|
||||
maxDiscoveryAge := s.maxDiscoveryAge
|
||||
s.mu.RUnlock()
|
||||
if !analyzerConfigured {
|
||||
log.Debug().Msg("Skipping automatic discovery refresh - AI analyzer not configured")
|
||||
return
|
||||
}
|
||||
|
||||
changedResources, err := s.store.GetChangedResources()
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("Failed to fetch changed resources for automatic discovery refresh")
|
||||
return
|
||||
}
|
||||
staleResources, err := s.store.GetStaleResources(maxDiscoveryAge)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("Failed to fetch stale resources for automatic discovery refresh")
|
||||
return
|
||||
}
|
||||
|
||||
candidates := make(map[string]struct{}, len(changedResources)+len(staleResources))
|
||||
for _, id := range changedResources {
|
||||
if strings.TrimSpace(id) != "" {
|
||||
candidates[id] = struct{}{}
|
||||
}
|
||||
}
|
||||
for _, id := range staleResources {
|
||||
if strings.TrimSpace(id) != "" {
|
||||
candidates[id] = struct{}{}
|
||||
}
|
||||
}
|
||||
if len(candidates) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
resourceIDs := make([]string, 0, len(candidates))
|
||||
for id := range candidates {
|
||||
resourceIDs = append(resourceIDs, id)
|
||||
}
|
||||
sort.Strings(resourceIDs)
|
||||
|
||||
log.Info().
|
||||
Int("changed", len(changedResources)).
|
||||
Int("stale", len(staleResources)).
|
||||
Int("total", len(resourceIDs)).
|
||||
Msg("Running automatic discovery refresh for changed/stale resources")
|
||||
|
||||
discoveredCount := 0
|
||||
failedCount := 0
|
||||
|
||||
for _, id := range resourceIDs {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
|
||||
resourceType, hostID, resourceID, err := ParseResourceID(id)
|
||||
if err != nil {
|
||||
failedCount++
|
||||
log.Warn().
|
||||
Err(err).
|
||||
Str("resource_id", id).
|
||||
Msg("Skipping invalid resource ID during automatic discovery refresh")
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = s.DiscoverResource(ctx, DiscoveryRequest{
|
||||
ResourceType: resourceType,
|
||||
ResourceID: resourceID,
|
||||
HostID: hostID,
|
||||
Hostname: hostID,
|
||||
})
|
||||
if err != nil {
|
||||
failedCount++
|
||||
log.Warn().
|
||||
Err(err).
|
||||
Str("resource_id", id).
|
||||
Str("resource_type", string(resourceType)).
|
||||
Msg("Automatic discovery refresh failed for resource")
|
||||
continue
|
||||
}
|
||||
discoveredCount++
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Int("discovered", discoveredCount).
|
||||
Int("failed", failedCount).
|
||||
Msg("Automatic discovery refresh completed")
|
||||
}
|
||||
|
||||
// collectFingerprints collects fingerprints from all resources (Docker, LXC, VM).
|
||||
// This is FREE (no AI calls) - it just hashes metadata to detect changes.
|
||||
// This is metadata-only and does not invoke the AI analyzer.
|
||||
func (s *Service) collectFingerprints(ctx context.Context) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
|
|
|
|||
|
|
@ -615,7 +615,7 @@ func TestService_FingerprintLoop_StopAndCancel(t *testing.T) {
|
|||
store.crypto = nil
|
||||
|
||||
service := NewService(store, nil, stubStateProvider{state: state}, DefaultConfig())
|
||||
// Analyzer should NOT be called - background loop only collects fingerprints
|
||||
// Analyzer should be called by automatic refresh for changed/new resources.
|
||||
analyzer := &stubAnalyzer{
|
||||
response: `{"service_type":"nginx","service_name":"Nginx","service_version":"1.2","category":"web_server","cli_access":"docker exec {container} nginx -v","facts":[],"config_paths":[],"data_paths":[],"ports":[],"confidence":0.9,"reasoning":"image"}`,
|
||||
}
|
||||
|
|
@ -632,7 +632,22 @@ func TestService_FingerprintLoop_StopAndCancel(t *testing.T) {
|
|||
close(done)
|
||||
}()
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
// Wait for at least one automatic refresh cycle to run.
|
||||
calls := 0
|
||||
deadline := time.Now().Add(200 * time.Millisecond)
|
||||
for time.Now().Before(deadline) {
|
||||
analyzer.mu.Lock()
|
||||
calls = analyzer.calls
|
||||
analyzer.mu.Unlock()
|
||||
if calls > 0 {
|
||||
break
|
||||
}
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
}
|
||||
if calls == 0 {
|
||||
t.Fatalf("expected automatic discovery refresh to invoke AI analyzer")
|
||||
}
|
||||
|
||||
if stopWithCancel {
|
||||
cancel()
|
||||
} else {
|
||||
|
|
@ -645,7 +660,7 @@ func TestService_FingerprintLoop_StopAndCancel(t *testing.T) {
|
|||
t.Fatalf("discoveryLoop did not stop")
|
||||
}
|
||||
|
||||
// Verify fingerprints were collected (background loop does NOT make AI calls)
|
||||
// Verify fingerprints were collected.
|
||||
// Key format is type:host:id
|
||||
fp, err := store.GetFingerprint("docker:host1:web")
|
||||
if err != nil {
|
||||
|
|
@ -655,12 +670,12 @@ func TestService_FingerprintLoop_StopAndCancel(t *testing.T) {
|
|||
t.Fatalf("expected fingerprint to be collected")
|
||||
}
|
||||
|
||||
// Verify NO AI calls were made in background loop
|
||||
analyzer.mu.Lock()
|
||||
calls := analyzer.calls
|
||||
analyzer.mu.Unlock()
|
||||
if calls > 0 {
|
||||
t.Fatalf("expected no AI calls in background loop (fingerprint-only), got %d", calls)
|
||||
discovery, err := store.Get("docker:host1:web")
|
||||
if err != nil {
|
||||
t.Fatalf("Get discovery error: %v", err)
|
||||
}
|
||||
if discovery == nil {
|
||||
t.Fatalf("expected automatic discovery refresh to persist discovery data")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue