mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-05-19 07:54:10 +00:00
chore: remove dead code and unused files
Remove 604 lines of unreachable code identified by deadcode analysis: - internal/config/credentials.go: unused credential resolver - internal/config/registration.go: unused registration config - internal/monitoring/poller.go: unused channel-based polling (keep types) - internal/api/middleware.go: unused TimeoutHandler, JSONHandler, NewAPIError, ValidationError - internal/api/security.go: unused IsLockedOut, SecurityHeaders - internal/api/auth.go: unused min helper - internal/config/config.go: unused SaveConfig - internal/config/client_helpers.go: unused CreatePBSConfigFromFields - internal/logging/logging.go: unused NewRequestID
This commit is contained in:
parent
506484b072
commit
28aaecd74d
9 changed files with 0 additions and 604 deletions
|
|
@ -186,14 +186,6 @@ func CheckProxyAuth(cfg *config.Config, r *http.Request) (bool, string, bool) {
|
|||
return true, username, isAdmin
|
||||
}
|
||||
|
||||
// min returns the minimum of two integers
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// CheckAuth checks both basic auth and API token
|
||||
func CheckAuth(cfg *config.Config, w http.ResponseWriter, r *http.Request) bool {
|
||||
config.Mu.RLock()
|
||||
|
|
|
|||
|
|
@ -93,44 +93,6 @@ func ErrorHandler(next http.Handler) http.Handler {
|
|||
})
|
||||
}
|
||||
|
||||
// TimeoutHandler wraps handlers with a timeout
|
||||
func TimeoutHandler(timeout time.Duration) func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Skip timeout for WebSocket and SSE endpoints
|
||||
if r.Header.Get("Upgrade") == "websocket" || r.Header.Get("Accept") == "text/event-stream" {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
http.TimeoutHandler(next, timeout, "Request timeout").ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// JSONHandler ensures proper JSON responses and error handling
|
||||
func JSONHandler(handler func(w http.ResponseWriter, r *http.Request) error) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
if err := handler(w, r); err != nil {
|
||||
// Check if it's already an APIError
|
||||
if apiErr, ok := err.(*APIError); ok {
|
||||
writeErrorResponse(w, apiErr.StatusCode, apiErr.Code, apiErr.ErrorMessage, apiErr.Details)
|
||||
return
|
||||
}
|
||||
|
||||
// Generic error
|
||||
log.Error().Err(err).
|
||||
Str("path", r.URL.Path).
|
||||
Str("method", r.Method).
|
||||
Msg("Handler error")
|
||||
|
||||
writeErrorResponse(w, http.StatusInternalServerError, "internal_error",
|
||||
"An error occurred processing the request", nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writeErrorResponse writes a consistent error response
|
||||
func writeErrorResponse(w http.ResponseWriter, statusCode int, code, message string, details map[string]string) {
|
||||
|
|
@ -195,23 +157,3 @@ func (rw *responseWriter) Flush() {
|
|||
}
|
||||
}
|
||||
|
||||
// NewAPIError creates a new API error
|
||||
func NewAPIError(statusCode int, code, message string) error {
|
||||
return &APIError{
|
||||
ErrorMessage: message,
|
||||
Code: code,
|
||||
StatusCode: statusCode,
|
||||
Timestamp: time.Now().Unix(),
|
||||
}
|
||||
}
|
||||
|
||||
// ValidationError creates a validation error with field details
|
||||
func ValidationError(fields map[string]string) error {
|
||||
return &APIError{
|
||||
ErrorMessage: "Validation failed",
|
||||
Code: "validation_error",
|
||||
StatusCode: http.StatusBadRequest,
|
||||
Timestamp: time.Now().Unix(),
|
||||
Details: fields,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -360,24 +360,6 @@ func ClearFailedLogins(identifier string) {
|
|||
delete(failedLogins, identifier)
|
||||
}
|
||||
|
||||
// IsLockedOut checks if an account is locked out
|
||||
func IsLockedOut(identifier string) bool {
|
||||
failedMu.RLock()
|
||||
defer failedMu.RUnlock()
|
||||
|
||||
failed, exists := failedLogins[identifier]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
if time.Now().After(failed.LockedUntil) {
|
||||
// Lockout expired
|
||||
return false
|
||||
}
|
||||
|
||||
return failed.Count >= maxFailedAttempts
|
||||
}
|
||||
|
||||
// GetLockoutInfo returns lockout information for an identifier
|
||||
func GetLockoutInfo(identifier string) (attempts int, lockedUntil time.Time, isLocked bool) {
|
||||
failedMu.RLock()
|
||||
|
|
@ -409,11 +391,6 @@ func ResetLockout(identifier string) {
|
|||
Msg("Lockout manually reset")
|
||||
}
|
||||
|
||||
// Security Headers Middleware
|
||||
func SecurityHeaders(next http.Handler) http.Handler {
|
||||
return SecurityHeadersWithConfig(next, false, "")
|
||||
}
|
||||
|
||||
// SecurityHeadersWithConfig applies security headers with embedding configuration
|
||||
func SecurityHeadersWithConfig(next http.Handler, allowEmbedding bool, allowedOrigins string) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
|
|||
|
|
@ -131,19 +131,6 @@ func CreateProxmoxConfigFromFields(host, user, password, tokenName, tokenValue,
|
|||
}
|
||||
}
|
||||
|
||||
// CreatePBSConfigFromFields creates a pbs.ClientConfig from individual fields
|
||||
func CreatePBSConfigFromFields(host, user, password, tokenName, tokenValue, fingerprint string, verifySSL bool) pbs.ClientConfig {
|
||||
return pbs.ClientConfig{
|
||||
Host: normalizeHostPort(host, defaultPBSPort),
|
||||
User: user,
|
||||
Password: password,
|
||||
TokenName: tokenName,
|
||||
TokenValue: tokenValue,
|
||||
VerifySSL: verifySSL,
|
||||
Fingerprint: fingerprint,
|
||||
}
|
||||
}
|
||||
|
||||
// CreatePMGConfigFromFields creates a pmg.ClientConfig from individual fields
|
||||
func CreatePMGConfigFromFields(host, user, password, tokenName, tokenValue, fingerprint string, verifySSL bool) pmg.ClientConfig {
|
||||
return pmg.ClientConfig{
|
||||
|
|
|
|||
|
|
@ -1336,46 +1336,6 @@ func Load() (*Config, error) {
|
|||
return cfg, nil
|
||||
}
|
||||
|
||||
// SaveConfig saves the configuration back to encrypted files
|
||||
func SaveConfig(cfg *Config) error {
|
||||
if globalPersistence == nil {
|
||||
return fmt.Errorf("config persistence not initialized")
|
||||
}
|
||||
|
||||
// Save nodes configuration
|
||||
if err := globalPersistence.SaveNodesConfig(cfg.PVEInstances, cfg.PBSInstances, cfg.PMGInstances); err != nil {
|
||||
return fmt.Errorf("failed to save nodes config: %w", err)
|
||||
}
|
||||
|
||||
// Save system configuration
|
||||
adaptiveEnabled := cfg.AdaptivePollingEnabled
|
||||
systemSettings := SystemSettings{
|
||||
PVEPollingInterval: int(cfg.PVEPollingInterval / time.Second),
|
||||
UpdateChannel: cfg.UpdateChannel,
|
||||
AutoUpdateEnabled: cfg.AutoUpdateEnabled,
|
||||
AutoUpdateCheckInterval: int(cfg.AutoUpdateCheckInterval.Hours()),
|
||||
AutoUpdateTime: cfg.AutoUpdateTime,
|
||||
AllowedOrigins: cfg.AllowedOrigins,
|
||||
ConnectionTimeout: int(cfg.ConnectionTimeout.Seconds()),
|
||||
LogLevel: cfg.LogLevel,
|
||||
DiscoveryEnabled: cfg.DiscoveryEnabled,
|
||||
DiscoverySubnet: cfg.DiscoverySubnet,
|
||||
DiscoveryConfig: CloneDiscoveryConfig(cfg.Discovery),
|
||||
AdaptivePollingEnabled: &adaptiveEnabled,
|
||||
AdaptivePollingBaseInterval: int(cfg.AdaptivePollingBaseInterval / time.Second),
|
||||
AdaptivePollingMinInterval: int(cfg.AdaptivePollingMinInterval / time.Second),
|
||||
AdaptivePollingMaxInterval: int(cfg.AdaptivePollingMaxInterval / time.Second),
|
||||
DNSCacheTimeout: int(cfg.DNSCacheTimeout / time.Second),
|
||||
SSHPort: cfg.SSHPort,
|
||||
// APIToken removed - now handled via .env only
|
||||
}
|
||||
if err := globalPersistence.SaveSystemSettings(systemSettings); err != nil {
|
||||
return fmt.Errorf("failed to save system config: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SaveOIDCConfig persists OIDC settings using the shared config persistence layer.
|
||||
func SaveOIDCConfig(settings *OIDCConfig) error {
|
||||
if globalPersistence == nil {
|
||||
|
|
|
|||
|
|
@ -1,147 +0,0 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// CredentialResolver handles resolving credential values from various sources
|
||||
type CredentialResolver struct {
|
||||
// Track which credentials are stored insecurely for warnings
|
||||
insecureCredentials []string
|
||||
}
|
||||
|
||||
// NewCredentialResolver creates a new credential resolver
|
||||
func NewCredentialResolver() *CredentialResolver {
|
||||
return &CredentialResolver{
|
||||
insecureCredentials: []string{},
|
||||
}
|
||||
}
|
||||
|
||||
// ResolveValue resolves a credential value that might be:
|
||||
// - A literal value (backwards compatible)
|
||||
// - An environment variable reference: ${VAR_NAME} (for secrets, not node config)
|
||||
// - A file reference: file:///path/to/secret
|
||||
// - Future: vault://path/to/secret, keyring://secret-name, etc.
|
||||
// NOTE: This is for credential values only, not for node configuration which is done via UI
|
||||
func (cr *CredentialResolver) ResolveValue(value string, fieldName string) (string, error) {
|
||||
if value == "" {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// Check for environment variable reference
|
||||
if strings.HasPrefix(value, "${") && strings.HasSuffix(value, "}") {
|
||||
varName := value[2 : len(value)-1]
|
||||
resolved := os.Getenv(varName)
|
||||
if resolved == "" {
|
||||
return "", fmt.Errorf("environment variable %s not set", varName)
|
||||
}
|
||||
log.Debug().Str("field", fieldName).Str("var", varName).Msg("Resolved credential from environment variable")
|
||||
return resolved, nil
|
||||
}
|
||||
|
||||
// Check for file reference
|
||||
if strings.HasPrefix(value, "file://") {
|
||||
filePath := strings.TrimPrefix(value, "file://")
|
||||
content, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to read credential file %s: %w", filePath, err)
|
||||
}
|
||||
// Trim any whitespace/newlines
|
||||
resolved := strings.TrimSpace(string(content))
|
||||
|
||||
// Check file permissions
|
||||
if info, err := os.Stat(filePath); err == nil {
|
||||
mode := info.Mode()
|
||||
if mode&0077 != 0 {
|
||||
log.Warn().
|
||||
Str("file", filePath).
|
||||
Str("permissions", mode.String()).
|
||||
Msg("Credential file has overly permissive permissions. Consider: chmod 600 " + filePath)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().Str("field", fieldName).Str("file", filePath).Msg("Resolved credential from file")
|
||||
return resolved, nil
|
||||
}
|
||||
|
||||
// Check if this looks like a credential (UUID pattern, token pattern, etc)
|
||||
if looksLikeCredential(value) {
|
||||
cr.insecureCredentials = append(cr.insecureCredentials, fieldName)
|
||||
}
|
||||
|
||||
// Return as-is (literal value - backwards compatible)
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// CheckConfigSecurity checks the security of the config file and credentials
|
||||
func (cr *CredentialResolver) CheckConfigSecurity(configPath string) {
|
||||
// We now auto-secure the config file, so only log at debug level
|
||||
if info, err := os.Stat(configPath); err == nil {
|
||||
mode := info.Mode()
|
||||
log.Debug().
|
||||
Str("file", configPath).
|
||||
Str("permissions", mode.String()).
|
||||
Int("inline_credentials", len(cr.insecureCredentials)).
|
||||
Msg("Config file security check")
|
||||
}
|
||||
}
|
||||
|
||||
// looksLikeCredential uses heuristics to detect if a value is likely a credential
|
||||
func looksLikeCredential(value string) bool {
|
||||
// Skip if it's a reference
|
||||
if strings.HasPrefix(value, "${") || strings.HasPrefix(value, "file://") {
|
||||
return false
|
||||
}
|
||||
|
||||
// UUID pattern
|
||||
uuidRegex := regexp.MustCompile(`^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$`)
|
||||
if uuidRegex.MatchString(value) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Long random string (likely a token)
|
||||
if len(value) > 20 && regexp.MustCompile(`^[a-zA-Z0-9_\-]+$`).MatchString(value) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Contains words like secret, token, key, password
|
||||
lowerValue := strings.ToLower(value)
|
||||
if strings.Contains(lowerValue, "secret") || strings.Contains(lowerValue, "token") ||
|
||||
strings.Contains(lowerValue, "key") || strings.Contains(lowerValue, "password") {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// ResolveNodeCredentials resolves all credentials in a node configuration
|
||||
func (cr *CredentialResolver) ResolveNodeCredentials(node interface{}, nodeName string) error {
|
||||
switch n := node.(type) {
|
||||
case *PVEInstance:
|
||||
var err error
|
||||
n.Password, err = cr.ResolveValue(n.Password, fmt.Sprintf("%s.password", nodeName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.TokenValue, err = cr.ResolveValue(n.TokenValue, fmt.Sprintf("%s.token_value", nodeName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case *PBSInstance:
|
||||
var err error
|
||||
n.Password, err = cr.ResolveValue(n.Password, fmt.Sprintf("%s.password", nodeName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.TokenValue, err = cr.ResolveValue(n.TokenValue, fmt.Sprintf("%s.token_value", nodeName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1,52 +0,0 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RegistrationConfig holds registration token configuration
|
||||
type RegistrationConfig struct {
|
||||
RequireToken bool
|
||||
DefaultValidity time.Duration
|
||||
DefaultMaxUses int
|
||||
AllowUnprotected bool
|
||||
}
|
||||
|
||||
// GetRegistrationConfig returns the registration configuration from environment
|
||||
func GetRegistrationConfig() RegistrationConfig {
|
||||
config := RegistrationConfig{
|
||||
RequireToken: false,
|
||||
DefaultValidity: 15 * time.Minute,
|
||||
DefaultMaxUses: 1,
|
||||
AllowUnprotected: true,
|
||||
}
|
||||
|
||||
// Check if registration tokens are required
|
||||
if os.Getenv("REQUIRE_REGISTRATION_TOKEN") == "true" {
|
||||
config.RequireToken = true
|
||||
config.AllowUnprotected = false
|
||||
}
|
||||
|
||||
// Allow explicitly disabling protection for homelab use
|
||||
if os.Getenv("ALLOW_UNPROTECTED_AUTO_REGISTER") == "true" {
|
||||
config.AllowUnprotected = true
|
||||
}
|
||||
|
||||
// Set default validity from environment
|
||||
if validityStr := os.Getenv("REGISTRATION_TOKEN_DEFAULT_VALIDITY"); validityStr != "" {
|
||||
if validitySec, err := strconv.Atoi(validityStr); err == nil {
|
||||
config.DefaultValidity = time.Duration(validitySec) * time.Second
|
||||
}
|
||||
}
|
||||
|
||||
// Set default max uses from environment
|
||||
if maxUsesStr := os.Getenv("REGISTRATION_TOKEN_DEFAULT_MAX_USES"); maxUsesStr != "" {
|
||||
if maxUses, err := strconv.Atoi(maxUsesStr); err == nil {
|
||||
config.DefaultMaxUses = maxUses
|
||||
}
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
|
@ -255,11 +255,6 @@ func GetRequestID(ctx context.Context) string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// NewRequestID creates a new UUID string.
|
||||
func NewRequestID() string {
|
||||
return uuid.NewString()
|
||||
}
|
||||
|
||||
func collectOptions(opts ...Option) options {
|
||||
cfg := options{}
|
||||
for _, opt := range opts {
|
||||
|
|
|
|||
|
|
@ -1,15 +1,10 @@
|
|||
package monitoring
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/errors"
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/logging"
|
||||
"github.com/rcourtman/pulse-go-rewrite/pkg/pbs"
|
||||
"github.com/rcourtman/pulse-go-rewrite/pkg/pmg"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// PollResult represents the result of a polling operation
|
||||
|
|
@ -30,256 +25,3 @@ type PollTask struct {
|
|||
PBSClient *pbs.Client
|
||||
PMGClient *pmg.Client
|
||||
}
|
||||
|
||||
// PollerPool manages concurrent polling with channels
|
||||
type PollerPool struct {
|
||||
workers int
|
||||
tasksChan chan PollTask
|
||||
resultsChan chan PollResult
|
||||
monitor *Monitor
|
||||
done chan struct{}
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewPollerPool creates a new poller pool
|
||||
func NewPollerPool(workers int, monitor *Monitor) *PollerPool {
|
||||
return &PollerPool{
|
||||
workers: workers,
|
||||
tasksChan: make(chan PollTask, workers*2), // Buffer for smooth operation
|
||||
resultsChan: make(chan PollResult, workers*2),
|
||||
monitor: monitor,
|
||||
done: make(chan struct{}),
|
||||
closed: false,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the worker pool
|
||||
func (p *PollerPool) Start(ctx context.Context) {
|
||||
// Start workers
|
||||
for i := 0; i < p.workers; i++ {
|
||||
go p.worker(ctx, i)
|
||||
}
|
||||
|
||||
// Start result collector
|
||||
go p.collectResults(ctx)
|
||||
}
|
||||
|
||||
// worker processes polling tasks
|
||||
func (p *PollerPool) worker(ctx context.Context, id int) {
|
||||
if logging.IsLevelEnabled(zerolog.DebugLevel) {
|
||||
log.Debug().Int("worker", id).Msg("Poller worker started")
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if logging.IsLevelEnabled(zerolog.DebugLevel) {
|
||||
log.Debug().Int("worker", id).Msg("Poller worker stopped")
|
||||
}
|
||||
return
|
||||
case task, ok := <-p.tasksChan:
|
||||
if !ok {
|
||||
if logging.IsLevelEnabled(zerolog.DebugLevel) {
|
||||
log.Debug().Int("worker", id).Msg("Task channel closed, worker stopping")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
result := p.executeTask(ctx, task)
|
||||
|
||||
// Send result if context is still active and channel is open
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
// Use non-blocking send to avoid panic if channel is closed
|
||||
select {
|
||||
case p.resultsChan <- result:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
// Channel might be closed, just continue
|
||||
if logging.IsLevelEnabled(zerolog.DebugLevel) {
|
||||
log.Debug().Int("worker", id).Msg("Results channel appears closed, skipping result")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// executeTask executes a single polling task
|
||||
func (p *PollerPool) executeTask(ctx context.Context, task PollTask) PollResult {
|
||||
result := PollResult{
|
||||
InstanceName: task.InstanceName,
|
||||
InstanceType: task.InstanceType,
|
||||
StartTime: time.Now(),
|
||||
Success: true,
|
||||
}
|
||||
|
||||
switch task.InstanceType {
|
||||
case "pve":
|
||||
if task.PVEClient != nil {
|
||||
p.monitor.pollPVEInstance(ctx, task.InstanceName, task.PVEClient)
|
||||
} else {
|
||||
result.Success = false
|
||||
result.Error = errors.NewMonitorError(errors.ErrorTypeInternal, "poll_pve", task.InstanceName, errors.ErrInvalidInput)
|
||||
}
|
||||
case "pbs":
|
||||
if task.PBSClient != nil {
|
||||
p.monitor.pollPBSInstance(ctx, task.InstanceName, task.PBSClient)
|
||||
} else {
|
||||
result.Success = false
|
||||
result.Error = errors.NewMonitorError(errors.ErrorTypeInternal, "poll_pbs", task.InstanceName, errors.ErrInvalidInput)
|
||||
}
|
||||
case "pmg":
|
||||
if task.PMGClient != nil {
|
||||
p.monitor.pollPMGInstance(ctx, task.InstanceName, task.PMGClient)
|
||||
} else {
|
||||
result.Success = false
|
||||
result.Error = errors.NewMonitorError(errors.ErrorTypeInternal, "poll_pmg", task.InstanceName, errors.ErrInvalidInput)
|
||||
}
|
||||
default:
|
||||
result.Success = false
|
||||
result.Error = errors.NewMonitorError(errors.ErrorTypeValidation, "poll_unknown", task.InstanceName, errors.ErrInvalidInput)
|
||||
}
|
||||
|
||||
result.EndTime = time.Now()
|
||||
return result
|
||||
}
|
||||
|
||||
// collectResults collects polling results
|
||||
func (p *PollerPool) collectResults(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case result, ok := <-p.resultsChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
duration := result.EndTime.Sub(result.StartTime)
|
||||
if result.Success {
|
||||
log.Debug().
|
||||
Str("instance", result.InstanceName).
|
||||
Str("type", result.InstanceType).
|
||||
Dur("duration", duration).
|
||||
Msg("Polling completed successfully")
|
||||
} else {
|
||||
log.Error().
|
||||
Err(result.Error).
|
||||
Str("instance", result.InstanceName).
|
||||
Str("type", result.InstanceType).
|
||||
Dur("duration", duration).
|
||||
Msg("Polling failed; request will be retried on next cycle")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitTask submits a polling task
|
||||
func (p *PollerPool) SubmitTask(ctx context.Context, task PollTask) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case p.tasksChan <- task:
|
||||
return nil
|
||||
default:
|
||||
// Channel is full
|
||||
return errors.NewMonitorError(errors.ErrorTypeInternal, "submit_task", task.InstanceName, errors.ErrTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the poller pool
|
||||
func (p *PollerPool) Close() {
|
||||
if p.closed {
|
||||
return
|
||||
}
|
||||
p.closed = true
|
||||
|
||||
// Signal shutdown
|
||||
close(p.done)
|
||||
|
||||
// Close task channel to signal workers to stop
|
||||
close(p.tasksChan)
|
||||
|
||||
// Don't close resultsChan here - let it drain naturally
|
||||
// The collectors will exit when context is done
|
||||
}
|
||||
|
||||
// pollWithChannels implements channel-based concurrent polling
|
||||
func (m *Monitor) pollWithChannels(ctx context.Context) {
|
||||
// Create worker pool based on instance count
|
||||
workerCount := len(m.pveClients) + len(m.pbsClients) + len(m.pmgClients)
|
||||
if workerCount > 10 {
|
||||
workerCount = 10 // Cap at 10 workers
|
||||
}
|
||||
if workerCount < 2 {
|
||||
workerCount = 2 // Minimum 2 workers
|
||||
}
|
||||
|
||||
pool := NewPollerPool(workerCount, m)
|
||||
|
||||
// Create a context with timeout for this polling cycle
|
||||
// Hardcoded to 10s minus 200ms (matches polling interval)
|
||||
timeout := 10*time.Second - 200*time.Millisecond
|
||||
pollCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// Start the pool
|
||||
pool.Start(pollCtx)
|
||||
|
||||
// Submit all tasks
|
||||
var taskCount int
|
||||
|
||||
// Submit PVE tasks
|
||||
for name, client := range m.pveClients {
|
||||
task := PollTask{
|
||||
InstanceName: name,
|
||||
InstanceType: "pve",
|
||||
PVEClient: client,
|
||||
}
|
||||
if err := pool.SubmitTask(pollCtx, task); err != nil {
|
||||
log.Error().Err(err).Str("instance", name).Msg("Failed to submit PVE polling task")
|
||||
} else {
|
||||
taskCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Submit PBS tasks
|
||||
for name, client := range m.pbsClients {
|
||||
task := PollTask{
|
||||
InstanceName: name,
|
||||
InstanceType: "pbs",
|
||||
PBSClient: client,
|
||||
}
|
||||
if err := pool.SubmitTask(pollCtx, task); err != nil {
|
||||
log.Error().Err(err).Str("instance", name).Msg("Failed to submit PBS polling task")
|
||||
} else {
|
||||
taskCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Submit PMG tasks
|
||||
for name, client := range m.pmgClients {
|
||||
task := PollTask{
|
||||
InstanceName: name,
|
||||
InstanceType: "pmg",
|
||||
PMGClient: client,
|
||||
}
|
||||
if err := pool.SubmitTask(pollCtx, task); err != nil {
|
||||
log.Error().Err(err).Str("instance", name).Msg("Failed to submit PMG polling task")
|
||||
} else {
|
||||
taskCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all tasks to complete or timeout
|
||||
<-pollCtx.Done()
|
||||
|
||||
// Clean up
|
||||
pool.Close()
|
||||
|
||||
log.Debug().Int("tasks", taskCount).Msg("Channel-based polling cycle completed")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue