mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-05-20 01:01:20 +00:00
Remove local upgrade-metrics API registration, settings payload wiring, startup store migration, and backend conversion recorder hooks from the normal product runtime. Delete the retired conversion/funnel and metering packages from compiled licensing code, and extend diagnostics boundary audits and governance contracts so maintainer commercial analytics cannot return through Settings or diagnostics.
944 lines
31 KiB
Go
944 lines
31 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"crypto/subtle"
|
|
"crypto/tls"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/alerts"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/api"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/config"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/crypto"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/hosted"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/logging"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/mock"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/models"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/monitoring"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/telemetry"
|
|
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/websocket"
|
|
"github.com/rcourtman/pulse-go-rewrite/pkg/aicontracts"
|
|
"github.com/rcourtman/pulse-go-rewrite/pkg/audit"
|
|
"github.com/rcourtman/pulse-go-rewrite/pkg/auth"
|
|
"github.com/rcourtman/pulse-go-rewrite/pkg/extensions"
|
|
pkglicensing "github.com/rcourtman/pulse-go-rewrite/pkg/licensing"
|
|
"github.com/rcourtman/pulse-go-rewrite/pkg/metrics"
|
|
"github.com/rcourtman/pulse-go-rewrite/pkg/reporting"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// Version information
|
|
var (
|
|
MetricsPort = 9091
|
|
)
|
|
|
|
// BusinessHooks allows enterprise features to hook into the server lifecycle.
|
|
type BusinessHooks struct {
|
|
// OnMetricsStoreReady is called when the metrics store is initialized.
|
|
// This allows enterprise features to access metrics for reporting.
|
|
OnMetricsStoreReady func(store *metrics.Store)
|
|
|
|
// BindRBACAdminEndpoints allows enterprise modules to replace or decorate
|
|
// RBAC admin endpoints without importing internal API packages.
|
|
BindRBACAdminEndpoints extensions.BindRBACAdminEndpointsFunc
|
|
|
|
// BindAuditAdminEndpoints allows enterprise modules to replace or decorate
|
|
// audit admin endpoints without importing internal API packages.
|
|
BindAuditAdminEndpoints extensions.BindAuditAdminEndpointsFunc
|
|
|
|
// BindSSOAdminEndpoints allows enterprise modules to replace or decorate
|
|
// SSO admin endpoints without importing internal API packages.
|
|
BindSSOAdminEndpoints extensions.BindSSOAdminEndpointsFunc
|
|
|
|
// BindReportingAdminEndpoints allows enterprise modules to replace or decorate
|
|
// reporting admin endpoints without importing internal API packages.
|
|
BindReportingAdminEndpoints extensions.BindReportingAdminEndpointsFunc
|
|
|
|
// BindAIAutoFixEndpoints allows enterprise modules to replace or decorate
|
|
// safe remediation endpoints (investigation, remediation, autonomy, fix execution).
|
|
BindAIAutoFixEndpoints extensions.BindAIAutoFixEndpointsFunc
|
|
|
|
// BindAIAlertAnalysisEndpoints allows enterprise modules to replace or decorate
|
|
// AI alert analysis endpoints (alert investigation, Kubernetes analysis).
|
|
BindAIAlertAnalysisEndpoints extensions.BindAIAlertAnalysisEndpointsFunc
|
|
|
|
// AIInvestigationEnabled controls whether premium AI investigation and
|
|
// remediation components are created at runtime. When nil or returns false,
|
|
// patrol runs in monitor-only mode (findings reported but never investigated,
|
|
// no remediation plans generated). Enterprise sets this to return true.
|
|
AIInvestigationEnabled func() bool
|
|
|
|
// CreateRemediationEngine creates the premium remediation engine.
|
|
// Returns nil in OSS (no implementation available). Enterprise provides
|
|
// the concrete engine implementation wrapped behind the interface.
|
|
CreateRemediationEngine func(cfg aicontracts.EngineConfig) aicontracts.RemediationEngine
|
|
|
|
// CreateInvestigationStore creates the premium investigation session store.
|
|
// Returns nil in OSS. Enterprise provides a concrete implementation.
|
|
CreateInvestigationStore func(dataDir string) aicontracts.InvestigationStore
|
|
|
|
// CreateInvestigationOrchestrator creates the premium investigation orchestrator.
|
|
// Returns nil in OSS. Enterprise provides a concrete implementation.
|
|
CreateInvestigationOrchestrator func(deps aicontracts.OrchestratorDeps) aicontracts.InvestigationOrchestrator
|
|
|
|
// CreateAlertAnalyzer creates the premium alert-triggered analyzer.
|
|
// Returns nil in OSS. Enterprise provides a concrete implementation.
|
|
CreateAlertAnalyzer func(deps aicontracts.AlertAnalyzerDeps) aicontracts.AlertAnalyzer
|
|
|
|
// ResolveMonitoredSystemAdmissionPolicy allows private builds to own the
|
|
// commercial monitored-system admission decision without importing internal
|
|
// API packages. Public runtime still owns counted-system projection.
|
|
ResolveMonitoredSystemAdmissionPolicy extensions.ResolveMonitoredSystemAdmissionPolicyFunc
|
|
}
|
|
|
|
var (
|
|
globalHooks BusinessHooks
|
|
globalHooksMu sync.Mutex
|
|
)
|
|
|
|
// SetBusinessHooks registers hooks for the server.
|
|
func SetBusinessHooks(h BusinessHooks) {
|
|
globalHooksMu.Lock()
|
|
defer globalHooksMu.Unlock()
|
|
globalHooks = h
|
|
}
|
|
|
|
// Run starts the Pulse monitoring server.
|
|
func Run(ctx context.Context, version string) error {
|
|
// Initialize logger with baseline defaults for early startup logs
|
|
logging.Init(logging.Config{
|
|
Format: "auto",
|
|
Level: "info",
|
|
Component: "pulse",
|
|
})
|
|
defer logging.Shutdown()
|
|
|
|
if err := api.ValidateTrustedProxyCIDRsFromEnv(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check for auto-import on first startup
|
|
if ShouldAutoImport() {
|
|
if err := PerformAutoImport(); err != nil {
|
|
log.Error().Err(err).Msg("Auto-import failed, continuing with normal startup")
|
|
}
|
|
}
|
|
|
|
// Load unified configuration
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load configuration: %w", err)
|
|
}
|
|
|
|
// Re-initialize logging with configuration-driven settings
|
|
logging.Init(logging.Config{
|
|
Format: cfg.LogFormat,
|
|
Level: cfg.LogLevel,
|
|
Component: "pulse",
|
|
FilePath: cfg.LogFile,
|
|
MaxSizeMB: cfg.LogMaxSize,
|
|
MaxAgeDays: cfg.LogMaxAge,
|
|
Compress: cfg.LogCompress,
|
|
})
|
|
|
|
// Initialize license public key for Pro feature validation
|
|
pkglicensing.InitEmbeddedPublicKey()
|
|
|
|
// Multi-tenant persistence is the canonical way to resolve the base data directory.
|
|
// It uses cfg.DataPath, which already includes PULSE_DATA_DIR overrides.
|
|
mtPersistence := config.NewMultiTenantPersistence(cfg.DataPath)
|
|
baseDataDir := mtPersistence.BaseDataDir()
|
|
|
|
// Initialize RBAC manager for role-based access control
|
|
rbacManager, err := auth.NewFileManager(baseDataDir)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("Failed to initialize RBAC manager, role management will be unavailable")
|
|
} else {
|
|
auth.SetManager(rbacManager)
|
|
log.Info().Msg("RBAC manager initialized")
|
|
}
|
|
|
|
// Run multi-tenant data migration only when the feature is explicitly enabled.
|
|
// This prevents any on-disk layout changes for default (single-tenant) users.
|
|
if api.IsMultiTenantEnabled() {
|
|
if err := config.RunMigrationIfNeeded(baseDataDir); err != nil {
|
|
log.Error().Err(err).Msg("Multi-tenant data migration failed")
|
|
// Continue anyway - migration failure shouldn't block startup
|
|
}
|
|
}
|
|
if err := ensureDefaultOrgOwnerMembership(mtPersistence, cfg.AuthUser); err != nil {
|
|
log.Warn().Err(err).Msg("Failed to ensure default organization owner membership")
|
|
}
|
|
|
|
// Always capture audit events to SQLite (defense in depth). Read/export endpoints are license-gated.
|
|
// For the default org, TenantLoggerManager routes to the global logger, so initialize it as SQLite too.
|
|
var globalCrypto audit.CryptoEncryptor
|
|
if cm, err := crypto.NewCryptoManagerAt(baseDataDir); err != nil {
|
|
log.Warn().Err(err).Str("data_dir", baseDataDir).Msg("Failed to initialize crypto manager for audit signing; signatures will be disabled")
|
|
} else {
|
|
globalCrypto = cm
|
|
if sqliteLogger, err := audit.NewSQLiteLogger(audit.SQLiteLoggerConfig{
|
|
DataDir: baseDataDir,
|
|
CryptoMgr: cm,
|
|
}); err != nil {
|
|
log.Warn().Err(err).Str("data_dir", baseDataDir).Msg("Failed to initialize global SQLite audit logger; falling back to console logger")
|
|
} else {
|
|
audit.SetLogger(sqliteLogger)
|
|
}
|
|
}
|
|
|
|
// Initialize tenant audit manager for per-tenant audit logging
|
|
tenantAuditManager := audit.NewTenantLoggerManager(baseDataDir, &audit.SQLiteLoggerFactory{
|
|
// Prefer per-tenant crypto managers so each org has its own .encryption.key.
|
|
CryptoMgrForDataDir: func(dataDir string) (audit.CryptoEncryptor, error) {
|
|
return crypto.NewCryptoManagerAt(dataDir)
|
|
},
|
|
// Fallback for environments where per-tenant crypto initialization fails.
|
|
CryptoMgr: globalCrypto,
|
|
})
|
|
api.SetTenantAuditManager(tenantAuditManager)
|
|
log.Info().Msg("Tenant audit manager initialized")
|
|
|
|
// Enable async audit logging to avoid request latency on audit writes.
|
|
if !strings.EqualFold(os.Getenv("PULSE_AUDIT_ASYNC"), "false") {
|
|
audit.EnableAsyncLogging(audit.AsyncLoggerConfig{BufferSize: 4096})
|
|
log.Info().Msg("Async audit logging enabled")
|
|
}
|
|
|
|
log.Info().Msg("Starting Pulse monitoring server")
|
|
|
|
// TODO: Validate agent binaries are available for download
|
|
// updates.EnsureHostAgentBinaries(version)
|
|
|
|
// Create derived context that cancels on interrupt
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Metrics port is configurable via MetricsPort variable
|
|
metricsAddr := fmt.Sprintf("%s:%d", cfg.BindAddress, MetricsPort)
|
|
startMetricsServer(ctx, metricsAddr, cfg.MetricsToken)
|
|
|
|
// Initialize WebSocket hub first
|
|
wsHub := websocket.NewHub(nil)
|
|
// Gate X-Forwarded-* trust in checkOrigin on the same trusted-proxy list
|
|
// used by the main API's auth.go / security.go.
|
|
wsHub.SetTrustedProxyChecker(api.IsTrustedProxyIP)
|
|
// Set allowed origins from configuration
|
|
if cfg.AllowedOrigins != "" {
|
|
if cfg.AllowedOrigins == "*" {
|
|
// Explicit wildcard - allow all origins (less secure)
|
|
wsHub.SetAllowedOrigins([]string{"*"})
|
|
} else {
|
|
// Use configured origins
|
|
wsHub.SetAllowedOrigins(strings.Split(cfg.AllowedOrigins, ","))
|
|
}
|
|
} else {
|
|
// Default: don't set any specific origins
|
|
wsHub.SetAllowedOrigins([]string{})
|
|
}
|
|
go wsHub.Run()
|
|
defer wsHub.Stop()
|
|
|
|
// Initialize reloadable monitoring system
|
|
reloadableMonitor, err := monitoring.NewReloadableMonitor(cfg, mtPersistence, wsHub)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize monitoring system: %w", err)
|
|
}
|
|
|
|
// Trigger enterprise hooks if registered
|
|
globalHooksMu.Lock()
|
|
onMetricsStoreReady := globalHooks.OnMetricsStoreReady
|
|
bindRBACAdminEndpoints := globalHooks.BindRBACAdminEndpoints
|
|
bindAuditAdminEndpoints := globalHooks.BindAuditAdminEndpoints
|
|
bindSSOAdminEndpoints := globalHooks.BindSSOAdminEndpoints
|
|
bindReportingAdminEndpoints := globalHooks.BindReportingAdminEndpoints
|
|
bindAIAutoFixEndpoints := globalHooks.BindAIAutoFixEndpoints
|
|
bindAIAlertAnalysisEndpoints := globalHooks.BindAIAlertAnalysisEndpoints
|
|
aiInvestigationEnabled := globalHooks.AIInvestigationEnabled
|
|
createRemediationEngine := globalHooks.CreateRemediationEngine
|
|
createInvestigationStore := globalHooks.CreateInvestigationStore
|
|
createInvestigationOrchestrator := globalHooks.CreateInvestigationOrchestrator
|
|
createAlertAnalyzer := globalHooks.CreateAlertAnalyzer
|
|
resolveMonitoredSystemAdmissionPolicy := globalHooks.ResolveMonitoredSystemAdmissionPolicy
|
|
globalHooksMu.Unlock()
|
|
|
|
api.SetAIInvestigationEnabled(aiInvestigationEnabled)
|
|
api.SetCreateRemediationEngine(createRemediationEngine)
|
|
api.SetCreateInvestigationStore(createInvestigationStore)
|
|
api.SetCreateInvestigationOrchestrator(createInvestigationOrchestrator)
|
|
api.SetCreateAlertAnalyzer(createAlertAnalyzer)
|
|
api.SetResolveMonitoredSystemAdmissionPolicy(resolveMonitoredSystemAdmissionPolicy)
|
|
api.SetRBACAdminEndpointsBinder(bindRBACAdminEndpoints)
|
|
api.SetAuditAdminEndpointsBinder(bindAuditAdminEndpoints)
|
|
api.SetSSOAdminEndpointsBinder(bindSSOAdminEndpoints)
|
|
api.SetReportingAdminEndpointsBinder(bindReportingAdminEndpoints)
|
|
api.SetAIAutoFixEndpointsBinder(bindAIAutoFixEndpoints)
|
|
api.SetAIAlertAnalysisEndpointsBinder(bindAIAlertAnalysisEndpoints)
|
|
|
|
if onMetricsStoreReady != nil {
|
|
func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Error().Interface("panic", r).Msg("Enterprise OnMetricsStoreReady hook panicked")
|
|
}
|
|
}()
|
|
if store := reloadableMonitor.GetMonitor().GetMetricsStore(); store != nil {
|
|
onMetricsStoreReady(store)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Initialize reporting engine if not already set by enterprise hooks.
|
|
// This ensures Pro license holders get reporting even with the standard binary.
|
|
// Uses a dynamic store getter so the engine always queries the current monitor's
|
|
// metrics store, even after monitor reloads (which close and recreate the store).
|
|
if reporting.GetEngine() == nil {
|
|
if store := reloadableMonitor.GetMonitor().GetMetricsStore(); store != nil {
|
|
engine := reporting.NewReportEngine(reporting.EngineConfig{
|
|
MetricsStoreGetter: func() *metrics.Store {
|
|
return reloadableMonitor.GetMonitor().GetMetricsStore()
|
|
},
|
|
})
|
|
reporting.SetEngine(engine)
|
|
log.Info().Msg("Advanced Infrastructure Reporting (PDF/CSV) initialized")
|
|
}
|
|
}
|
|
|
|
// WebSocket state ownership is tenant-aware by construction, including default-org state.
|
|
wsHub.SetStateGetter(func(orgID string) interface{} {
|
|
if orgID == "" || orgID == "default" {
|
|
return reloadableMonitor.GetMonitor().BuildFrontendState()
|
|
}
|
|
|
|
mtMonitor := reloadableMonitor.GetMultiTenantMonitor()
|
|
if mtMonitor == nil {
|
|
// Security: never expose default-org state to non-default org clients.
|
|
log.Warn().Str("org_id", orgID).Msg("Tenant monitor unavailable for org state request")
|
|
return models.EmptyStateFrontend()
|
|
}
|
|
monitor, err := mtMonitor.GetMonitor(orgID)
|
|
if err != nil || monitor == nil {
|
|
log.Warn().Err(err).Str("org_id", orgID).Msg("Failed to get tenant monitor for org state request")
|
|
return models.EmptyStateFrontend()
|
|
}
|
|
return monitor.BuildFrontendState()
|
|
})
|
|
|
|
// Set org authorization checker for WebSocket connections
|
|
// This ensures clients can only subscribe to orgs they have access to
|
|
orgLoader := api.NewMultiTenantOrganizationLoader(mtPersistence)
|
|
wsHub.SetOrgAuthChecker(api.NewAuthorizationChecker(orgLoader))
|
|
|
|
// Set multi-tenant checker for WebSocket connections
|
|
// This ensures the feature flag and license are checked before allowing non-default org connections
|
|
hostedMode := os.Getenv("PULSE_HOSTED_MODE") == "true"
|
|
wsHub.SetMultiTenantChecker(api.NewMultiTenantChecker(hostedMode))
|
|
|
|
// Wire up Prometheus metrics for alert lifecycle
|
|
alerts.SetMetricHooks(
|
|
metrics.RecordAlertFired,
|
|
metrics.RecordAlertResolved,
|
|
metrics.RecordAlertSuppressed,
|
|
metrics.RecordAlertAcknowledged,
|
|
)
|
|
log.Info().Msg("Alert metrics hooks registered")
|
|
|
|
// Start monitoring
|
|
reloadableMonitor.Start(ctx)
|
|
|
|
// Initialize API server with reload function
|
|
var router *api.Router
|
|
reloadFunc := func() error {
|
|
if err := reloadableMonitor.Reload(); err != nil {
|
|
return err
|
|
}
|
|
if router != nil {
|
|
router.SetMonitor(reloadableMonitor.GetMonitor())
|
|
router.SetMultiTenantMonitor(reloadableMonitor.GetMultiTenantMonitor())
|
|
if cfg := reloadableMonitor.GetConfig(); cfg != nil {
|
|
router.SetConfig(cfg)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
router = api.NewRouter(cfg, reloadableMonitor.GetMonitor(), reloadableMonitor.GetMultiTenantMonitor(), wsHub, reloadFunc, version)
|
|
|
|
// Inject resource store into monitor for WebSocket broadcasts
|
|
router.SetMonitor(reloadableMonitor.GetMonitor())
|
|
// Wire multi-tenant monitor to resource handlers for tenant-aware state
|
|
router.SetMultiTenantMonitor(reloadableMonitor.GetMultiTenantMonitor())
|
|
|
|
// Start AI patrol service for background infrastructure monitoring
|
|
router.StartPatrol(ctx)
|
|
|
|
// Start AI chat service
|
|
router.StartAIChat(ctx)
|
|
|
|
// Start hosted tenant reaper for automatic soft-delete cleanup
|
|
if os.Getenv("PULSE_HOSTED_MODE") == "true" {
|
|
reaper := hosted.NewReaper(mtPersistence, mtPersistence, 5*time.Minute, true)
|
|
reaper.OnBeforeDelete = func(orgID string) error {
|
|
return router.CleanupTenant(ctx, orgID)
|
|
}
|
|
go reaper.Run(ctx)
|
|
log.Info().Msg("Hosted tenant reaper started")
|
|
}
|
|
|
|
// Start relay client for mobile remote access
|
|
router.StartRelay(ctx)
|
|
|
|
// Wire alert-triggered AI analysis
|
|
router.WireAlertTriggeredAI()
|
|
|
|
// Start anonymous telemetry (enabled by default; opt out via PULSE_TELEMETRY=false or Settings toggle).
|
|
// Persistence is created once here (outside the closure) to avoid NewConfigPersistence's
|
|
// fatal-on-error path running inside the telemetry goroutine.
|
|
isDocker := os.Getenv("PULSE_DOCKER") == "true"
|
|
telemetryPersistence := config.NewConfigPersistence(baseDataDir)
|
|
telemetryCfg := telemetry.Config{
|
|
Version: version,
|
|
DataDir: baseDataDir,
|
|
IsDocker: isDocker,
|
|
Enabled: cfg.TelemetryEnabled,
|
|
GetSnapshot: func() telemetry.Snapshot {
|
|
// Use the latest config (may have been swapped by a reload).
|
|
currentCfg := cfg
|
|
if reloaded := reloadableMonitor.GetConfig(); reloaded != nil {
|
|
currentCfg = reloaded
|
|
}
|
|
|
|
snap := telemetry.Snapshot{
|
|
MultiTenant: currentCfg.MultiTenantEnabled,
|
|
HasAPITokens: currentCfg.HasAPITokens(),
|
|
}
|
|
|
|
// Resource counts come from the tenant-aware monitor aggregate, not the
|
|
// default-org compatibility shim.
|
|
counts := reloadableMonitor.AggregateInstallSnapshotCounts()
|
|
snap.PVENodes = counts.PVENodes
|
|
snap.PBSInstances = counts.PBSInstances
|
|
snap.PMGInstances = counts.PMGInstances
|
|
snap.VMs = counts.VMs
|
|
snap.Containers = counts.Containers
|
|
snap.DockerHosts = counts.DockerHosts
|
|
snap.KubernetesClusters = counts.KubernetesClusters
|
|
snap.ActiveAlerts = counts.ActiveAlerts
|
|
|
|
// Feature flags from persisted config (using pre-created persistence).
|
|
if aiCfg, err := telemetryPersistence.LoadAIConfig(); err == nil && aiCfg != nil {
|
|
snap.AIEnabled = aiCfg.Enabled
|
|
}
|
|
if relayCfg, err := telemetryPersistence.LoadRelayConfig(); err == nil {
|
|
snap.RelayEnabled = relayCfg.Enabled
|
|
}
|
|
|
|
// SSO/OIDC status.
|
|
if ssoCfg, err := telemetryPersistence.LoadSSOConfig(); err == nil && ssoCfg != nil {
|
|
snap.SSOEnabled = ssoCfg.HasEnabledProviders()
|
|
}
|
|
|
|
// Coarse commercial posture only; telemetry does not send exact tiers.
|
|
if router != nil && router.GetLicenseHandlers() != nil {
|
|
if svc := router.GetLicenseHandlers().Service(context.Background()); svc != nil {
|
|
if lic := svc.Current(); lic != nil {
|
|
snap.PaidLicense = lic.Claims.Tier != pkglicensing.TierFree
|
|
}
|
|
}
|
|
}
|
|
|
|
return snap
|
|
},
|
|
}
|
|
telemetry.Start(ctx, telemetryCfg)
|
|
defer telemetry.Stop()
|
|
|
|
// Wire live telemetry toggle so Settings changes take effect immediately.
|
|
router.SetTelemetryToggleFunc(func(enabled bool) {
|
|
if enabled {
|
|
telemetryCfg.Enabled = true
|
|
telemetry.Start(ctx, telemetryCfg)
|
|
log.Info().Msg("Telemetry re-enabled via settings (live toggle)")
|
|
} else {
|
|
telemetryCfg.Enabled = false
|
|
telemetry.Stop()
|
|
log.Info().Msg("Telemetry disabled via settings (live toggle)")
|
|
}
|
|
})
|
|
router.SetTelemetryPreviewFunc(func() (telemetry.Ping, error) {
|
|
return telemetry.BuildPreview(telemetryCfg)
|
|
})
|
|
router.SetTelemetryResetFunc(func() (telemetry.Ping, error) {
|
|
if _, err := telemetry.ResetInstallID(baseDataDir); err != nil {
|
|
return telemetry.Ping{}, err
|
|
}
|
|
if telemetryCfg.Enabled {
|
|
telemetry.Start(ctx, telemetryCfg)
|
|
}
|
|
return telemetry.BuildPreview(telemetryCfg)
|
|
})
|
|
|
|
// Create HTTP server with unified configuration
|
|
srv := &http.Server{
|
|
Addr: fmt.Sprintf("%s:%d", cfg.BindAddress, cfg.FrontendPort),
|
|
Handler: router.Handler(),
|
|
ReadHeaderTimeout: 15 * time.Second,
|
|
WriteTimeout: 0, // Disabled to support SSE/streaming
|
|
IdleTimeout: 120 * time.Second,
|
|
TLSConfig: &tls.Config{
|
|
MinVersion: tls.VersionTLS12,
|
|
},
|
|
}
|
|
|
|
// Start config watcher for .env file changes
|
|
configWatcher, err := config.NewConfigWatcher(cfg)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("Failed to create config watcher, .env changes will require restart")
|
|
} else {
|
|
configWatcher.SetMockReloadCallback(func() {
|
|
log.Info().Msg(".env mock settings changed, reloading monitor")
|
|
if err := reloadableMonitor.Reload(); err != nil {
|
|
log.Error().Err(err).Msg("Failed to reload monitor after .env mock setting change")
|
|
} else if router != nil {
|
|
router.SetMonitor(reloadableMonitor.GetMonitor())
|
|
router.SetMultiTenantMonitor(reloadableMonitor.GetMultiTenantMonitor())
|
|
if cfg := reloadableMonitor.GetConfig(); cfg != nil {
|
|
router.SetConfig(cfg)
|
|
}
|
|
}
|
|
})
|
|
|
|
configWatcher.SetAPITokenReloadCallback(func() {
|
|
if monitor := reloadableMonitor.GetMonitor(); monitor != nil {
|
|
monitor.RebuildTokenBindings()
|
|
}
|
|
})
|
|
|
|
if err := configWatcher.Start(); err != nil {
|
|
log.Warn().Err(err).Msg("Failed to start config watcher")
|
|
}
|
|
defer configWatcher.Stop()
|
|
}
|
|
|
|
// Start HTTP→HTTPS redirect server when HTTPS is active and redirect port is configured.
|
|
var redirectSrv *http.Server
|
|
if cfg.HTTPSEnabled && cfg.TLSCertFile != "" && cfg.TLSKeyFile != "" && cfg.HTTPRedirectPort > 0 {
|
|
if cfg.HTTPRedirectPort == cfg.FrontendPort {
|
|
log.Error().
|
|
Int("redirect_port", cfg.HTTPRedirectPort).
|
|
Int("frontend_port", cfg.FrontendPort).
|
|
Msg("HTTP_REDIRECT_PORT must differ from FRONTEND_PORT; skipping redirect server")
|
|
} else {
|
|
httpsPort := cfg.FrontendPort
|
|
redirectSrv = &http.Server{
|
|
Addr: fmt.Sprintf("%s:%d", cfg.BindAddress, cfg.HTTPRedirectPort),
|
|
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Host == "" {
|
|
http.Error(w, "Bad Request", http.StatusBadRequest)
|
|
return
|
|
}
|
|
// Extract the hostname, stripping any port. net.SplitHostPort
|
|
// handles IPv6 bracket notation (e.g. [::1]:80 → "::1").
|
|
// If SplitHostPort fails, the Host has no port — strip any
|
|
// stray brackets that a bare IPv6 literal may carry.
|
|
host := r.Host
|
|
if h, _, err := net.SplitHostPort(host); err == nil {
|
|
host = h
|
|
} else {
|
|
host = strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[")
|
|
}
|
|
// Rebuild the authority with the HTTPS port.
|
|
// net.JoinHostPort handles IPv6 bracketing automatically.
|
|
var authority string
|
|
if httpsPort != 443 {
|
|
authority = net.JoinHostPort(host, strconv.Itoa(httpsPort))
|
|
} else if strings.Contains(host, ":") {
|
|
authority = "[" + host + "]"
|
|
} else {
|
|
authority = host
|
|
}
|
|
target := "https://" + authority + r.URL.RequestURI()
|
|
http.Redirect(w, r, target, http.StatusMovedPermanently)
|
|
}),
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
}
|
|
go func() {
|
|
log.Info().
|
|
Str("host", cfg.BindAddress).
|
|
Int("port", cfg.HTTPRedirectPort).
|
|
Int("https_port", httpsPort).
|
|
Msg("HTTP→HTTPS redirect server listening")
|
|
if err := redirectSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Error().Err(err).Msg("Failed to start HTTP redirect server")
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Start server
|
|
go func() {
|
|
if cfg.HTTPSEnabled && cfg.TLSCertFile != "" && cfg.TLSKeyFile != "" {
|
|
log.Info().
|
|
Str("host", cfg.BindAddress).
|
|
Int("port", cfg.FrontendPort).
|
|
Str("protocol", "HTTPS").
|
|
Msg("Server listening")
|
|
if err := srv.ListenAndServeTLS(cfg.TLSCertFile, cfg.TLSKeyFile); err != nil && err != http.ErrServerClosed {
|
|
log.Error().Err(err).Msg("Failed to start HTTPS server")
|
|
}
|
|
} else {
|
|
if cfg.HTTPSEnabled {
|
|
log.Warn().Msg("HTTPS_ENABLED is true but TLS_CERT_FILE or TLS_KEY_FILE not configured, falling back to HTTP")
|
|
}
|
|
log.Info().
|
|
Str("host", cfg.BindAddress).
|
|
Int("port", cfg.FrontendPort).
|
|
Str("protocol", "HTTP").
|
|
Msg("Server listening")
|
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Error().Err(err).Msg("Failed to start HTTP server")
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Setup signal handlers
|
|
sigChan := make(chan os.Signal, 1)
|
|
reloadChan := make(chan os.Signal, 1)
|
|
|
|
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
|
signal.Notify(reloadChan, syscall.SIGHUP)
|
|
defer signal.Stop(sigChan)
|
|
defer signal.Stop(reloadChan)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Info().Msg("Context cancelled, shutting down...")
|
|
goto shutdown
|
|
|
|
case <-reloadChan:
|
|
log.Info().Msg("Received SIGHUP, reloading configuration...")
|
|
if configWatcher != nil {
|
|
configWatcher.ReloadConfig()
|
|
}
|
|
|
|
if err := reloadFunc(); err != nil {
|
|
log.Error().Err(err).Msg("Failed to reload monitor after SIGHUP")
|
|
} else {
|
|
log.Info().Msg("Runtime configuration reloaded")
|
|
}
|
|
|
|
case <-sigChan:
|
|
log.Info().Msg("Shutting down server...")
|
|
goto shutdown
|
|
}
|
|
}
|
|
|
|
shutdown:
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer shutdownCancel()
|
|
|
|
if redirectSrv != nil {
|
|
if err := redirectSrv.Shutdown(shutdownCtx); err != nil {
|
|
log.Error().Err(err).Msg("HTTP redirect server shutdown error")
|
|
}
|
|
}
|
|
if err := srv.Shutdown(shutdownCtx); err != nil {
|
|
log.Error().Err(err).Msg("Server shutdown error")
|
|
}
|
|
|
|
// Stop license grant refresh loops
|
|
router.StopGrantRefresh()
|
|
|
|
// Gracefully stop AI intelligence services (patrol, investigations, triggers)
|
|
router.ShutdownAIIntelligence()
|
|
|
|
// Stop relay client
|
|
router.StopRelay()
|
|
|
|
// Stop AI chat service (kills sidecar process group)
|
|
router.StopAIChat(shutdownCtx)
|
|
|
|
// Ensure mock-mode background update ticker is stopped before process exit.
|
|
if mock.IsMockEnabled() {
|
|
mock.SetEnabled(false)
|
|
}
|
|
|
|
cancel()
|
|
reloadableMonitor.Stop()
|
|
|
|
if configWatcher != nil {
|
|
configWatcher.Stop()
|
|
}
|
|
|
|
// Close tenant audit loggers
|
|
tenantAuditManager.Close()
|
|
if err := audit.Close(); err != nil {
|
|
log.Error().Err(err).Msg("Failed to close audit logger")
|
|
}
|
|
log.Info().Msg("Server stopped")
|
|
return nil
|
|
}
|
|
|
|
// startMetricsServer starts the Prometheus /metrics endpoint. When metricsToken
|
|
// is non-empty, requests must include a matching Authorization: Bearer <token> header.
|
|
func startMetricsServer(ctx context.Context, addr string, metricsToken string) {
|
|
handler := promhttp.Handler()
|
|
if metricsToken != "" {
|
|
inner := handler
|
|
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
auth := r.Header.Get("Authorization")
|
|
const prefix = "Bearer "
|
|
providedToken := ""
|
|
if len(auth) >= len(prefix) && strings.EqualFold(auth[:len(prefix)], prefix) {
|
|
providedToken = auth[len(prefix):]
|
|
}
|
|
if len(providedToken) != len(metricsToken) ||
|
|
subtle.ConstantTimeCompare([]byte(providedToken), []byte(metricsToken)) != 1 {
|
|
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
inner.ServeHTTP(w, r)
|
|
})
|
|
log.Info().Msg("Metrics endpoint requires bearer token authentication")
|
|
}
|
|
mux := http.NewServeMux()
|
|
mux.Handle("/metrics", handler)
|
|
|
|
srv := &http.Server{
|
|
Addr: addr,
|
|
Handler: mux,
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
}
|
|
|
|
go func() {
|
|
log.Info().Str("addr", addr).Msg("Metrics server listening")
|
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Error().Err(err).Msg("Metrics server failed")
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_ = srv.Shutdown(shutdownCtx)
|
|
}()
|
|
}
|
|
|
|
func ShouldAutoImport() bool {
|
|
configPath := config.ResolveRuntimeDataDir("")
|
|
|
|
if _, err := os.Stat(filepath.Join(configPath, "nodes.enc")); err == nil {
|
|
return false
|
|
}
|
|
|
|
return os.Getenv("PULSE_INIT_CONFIG_DATA") != "" ||
|
|
os.Getenv("PULSE_INIT_CONFIG_FILE") != ""
|
|
}
|
|
|
|
func PerformAutoImport() error {
|
|
configData := os.Getenv("PULSE_INIT_CONFIG_DATA")
|
|
configFile := os.Getenv("PULSE_INIT_CONFIG_FILE")
|
|
configPass := os.Getenv("PULSE_INIT_CONFIG_PASSPHRASE")
|
|
source := "none"
|
|
if configFile != "" {
|
|
source = "file"
|
|
} else if configData != "" {
|
|
source = "env_data"
|
|
}
|
|
|
|
logAudit := func(success bool, reason string) {
|
|
details := "source=" + source
|
|
if reason != "" {
|
|
details += " reason=" + reason
|
|
}
|
|
audit.Log("config_auto_import", "system", "", "/startup/auto-import", success, details)
|
|
}
|
|
|
|
if configPass == "" {
|
|
logAudit(false, "missing_passphrase")
|
|
return fmt.Errorf("PULSE_INIT_CONFIG_PASSPHRASE is required for auto-import")
|
|
}
|
|
|
|
var encryptedData string
|
|
|
|
if configFile != "" {
|
|
data, err := os.ReadFile(configFile)
|
|
if err != nil {
|
|
logAudit(false, "read_config_file_failed")
|
|
return fmt.Errorf("failed to read config file: %w", err)
|
|
}
|
|
payload, err := NormalizeImportPayload(data)
|
|
if err != nil {
|
|
logAudit(false, "normalize_payload_failed")
|
|
return err
|
|
}
|
|
encryptedData = payload
|
|
} else if configData != "" {
|
|
payload, err := NormalizeImportPayload([]byte(configData))
|
|
if err != nil {
|
|
logAudit(false, "normalize_payload_failed")
|
|
return err
|
|
}
|
|
encryptedData = payload
|
|
} else {
|
|
logAudit(false, "missing_payload")
|
|
return fmt.Errorf("no config data provided")
|
|
}
|
|
|
|
configPath := config.ResolveRuntimeDataDir("")
|
|
|
|
persistence := config.NewConfigPersistence(configPath)
|
|
if err := persistence.ImportConfig(encryptedData, configPass); err != nil {
|
|
logAudit(false, "import_failed")
|
|
return fmt.Errorf("failed to import configuration: %w", err)
|
|
}
|
|
|
|
logAudit(true, "")
|
|
log.Info().Msg("Configuration auto-imported successfully")
|
|
return nil
|
|
}
|
|
|
|
func NormalizeImportPayload(raw []byte) (string, error) {
|
|
trimmed := strings.TrimSpace(string(raw))
|
|
if trimmed == "" {
|
|
return "", fmt.Errorf("configuration payload is empty")
|
|
}
|
|
|
|
if decoded, err := base64.StdEncoding.DecodeString(trimmed); err == nil {
|
|
decodedTrimmed := strings.TrimSpace(string(decoded))
|
|
if LooksLikeBase64(decodedTrimmed) {
|
|
return decodedTrimmed, nil
|
|
}
|
|
return trimmed, nil
|
|
}
|
|
|
|
return base64.StdEncoding.EncodeToString(raw), nil
|
|
}
|
|
|
|
func LooksLikeBase64(s string) bool {
|
|
if s == "" {
|
|
return false
|
|
}
|
|
compact := strings.Map(func(r rune) rune {
|
|
switch r {
|
|
case '\n', '\r', '\t', ' ':
|
|
return -1
|
|
default:
|
|
return r
|
|
}
|
|
}, s)
|
|
|
|
if compact == "" || len(compact)%4 != 0 {
|
|
return false
|
|
}
|
|
for i := 0; i < len(compact); i++ {
|
|
c := compact[i]
|
|
isAlphaNum := (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9')
|
|
if isAlphaNum || c == '+' || c == '/' || c == '=' {
|
|
continue
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func ensureDefaultOrgOwnerMembership(mtp *config.MultiTenantPersistence, adminUser string) error {
|
|
if mtp == nil {
|
|
return nil
|
|
}
|
|
adminUser = strings.TrimSpace(adminUser)
|
|
if adminUser == "" {
|
|
return nil
|
|
}
|
|
|
|
org, err := mtp.LoadOrganization("default")
|
|
if err != nil {
|
|
return fmt.Errorf("load default organization: %w", err)
|
|
}
|
|
if org == nil {
|
|
org = &models.Organization{}
|
|
}
|
|
|
|
changed := false
|
|
now := time.Now().UTC()
|
|
if strings.TrimSpace(org.ID) == "" {
|
|
org.ID = "default"
|
|
changed = true
|
|
}
|
|
if strings.TrimSpace(org.DisplayName) == "" {
|
|
org.DisplayName = "default"
|
|
changed = true
|
|
}
|
|
if org.CreatedAt.IsZero() {
|
|
org.CreatedAt = now
|
|
changed = true
|
|
}
|
|
if strings.TrimSpace(org.OwnerUserID) == "" {
|
|
org.OwnerUserID = adminUser
|
|
changed = true
|
|
}
|
|
|
|
if ensureOrgOwnerMembership(org, adminUser, now) {
|
|
changed = true
|
|
}
|
|
|
|
if ownerUserID := strings.TrimSpace(org.OwnerUserID); ownerUserID != "" && ownerUserID != adminUser {
|
|
if ensureOrgOwnerMembership(org, ownerUserID, now) {
|
|
changed = true
|
|
}
|
|
}
|
|
|
|
if !changed {
|
|
return nil
|
|
}
|
|
return mtp.SaveOrganization(org)
|
|
}
|
|
|
|
func ensureOrgOwnerMembership(org *models.Organization, userID string, now time.Time) bool {
|
|
userID = strings.TrimSpace(userID)
|
|
if org == nil || userID == "" {
|
|
return false
|
|
}
|
|
|
|
for i := range org.Members {
|
|
if strings.TrimSpace(org.Members[i].UserID) != userID {
|
|
continue
|
|
}
|
|
changed := false
|
|
if org.Members[i].Role != models.OrgRoleOwner {
|
|
org.Members[i].Role = models.OrgRoleOwner
|
|
changed = true
|
|
}
|
|
if org.Members[i].AddedAt.IsZero() {
|
|
org.Members[i].AddedAt = now
|
|
changed = true
|
|
}
|
|
if strings.TrimSpace(org.Members[i].AddedBy) == "" {
|
|
org.Members[i].AddedBy = userID
|
|
changed = true
|
|
}
|
|
return changed
|
|
}
|
|
|
|
org.Members = append(org.Members, models.OrganizationMember{
|
|
UserID: userID,
|
|
Role: models.OrgRoleOwner,
|
|
AddedAt: now,
|
|
AddedBy: userID,
|
|
})
|
|
return true
|
|
}
|