diff --git a/cmd/pulse-agent/main.go b/cmd/pulse-agent/main.go index 397302add..d5d50106c 100644 --- a/cmd/pulse-agent/main.go +++ b/cmd/pulse-agent/main.go @@ -431,7 +431,8 @@ func loadConfig() Config { // Flags urlFlag := flag.String("url", envURL, "Pulse server URL") - tokenFlag := flag.String("token", envToken, "Pulse API token") + tokenFlag := flag.String("token", envToken, "Pulse API token (prefer --token-file for security)") + tokenFileFlag := flag.String("token-file", "", "Path to file containing Pulse API token (more secure than --token)") intervalFlag := flag.Duration("interval", defaultInterval, "Reporting interval") hostnameFlag := flag.String("hostname", envHostname, "Override hostname") agentIDFlag := flag.String("agent-id", envAgentID, "Override agent identifier") @@ -476,9 +477,10 @@ func loadConfig() Config { pulseURL = "http://localhost:7655" } - token := strings.TrimSpace(*tokenFlag) + // Resolve token with priority: --token > --token-file > env > default file + token := resolveToken(*tokenFlag, *tokenFileFlag, envToken) if token == "" { - fmt.Fprintln(os.Stderr, "error: Pulse API token is required") + fmt.Fprintln(os.Stderr, "error: Pulse API token is required (use --token, --token-file, PULSE_TOKEN env, or /var/lib/pulse-agent/token)") os.Exit(1) } @@ -629,6 +631,44 @@ func resolveEnableCommands(enableFlag, disableFlag bool, envEnable, envDisable s return false } +// resolveToken resolves the API token with priority: +// 1. --token flag (direct value) +// 2. --token-file flag (read from file) +// 3. PULSE_TOKEN environment variable +// 4. Default token file at /var/lib/pulse-agent/token +// +// Reading from a file is more secure than CLI args as tokens won't appear in `ps` output. +func resolveToken(tokenFlag, tokenFileFlag, envToken string) string { + // 1. Direct token from --token flag + if t := strings.TrimSpace(tokenFlag); t != "" { + return t + } + + // 2. 
Token from --token-file flag + if tokenFileFlag != "" { + if content, err := os.ReadFile(tokenFileFlag); err == nil { + if t := strings.TrimSpace(string(content)); t != "" { + return t + } + } + } + + // 3. PULSE_TOKEN environment variable + if t := strings.TrimSpace(envToken); t != "" { + return t + } + + // 4. Default token file (most secure method for systemd services) + defaultTokenFile := "/var/lib/pulse-agent/token" + if content, err := os.ReadFile(defaultTokenFile); err == nil { + if t := strings.TrimSpace(string(content)); t != "" { + return t + } + } + + return "" +} + // initDockerWithRetry attempts to initialize the Docker agent with exponential backoff. // It returns the agent when Docker becomes available, or nil if the context is cancelled. // Retry intervals: 5s, 10s, 20s, 40s, 80s, 160s, then cap at 5 minutes. diff --git a/internal/dockeragent/agent.go b/internal/dockeragent/agent.go index 3cdcf7b69..5a2acab39 100644 --- a/internal/dockeragent/agent.go +++ b/internal/dockeragent/agent.go @@ -537,14 +537,7 @@ func (a *Agent) Run(ctx context.Context) error { initialDelay := 5*time.Second + randomDurationFn(startupJitterWindow) updateTimer := newTimerFn(initialDelay) - defer func() { - if !updateTimer.Stop() { - select { - case <-updateTimer.C: - default: - } - } - }() + defer stopTimer(updateTimer) if err := a.collectOnce(ctx); err != nil { if errors.Is(err, ErrStopRequested) { @@ -556,12 +549,7 @@ func (a *Agent) Run(ctx context.Context) error { for { select { case <-ctx.Done(): - if !updateTimer.Stop() { - select { - case <-updateTimer.C: - default: - } - } + stopTimer(updateTimer) return ctx.Err() case <-ticker.C: if err := a.collectOnce(ctx); err != nil { @@ -581,6 +569,15 @@ func (a *Agent) Run(ctx context.Context) error { } } +func stopTimer(timer *time.Timer) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } +} + func (a *Agent) collectOnce(ctx context.Context) error { report, err := a.buildReport(ctx) if err != nil { @@ 
-1924,7 +1921,7 @@ func (a *Agent) selfUpdate(ctx context.Context) error { tmpFile.Close() return fmt.Errorf("downloaded binary exceeds maximum size (%d bytes)", maxBinarySize) } - if err := tmpFile.Close(); err != nil { + if err := closeFileFn(tmpFile); err != nil { return fmt.Errorf("failed to close temp file: %w", err) } diff --git a/internal/dockeragent/agent_collect_test.go b/internal/dockeragent/agent_collect_test.go new file mode 100644 index 000000000..149354155 --- /dev/null +++ b/internal/dockeragent/agent_collect_test.go @@ -0,0 +1,319 @@ +package dockeragent + +import ( + "context" + "errors" + "io" + "net/http" + "strings" + "testing" + "time" + + containertypes "github.com/docker/docker/api/types/container" + "github.com/rs/zerolog" +) + +func TestCollectContainer(t *testing.T) { + logger := zerolog.Nop() + + t.Run("success with running container", func(t *testing.T) { + stats := containertypes.StatsResponse{ + Read: time.Now(), + CPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 200000000}, + SystemUsage: 2000000000, + OnlineCPUs: 2, + }, + PreCPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 100000000}, + SystemUsage: 1000000000, + }, + MemoryStats: containertypes.MemoryStats{ + Usage: 1000000, + Limit: 4000000, + Stats: map[string]uint64{"cache": 200000}, + }, + BlkioStats: containertypes.BlkioStats{ + IoServiceBytesRecursive: []containertypes.BlkioStatEntry{ + {Op: "Read", Value: 100}, + {Op: "Write", Value: 200}, + }, + }, + } + + sizeRw := int64(1234) + sizeRoot := int64(5678) + inspect := baseInspect() + inspect.ContainerJSONBase.SizeRw = &sizeRw + inspect.ContainerJSONBase.SizeRootFs = &sizeRoot + inspect.ContainerJSONBase.State = &containertypes.State{ + Running: true, + StartedAt: time.Now().Add(-time.Minute).Format(time.RFC3339Nano), + Health: &containertypes.Health{Status: "healthy"}, + } + inspect.Config.Env = []string{"PASSWORD=secret", "PATH=/bin"} + 
inspect.NetworkSettings.Networks["net1"].IPAddress = "10.0.0.2" + inspect.Mounts = []containertypes.MountPoint{ + {Type: "bind", Source: "/data", Destination: "/data", RW: true}, + } + + agent := &Agent{ + cfg: Config{ + CollectDiskMetrics: true, + }, + runtime: RuntimePodman, + logger: logger, + prevContainerCPU: make(map[string]cpuSample), + registryChecker: NewRegistryChecker(logger), + docker: &fakeDockerClient{ + containerInspectWithRawFn: func(context.Context, string, bool) (containertypes.InspectResponse, []byte, error) { + return inspect, nil, nil + }, + containerStatsOneShotFn: func(context.Context, string) (containertypes.StatsResponseReader, error) { + return statsReader(t, stats), nil + }, + }, + } + + summary := containertypes.Summary{ + ID: "container1", + Names: []string{"/app"}, + Image: "nginx@sha256:abc123", + ImageID: "sha256:local", + Created: time.Now().Add(-time.Hour).Unix(), + State: "running", + Status: "Up", + Ports: []containertypes.Port{ + {PrivatePort: 80, PublicPort: 8080, Type: "tcp", IP: "0.0.0.0"}, + }, + Labels: map[string]string{ + "io.podman.annotations.pod.name": "mypod", + }, + } + + container, err := agent.collectContainer(context.Background(), summary) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if container.Name != "app" { + t.Fatalf("expected name app, got %q", container.Name) + } + if container.Health != "healthy" { + t.Fatalf("expected health status, got %q", container.Health) + } + if container.BlockIO == nil { + t.Fatalf("expected block IO to be populated") + } + if container.Podman == nil || container.Podman.PodName != "mypod" { + t.Fatalf("expected podman metadata") + } + if container.UpdateStatus == nil || container.UpdateStatus.Error == "" { + t.Fatalf("expected update status for digest-pinned image") + } + if len(container.Networks) == 0 { + t.Fatalf("expected networks to be populated") + } + }) + + t.Run("stopped container clears sample", func(t *testing.T) { + agent := &Agent{ + logger: logger, 
+ prevContainerCPU: map[string]cpuSample{ + "container1": {totalUsage: 1}, + }, + docker: &fakeDockerClient{ + containerInspectWithRawFn: func(context.Context, string, bool) (containertypes.InspectResponse, []byte, error) { + inspect := baseInspect() + inspect.ContainerJSONBase.State = &containertypes.State{Running: false} + return inspect, nil, nil + }, + }, + } + + summary := containertypes.Summary{ID: "container1", Names: []string{"/app"}, State: "exited"} + if _, err := agent.collectContainer(context.Background(), summary); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, ok := agent.prevContainerCPU["container1"]; ok { + t.Fatalf("expected sample to be removed") + } + }) + + t.Run("inspect error", func(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + containerInspectWithRawFn: func(context.Context, string, bool) (containertypes.InspectResponse, []byte, error) { + return containertypes.InspectResponse{}, nil, errors.New("inspect failed") + }, + }, + logger: logger, + } + + if _, err := agent.collectContainer(context.Background(), containertypes.Summary{ID: "container1"}); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("stats error", func(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + containerInspectWithRawFn: func(context.Context, string, bool) (containertypes.InspectResponse, []byte, error) { + inspect := baseInspect() + inspect.ContainerJSONBase.State = &containertypes.State{Running: true} + return inspect, nil, nil + }, + containerStatsOneShotFn: func(context.Context, string) (containertypes.StatsResponseReader, error) { + return containertypes.StatsResponseReader{}, errors.New("stats failed") + }, + }, + logger: logger, + } + + if _, err := agent.collectContainer(context.Background(), containertypes.Summary{ID: "container1"}); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("stats decode error", func(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + 
containerInspectWithRawFn: func(context.Context, string, bool) (containertypes.InspectResponse, []byte, error) { + inspect := baseInspect() + inspect.ContainerJSONBase.State = &containertypes.State{Running: true} + return inspect, nil, nil + }, + containerStatsOneShotFn: func(context.Context, string) (containertypes.StatsResponseReader, error) { + return containertypes.StatsResponseReader{Body: io.NopCloser(strings.NewReader("{"))}, nil + }, + }, + logger: logger, + } + + if _, err := agent.collectContainer(context.Background(), containertypes.Summary{ID: "container1"}); err == nil { + t.Fatal("expected error") + } + }) +} + +func TestCollectContainers(t *testing.T) { + logger := zerolog.Nop() + + t.Run("list error", func(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + containerListFunc: func(context.Context, containertypes.ListOptions) ([]containertypes.Summary, error) { + return nil, errors.New("list failed") + }, + }, + logger: logger, + } + + if _, err := agent.collectContainers(context.Background()); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("filters and prune", func(t *testing.T) { + agent := &Agent{ + logger: logger, + stateFilters: []string{"running"}, + allowedStates: map[string]struct{}{ + "running": {}, + }, + prevContainerCPU: map[string]cpuSample{ + "stale": {totalUsage: 1}, + }, + docker: &fakeDockerClient{ + containerListFunc: func(_ context.Context, opts containertypes.ListOptions) ([]containertypes.Summary, error) { + if opts.Filters.Len() == 0 { + t.Fatal("expected filters to be set") + } + return []containertypes.Summary{ + {ID: "running1", Names: []string{"/run"}, State: "running"}, + {ID: "exited1", Names: []string{"/exit"}, State: "exited"}, + }, nil + }, + containerInspectWithRawFn: func(context.Context, string, bool) (containertypes.InspectResponse, []byte, error) { + return containertypes.InspectResponse{}, nil, errors.New("inspect failed") + }, + }, + } + + containers, err := 
agent.collectContainers(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(containers) != 0 { + t.Fatalf("expected no containers, got %d", len(containers)) + } + if _, ok := agent.prevContainerCPU["stale"]; ok { + t.Fatalf("expected stale sample to be pruned") + } + }) +} + +func TestPrimaryTargetAndHTTPClient(t *testing.T) { + t.Run("primary target empty", func(t *testing.T) { + agent := &Agent{} + if target := agent.primaryTarget(); target.URL != "" { + t.Fatal("expected empty target") + } + }) + + t.Run("http client selection", func(t *testing.T) { + secure := &http.Client{} + insecure := &http.Client{} + agent := &Agent{ + httpClients: map[bool]*http.Client{ + false: secure, + true: insecure, + }, + } + if got := agent.httpClientFor(TargetConfig{InsecureSkipVerify: true}); got != insecure { + t.Fatal("expected insecure client") + } + if got := agent.httpClientFor(TargetConfig{InsecureSkipVerify: false}); got != secure { + t.Fatal("expected secure client") + } + }) + + t.Run("http client fallback", func(t *testing.T) { + agent := &Agent{ + httpClients: map[bool]*http.Client{}, + } + got := agent.httpClientFor(TargetConfig{InsecureSkipVerify: true}) + if got == nil { + t.Fatal("expected fallback client") + } + }) +} + +func TestNewHTTPClient(t *testing.T) { + client := newHTTPClient(true) + if client.Transport == nil { + t.Fatal("expected transport") + } + transport := client.Transport.(*http.Transport) + if !transport.TLSClientConfig.InsecureSkipVerify { + t.Fatal("expected insecure skip verify true") + } +} + +func TestAgentClose(t *testing.T) { + closed := false + agent := &Agent{ + docker: &fakeDockerClient{ + closeFn: func() error { + closed = true + return nil + }, + }, + } + + if err := agent.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !closed { + t.Fatal("expected docker Close to be called") + } +} diff --git a/internal/dockeragent/agent_cpu_test.go 
b/internal/dockeragent/agent_cpu_test.go new file mode 100644 index 000000000..0dcb227d5 --- /dev/null +++ b/internal/dockeragent/agent_cpu_test.go @@ -0,0 +1,176 @@ +package dockeragent + +import ( + "testing" + "time" + + containertypes "github.com/docker/docker/api/types/container" + "github.com/rs/zerolog" +) + +func TestCalculateContainerCPUPercent(t *testing.T) { + logger := zerolog.Nop() + + t.Run("uses precpu stats", func(t *testing.T) { + agent := &Agent{ + logger: logger, + prevContainerCPU: make(map[string]cpuSample), + cpuCount: 2, + } + + stats := containertypes.StatsResponse{ + Read: time.Now(), + CPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 200}, + SystemUsage: 2000, + OnlineCPUs: 2, + }, + PreCPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 100}, + SystemUsage: 1000, + }, + } + + got := agent.calculateContainerCPUPercent("container1", stats) + if got <= 0 { + t.Fatalf("expected percent > 0, got %f", got) + } + if _, ok := agent.prevContainerCPU["container1"]; !ok { + t.Fatal("expected current sample to be stored") + } + }) + + t.Run("first manual sample returns zero", func(t *testing.T) { + agent := &Agent{ + logger: logger, + prevContainerCPU: make(map[string]cpuSample), + } + stats := containertypes.StatsResponse{ + Read: time.Now(), + CPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 100}, + SystemUsage: 1000, + }, + PreCPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 100}, + SystemUsage: 1000, + }, + } + + got := agent.calculateContainerCPUPercent("container1", stats) + if got != 0 { + t.Fatalf("expected 0, got %f", got) + } + if _, ok := agent.prevContainerCPU["container1"]; !ok { + t.Fatal("expected sample to be stored") + } + }) + + t.Run("manual system delta uses previous sample", func(t *testing.T) { + agent := &Agent{ + logger: logger, + prevContainerCPU: map[string]cpuSample{ + "container1": { + 
totalUsage: 100, + systemUsage: 1000, + onlineCPUs: 2, + read: time.Now().Add(-time.Second), + }, + }, + } + + stats := containertypes.StatsResponse{ + Read: time.Now(), + CPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 200}, + SystemUsage: 2000, + OnlineCPUs: 0, + }, + PreCPUStats: containertypes.CPUStats{}, + } + + got := agent.calculateContainerCPUPercent("container1", stats) + if got <= 0 { + t.Fatalf("expected percent > 0, got %f", got) + } + }) + + t.Run("manual time delta fallback", func(t *testing.T) { + agent := &Agent{ + logger: logger, + cpuCount: 4, + prevContainerCPU: map[string]cpuSample{ + "container1": { + totalUsage: 100, + systemUsage: 1000, + onlineCPUs: 0, + read: time.Now().Add(-2 * time.Second), + }, + }, + } + + stats := containertypes.StatsResponse{ + Read: time.Now(), + CPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 200}, + SystemUsage: 1000, + OnlineCPUs: 0, + }, + PreCPUStats: containertypes.CPUStats{}, + } + + got := agent.calculateContainerCPUPercent("container1", stats) + if got <= 0 { + t.Fatalf("expected percent > 0, got %f", got) + } + }) + + t.Run("no valid delta returns zero", func(t *testing.T) { + agent := &Agent{ + logger: logger, + prevContainerCPU: map[string]cpuSample{ + "container1": { + totalUsage: 100, + systemUsage: 1000, + onlineCPUs: 0, + read: time.Time{}, + }, + }, + } + + stats := containertypes.StatsResponse{ + Read: time.Time{}, + CPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 200}, + SystemUsage: 1000, + OnlineCPUs: 0, + }, + PreCPUStats: containertypes.CPUStats{}, + } + + got := agent.calculateContainerCPUPercent("container1", stats) + if got != 0 { + t.Fatalf("expected 0, got %f", got) + } + }) + + t.Run("warn after repeated failures", func(t *testing.T) { + agent := &Agent{ + logger: logger, + prevContainerCPU: make(map[string]cpuSample), + } + + stats := containertypes.StatsResponse{ + Read: time.Now(), 
+ CPUStats: containertypes.CPUStats{ + CPUUsage: containertypes.CPUUsage{TotalUsage: 100}, + SystemUsage: 1000, + }, + PreCPUStats: containertypes.CPUStats{}, + } + + for i := 0; i < 10; i++ { + _ = agent.calculateContainerCPUPercent("container1", stats) + } + }) +} diff --git a/internal/dockeragent/agent_flow_test.go b/internal/dockeragent/agent_flow_test.go new file mode 100644 index 000000000..e3d0e5201 --- /dev/null +++ b/internal/dockeragent/agent_flow_test.go @@ -0,0 +1,222 @@ +package dockeragent + +import ( + "bytes" + "context" + "errors" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" + + systemtypes "github.com/docker/docker/api/types/system" + "github.com/rcourtman/pulse-go-rewrite/internal/buffer" + "github.com/rcourtman/pulse-go-rewrite/internal/hostmetrics" + agentsdocker "github.com/rcourtman/pulse-go-rewrite/pkg/agents/docker" + "github.com/rs/zerolog" +) + +func TestNewAgent(t *testing.T) { + t.Run("missing targets", func(t *testing.T) { + if _, err := New(Config{}); err == nil { + t.Fatal("expected error for missing target") + } + }) + + t.Run("creates agent with defaults", func(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "machine-id") + if err := os.WriteFile(path, []byte("machine-1"), 0600); err != nil { + t.Fatalf("write machine-id: %v", err) + } + swap(t, &machineIDPaths, []string{path}) + + fake := &fakeDockerClient{daemonHost: "unix:///var/run/docker.sock"} + swap(t, &connectRuntimeFn, func(_ RuntimeKind, _ *zerolog.Logger) (dockerClient, systemtypes.Info, RuntimeKind, error) { + return fake, systemtypes.Info{ID: "daemon1", ServerVersion: "24.0.0"}, RuntimeDocker, nil + }) + + cfg := Config{ + PulseURL: "https://pulse.example.com/", + APIToken: "token", + LogLevel: zerolog.InfoLevel, + AgentType: "unified", + AgentVersion: "", + } + + agent, err := New(cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if agent.cfg.PulseURL != "https://pulse.example.com" { + 
t.Fatalf("expected trimmed URL, got %q", agent.cfg.PulseURL) + } + if agent.cfg.IncludeContainers && agent.cfg.IncludeServices && agent.cfg.IncludeTasks { + // ok + } else { + t.Fatal("expected include flags to default to true") + } + if agent.machineID != "machine-1" { + t.Fatalf("expected machine-id to be loaded, got %q", agent.machineID) + } + if agent.agentVersion != Version { + t.Fatalf("expected agent version to default, got %q", agent.agentVersion) + } + }) + + t.Run("podman disables swarm collections", func(t *testing.T) { + fake := &fakeDockerClient{daemonHost: "unix:///run/podman/podman.sock"} + swap(t, &connectRuntimeFn, func(_ RuntimeKind, _ *zerolog.Logger) (dockerClient, systemtypes.Info, RuntimeKind, error) { + return fake, systemtypes.Info{ID: "podman", ServerVersion: "4.6.0"}, RuntimePodman, nil + }) + + cfg := Config{ + PulseURL: "https://pulse.example.com", + APIToken: "token", + IncludeServices: true, + IncludeTasks: true, + } + + agent, err := New(cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if agent.cfg.IncludeServices || agent.cfg.IncludeTasks { + t.Fatal("expected swarm collection to be disabled for podman") + } + }) +} + +func TestStopTimer(t *testing.T) { + t.Run("timer not fired", func(t *testing.T) { + timer := time.NewTimer(time.Hour) + stopTimer(timer) + }) + + t.Run("timer fired and drained", func(t *testing.T) { + timer := time.NewTimer(0) + stopTimer(timer) + select { + case <-timer.C: + t.Fatal("expected timer channel to be drained") + default: + } + }) +} + +func TestCollectOnce(t *testing.T) { + logger := zerolog.Nop() + + t.Run("build report error", func(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + infoFunc: func(context.Context) (systemtypes.Info, error) { + return systemtypes.Info{}, errors.New("info failed") + }, + }, + logger: logger, + } + + if err := agent.collectOnce(context.Background()); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("send report error buffers", 
func(t *testing.T) { + swap(t, &hostmetricsCollect, func(context.Context, []string) (hostmetrics.Snapshot, error) { + return hostmetrics.Snapshot{}, nil + }) + + agent := &Agent{ + cfg: Config{Interval: 30 * time.Second}, + docker: &fakeDockerClient{ + infoFunc: func(context.Context) (systemtypes.Info, error) { + return systemtypes.Info{ID: "daemon", ServerVersion: "24.0.0"}, nil + }, + }, + logger: logger, + targets: []TargetConfig{{URL: "http://invalid", Token: "token"}}, + httpClients: map[bool]*http.Client{false: {Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { + return nil, errors.New("send failed") + })}}, + reportBuffer: buffer.New[agentsdocker.Report](10), + } + + if err := agent.collectOnce(context.Background()); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if agent.reportBuffer.Len() != 1 { + t.Fatalf("expected report to be buffered") + } + }) + + t.Run("send report stop requested", func(t *testing.T) { + swap(t, &hostmetricsCollect, func(context.Context, []string) (hostmetrics.Snapshot, error) { + return hostmetrics.Snapshot{}, nil + }) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"host was removed","code":"invalid_report"}`)) + })) + defer server.Close() + + agent := &Agent{ + cfg: Config{Interval: 30 * time.Second}, + docker: &fakeDockerClient{ + infoFunc: func(context.Context) (systemtypes.Info, error) { + return systemtypes.Info{ID: "daemon", ServerVersion: "24.0.0"}, nil + }, + }, + logger: logger, + targets: []TargetConfig{{URL: server.URL, Token: "token"}}, + httpClients: map[bool]*http.Client{false: server.Client()}, + reportBuffer: buffer.New[agentsdocker.Report](10), + } + + if err := agent.collectOnce(context.Background()); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if agent.reportBuffer.Len() != 0 { + t.Fatalf("expected no buffering on stop request") + } + }) + + 
t.Run("flush buffer after success", func(t *testing.T) { + swap(t, &hostmetricsCollect, func(context.Context, []string) (hostmetrics.Snapshot, error) { + return hostmetrics.Snapshot{}, nil + }) + + var buf bytes.Buffer + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + })) + defer server.Close() + + agent := &Agent{ + cfg: Config{Interval: 30 * time.Second}, + docker: &fakeDockerClient{ + infoFunc: func(context.Context) (systemtypes.Info, error) { + return systemtypes.Info{ID: "daemon", ServerVersion: "24.0.0"}, nil + }, + }, + logger: zerolog.New(&buf), + targets: []TargetConfig{{URL: server.URL, Token: "token"}}, + httpClients: map[bool]*http.Client{false: server.Client()}, + reportBuffer: buffer.New[agentsdocker.Report](10), + } + + agent.reportBuffer.Push(agentsdocker.Report{Agent: agentsdocker.AgentInfo{ID: "queued"}}) + agent.reportBuffer.Push(agentsdocker.Report{Agent: agentsdocker.AgentInfo{ID: "queued2"}}) + + if err := agent.collectOnce(context.Background()); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if agent.reportBuffer.Len() != 0 { + t.Fatalf("expected buffer to be flushed") + } + }) +} diff --git a/internal/dockeragent/agent_http_test.go b/internal/dockeragent/agent_http_test.go new file mode 100644 index 000000000..073ad8c9d --- /dev/null +++ b/internal/dockeragent/agent_http_test.go @@ -0,0 +1,363 @@ +package dockeragent + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "math" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + + agentsdocker "github.com/rcourtman/pulse-go-rewrite/pkg/agents/docker" + "github.com/rs/zerolog" +) + +func TestSendReport(t *testing.T) { + t.Run("marshal error", func(t *testing.T) { + agent := &Agent{logger: zerolog.Nop()} + report := agentsdocker.Report{ + Host: agentsdocker.HostInfo{ + CPUUsagePercent: math.NaN(), + }, + } + + if err := 
agent.sendReport(context.Background(), report); err == nil { + t.Fatal("expected marshal error") + } + }) + + t.Run("stop requested", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"host was removed","code":"invalid_report"}`)) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + hostID: "host1", + targets: []TargetConfig{{URL: server.URL, Token: "token"}}, + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + if err := agent.sendReport(context.Background(), agentsdocker.Report{}); !errors.Is(err, ErrStopRequested) { + t.Fatalf("expected ErrStopRequested, got %v", err) + } + }) + + t.Run("errors join", func(t *testing.T) { + agent := &Agent{ + logger: zerolog.Nop(), + targets: []TargetConfig{{URL: "http://one", Token: "t1"}, {URL: "http://two", Token: "t2"}}, + httpClients: map[bool]*http.Client{ + false: {Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { + return nil, errors.New("send failed") + })}, + }, + } + + if err := agent.sendReport(context.Background(), agentsdocker.Report{}); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("large payload succeeds", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + targets: []TargetConfig{{URL: server.URL, Token: "token"}}, + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + report := agentsdocker.Report{ + Containers: []agentsdocker.Container{ + {ID: strings.Repeat("a", 500000)}, + }, + } + + if err := agent.sendReport(context.Background(), report); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) +} + +func TestSendReportToTarget(t *testing.T) { + t.Run("host removed", func(t *testing.T) { + 
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"host was removed","code":"invalid_report"}`)) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + err := agent.sendReportToTarget(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, []byte(`{}`), 0) + if !errors.Is(err, ErrStopRequested) { + t.Fatalf("expected ErrStopRequested, got %v", err) + } + }) + + t.Run("status error", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte("bad request")) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + if err := agent.sendReportToTarget(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, []byte(`{}`), 0); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("status error with empty body", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + if err := agent.sendReportToTarget(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, []byte(`{}`), 0); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("read error", func(t *testing.T) { + client := &http.Client{ + Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: errReadCloser{err: errors.New("read failed")}, + Header: make(http.Header), + }, nil + }), + } + + agent := &Agent{ 
+ logger: zerolog.Nop(), + httpClients: map[bool]*http.Client{ + false: client, + }, + } + + if err := agent.sendReportToTarget(context.Background(), TargetConfig{URL: "http://example", Token: "token"}, []byte(`{}`), 0); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("invalid json response", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("{")) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + if err := agent.sendReportToTarget(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, []byte(`{}`), 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("stop command", func(t *testing.T) { + var ackBody bytes.Buffer + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasSuffix(r.URL.Path, "/report"): + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"commands":[{"id":"cmd1","type":"stop"}]}`)) + case strings.Contains(r.URL.Path, "/commands/"): + body, _ := io.ReadAll(r.Body) + ackBody.Write(body) + w.WriteHeader(http.StatusOK) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + err := agent.sendReportToTarget(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, []byte(`{}`), 0) + if !errors.Is(err, ErrStopRequested) { + t.Fatalf("expected ErrStopRequested, got %v", err) + } + }) +} + +func TestSendCommandAck(t *testing.T) { + t.Run("missing host id", func(t *testing.T) { + agent := &Agent{} + if err := agent.sendCommandAck(context.Background(), TargetConfig{URL: "http://example"}, "cmd", "status", "msg"); err == nil { + 
t.Fatal("expected error") + } + }) + + t.Run("request error", func(t *testing.T) { + agent := &Agent{hostID: "host1"} + badURL := "http://example.com/\x7f" + if err := agent.sendCommandAck(context.Background(), TargetConfig{URL: badURL}, "cmd", "status", "msg"); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("client error", func(t *testing.T) { + agent := &Agent{ + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: {Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { + return nil, errors.New("send failed") + })}, + }, + } + + if err := agent.sendCommandAck(context.Background(), TargetConfig{URL: "http://example", Token: "token"}, "cmd", "status", "msg"); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("status error", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("boom")) + })) + defer server.Close() + + agent := &Agent{ + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + if err := agent.sendCommandAck(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, "cmd", "status", "msg"); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("success", func(t *testing.T) { + var got agentsdocker.CommandAck + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &got) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + agent := &Agent{ + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + if err := agent.sendCommandAck(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, "cmd", "completed", "ok"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got.Status != "completed" { + t.Fatalf("expected status to be sent, got %q", got.Status) + } + 
}) +} + +func TestHandleCommand(t *testing.T) { + agent := &Agent{logger: zerolog.Nop()} + if err := agent.handleCommand(context.Background(), TargetConfig{}, agentsdocker.Command{Type: "unknown"}); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestHandleStopCommand(t *testing.T) { + t.Run("disable error sends failure ack", func(t *testing.T) { + writeSystemctl(t, "echo 'access denied' >&2\nexit 1") + + var ack agentsdocker.CommandAck + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &ack) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + if err := agent.handleStopCommand(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, agentsdocker.Command{ID: "cmd"}); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if ack.Status != agentsdocker.CommandStatusFailed { + t.Fatalf("expected failed status, got %q", ack.Status) + } + }) + + t.Run("success returns stop requested", func(t *testing.T) { + prev := os.Getenv("PATH") + _ = os.Setenv("PATH", "") + t.Cleanup(func() { + _ = os.Setenv("PATH", prev) + }) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + agent := &Agent{ + logger: zerolog.Nop(), + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + } + + if err := agent.handleStopCommand(context.Background(), TargetConfig{URL: server.URL, Token: "token"}, agentsdocker.Command{ID: "cmd"}); !errors.Is(err, ErrStopRequested) { + t.Fatalf("expected ErrStopRequested, got %v", err) + } + }) +} diff --git a/internal/dockeragent/container_update_test.go b/internal/dockeragent/container_update_test.go new file mode 100644 index 
000000000..850d3f088 --- /dev/null +++ b/internal/dockeragent/container_update_test.go @@ -0,0 +1,357 @@ +package dockeragent + +import ( + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + containertypes "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + "github.com/opencontainers/image-spec/specs-go/v1" + agentsdocker "github.com/rcourtman/pulse-go-rewrite/pkg/agents/docker" + "github.com/rs/zerolog" +) + +func baseInspect() containertypes.InspectResponse { + state := &containertypes.State{Running: true} + hostConfig := &containertypes.HostConfig{} + + return containertypes.InspectResponse{ + ContainerJSONBase: &containertypes.ContainerJSONBase{ + Name: "/app", + Image: "sha256:old0000000000", + State: state, + RestartCount: 1, + HostConfig: hostConfig, + }, + Config: &containertypes.Config{ + Image: "nginx:latest", + }, + NetworkSettings: &network.NetworkSettings{ + Networks: map[string]*network.EndpointSettings{ + "net1": {Aliases: []string{"app"}}, + "net2": {Aliases: []string{"app2"}}, + }, + }, + } +} + +func TestUpdateContainer_Errors(t *testing.T) { + logger := zerolog.Nop() + + t.Run("inspect error", func(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + containerInspectFn: func(context.Context, string) (containertypes.InspectResponse, error) { + return containertypes.InspectResponse{}, errors.New("inspect failed") + }, + }, + logger: logger, + } + + result := agent.updateContainer(context.Background(), "container1") + if result.Error == "" { + t.Fatal("expected error for inspect failure") + } + }) + + t.Run("pull error", func(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + containerInspectFn: func(context.Context, string) (containertypes.InspectResponse, error) { + return baseInspect(), nil + }, + imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, 
error) { + return nil, errors.New("pull failed") + }, + }, + logger: logger, + } + + result := agent.updateContainer(context.Background(), "container1") + if result.Error == "" { + t.Fatal("expected error for pull failure") + } + }) + + t.Run("stop error", func(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + containerInspectFn: func(context.Context, string) (containertypes.InspectResponse, error) { + return baseInspect(), nil + }, + imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("{}")), nil + }, + containerStopFn: func(context.Context, string, containertypes.StopOptions) error { + return errors.New("stop failed") + }, + }, + logger: logger, + } + + result := agent.updateContainer(context.Background(), "container1") + if result.Error == "" { + t.Fatal("expected error for stop failure") + } + }) + + t.Run("rename error", func(t *testing.T) { + startCalled := false + agent := &Agent{ + docker: &fakeDockerClient{ + containerInspectFn: func(context.Context, string) (containertypes.InspectResponse, error) { + return baseInspect(), nil + }, + imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("{}")), nil + }, + containerStopFn: func(context.Context, string, containertypes.StopOptions) error { + return nil + }, + containerRenameFn: func(context.Context, string, string) error { + return errors.New("rename failed") + }, + containerStartFn: func(context.Context, string, containertypes.StartOptions) error { + startCalled = true + return nil + }, + }, + logger: logger, + } + + result := agent.updateContainer(context.Background(), "container1") + if result.Error == "" { + t.Fatal("expected error for rename failure") + } + if !startCalled { + t.Fatal("expected original container to be restarted") + } + }) + + t.Run("create error", func(t *testing.T) { + renameCalled := false + startCalled := false + agent := 
&Agent{ + docker: &fakeDockerClient{ + containerInspectFn: func(context.Context, string) (containertypes.InspectResponse, error) { + return baseInspect(), nil + }, + imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("{}")), nil + }, + containerStopFn: func(context.Context, string, containertypes.StopOptions) error { + return nil + }, + containerRenameFn: func(context.Context, string, string) error { + renameCalled = true + return nil + }, + containerCreateFn: func(context.Context, *containertypes.Config, *containertypes.HostConfig, *network.NetworkingConfig, *v1.Platform, string) (containertypes.CreateResponse, error) { + return containertypes.CreateResponse{}, errors.New("create failed") + }, + containerStartFn: func(context.Context, string, containertypes.StartOptions) error { + startCalled = true + return nil + }, + }, + logger: logger, + } + + result := agent.updateContainer(context.Background(), "container1") + if result.Error == "" { + t.Fatal("expected error for create failure") + } + if !renameCalled || !startCalled { + t.Fatal("expected rollback to rename and restart") + } + }) + + t.Run("start error", func(t *testing.T) { + removed := false + renamed := false + restarted := false + agent := &Agent{ + docker: &fakeDockerClient{ + containerInspectFn: func(context.Context, string) (containertypes.InspectResponse, error) { + return baseInspect(), nil + }, + imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("{}")), nil + }, + containerStopFn: func(context.Context, string, containertypes.StopOptions) error { + return nil + }, + containerCreateFn: func(context.Context, *containertypes.Config, *containertypes.HostConfig, *network.NetworkingConfig, *v1.Platform, string) (containertypes.CreateResponse, error) { + return 
containertypes.CreateResponse{ID: "new123"}, nil + }, + containerStartFn: func(_ context.Context, id string, _ containertypes.StartOptions) error { + if id == "new123" { + return errors.New("start failed") + } + restarted = true + return nil + }, + containerRemoveFn: func(context.Context, string, containertypes.RemoveOptions) error { + removed = true + return nil + }, + containerRenameFn: func(context.Context, string, string) error { + renamed = true + return nil + }, + }, + logger: logger, + } + + result := agent.updateContainer(context.Background(), "container1") + if result.Error == "" { + t.Fatal("expected error for start failure") + } + if !removed || !renamed || !restarted { + t.Fatal("expected rollback cleanup") + } + }) +} + +func TestUpdateContainer_Success(t *testing.T) { + logger := zerolog.Nop() + swap(t, &sleepFn, func(time.Duration) {}) + swap(t, &nowFn, func() time.Time { + return time.Date(2024, 3, 1, 12, 0, 0, 0, time.UTC) + }) + + var ( + mu sync.Mutex + cleanupCalls int + cleanupErr error + cleanupCh = make(chan struct{}) + ) + + agent := &Agent{ + docker: &fakeDockerClient{ + containerInspectFn: func(_ context.Context, id string) (containertypes.InspectResponse, error) { + if id == "new123" { + inspect := baseInspect() + inspect.ContainerJSONBase.Image = "sha256:new0000000000" + return inspect, nil + } + return baseInspect(), nil + }, + imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("{}")), nil + }, + containerStopFn: func(context.Context, string, containertypes.StopOptions) error { + return nil + }, + containerRenameFn: func(context.Context, string, string) error { + return nil + }, + containerCreateFn: func(context.Context, *containertypes.Config, *containertypes.HostConfig, *network.NetworkingConfig, *v1.Platform, string) (containertypes.CreateResponse, error) { + return containertypes.CreateResponse{ID: "new123"}, nil + }, + networkConnectFn: 
func(context.Context, string, string, *network.EndpointSettings) error { + return errors.New("network connect failed") + }, + containerStartFn: func(context.Context, string, containertypes.StartOptions) error { + return nil + }, + containerRemoveFn: func(context.Context, string, containertypes.RemoveOptions) error { + mu.Lock() + cleanupCalls++ + err := cleanupErr + mu.Unlock() + close(cleanupCh) + return err + }, + }, + logger: logger, + } + + result := agent.updateContainer(context.Background(), "container1") + if !result.Success { + t.Fatalf("expected success, got error %q", result.Error) + } + if !result.BackupCreated || result.BackupContainer == "" { + t.Fatalf("expected backup to be created") + } + if result.NewImageDigest == "" { + t.Fatalf("expected new image digest") + } + + <-cleanupCh + + mu.Lock() + if cleanupCalls != 1 { + t.Fatalf("expected cleanup to be called once, got %d", cleanupCalls) + } + mu.Unlock() +} + +func TestHandleUpdateContainerCommand(t *testing.T) { + logger := zerolog.Nop() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + agent := &Agent{ + logger: logger, + hostID: "host1", + httpClients: map[bool]*http.Client{ + false: server.Client(), + }, + docker: &fakeDockerClient{ + containerInspectFn: func(_ context.Context, id string) (containertypes.InspectResponse, error) { + inspect := baseInspect() + if id == "new123" { + inspect.ContainerJSONBase.Image = "sha256:new0000000000" + } + return inspect, nil + }, + imagePullFn: func(context.Context, string, image.PullOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("{}")), nil + }, + containerStopFn: func(context.Context, string, containertypes.StopOptions) error { + return nil + }, + containerRenameFn: func(context.Context, string, string) error { + return nil + }, + containerCreateFn: func(context.Context, *containertypes.Config, *containertypes.HostConfig, 
*network.NetworkingConfig, *v1.Platform, string) (containertypes.CreateResponse, error) { + return containertypes.CreateResponse{ID: "new123"}, nil + }, + containerStartFn: func(context.Context, string, containertypes.StartOptions) error { + return nil + }, + }, + } + + command := agentsdocker.Command{ + ID: "cmd1", + Type: agentsdocker.CommandTypeUpdateContainer, + Payload: map[string]any{ + "containerId": "container1", + }, + } + + if err := agent.handleUpdateContainerCommand(context.Background(), TargetConfig{URL: server.URL}, command); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + t.Run("missing container id", func(t *testing.T) { + if err := agent.handleUpdateContainerCommand(context.Background(), TargetConfig{URL: server.URL}, agentsdocker.Command{ID: "cmd2"}); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) +} diff --git a/internal/dockeragent/deps.go b/internal/dockeragent/deps.go index 84c6653d3..4522b60c0 100644 --- a/internal/dockeragent/deps.go +++ b/internal/dockeragent/deps.go @@ -25,6 +25,7 @@ var ( tryRuntimeCandidateFn = tryRuntimeCandidate osExecutableFn = os.Executable osCreateTempFn = os.CreateTemp + closeFileFn = func(f *os.File) error { return f.Close() } osRenameFn = os.Rename osChmodFn = os.Chmod osRemoveFn = os.Remove diff --git a/internal/dockeragent/runtime_coverage_test.go b/internal/dockeragent/runtime_coverage_test.go new file mode 100644 index 000000000..25249fad5 --- /dev/null +++ b/internal/dockeragent/runtime_coverage_test.go @@ -0,0 +1,137 @@ +package dockeragent + +import ( + "context" + "errors" + "strings" + "testing" + + systemtypes "github.com/docker/docker/api/types/system" + "github.com/docker/docker/client" + "github.com/rs/zerolog" +) + +func TestTryRuntimeCandidate(t *testing.T) { + t.Run("new client error", func(t *testing.T) { + swap(t, &newDockerClientFn, func(_ ...client.Opt) (dockerClient, error) { + return nil, errors.New("dial failed") + }) + + if _, _, err := tryRuntimeCandidate(nil); 
err == nil { + t.Fatal("expected error") + } + }) + + t.Run("info error closes client", func(t *testing.T) { + closed := false + fake := &fakeDockerClient{ + infoFunc: func(_ context.Context) (systemtypes.Info, error) { + return systemtypes.Info{}, errors.New("info failed") + }, + closeFn: func() error { + closed = true + return nil + }, + } + swap(t, &newDockerClientFn, func(_ ...client.Opt) (dockerClient, error) { + return fake, nil + }) + + if _, _, err := tryRuntimeCandidate(nil); err == nil { + t.Fatal("expected error") + } + if !closed { + t.Fatal("expected Close to be called on error") + } + }) + + t.Run("success", func(t *testing.T) { + fake := &fakeDockerClient{ + infoFunc: func(_ context.Context) (systemtypes.Info, error) { + return systemtypes.Info{ServerVersion: "24.0.0"}, nil + }, + } + swap(t, &newDockerClientFn, func(_ ...client.Opt) (dockerClient, error) { + return fake, nil + }) + + gotClient, info, err := tryRuntimeCandidate(nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotClient != fake { + t.Fatal("expected returned client to match fake") + } + if info.ServerVersion != "24.0.0" { + t.Fatalf("expected info to be returned") + } + }) +} + +func TestConnectRuntime(t *testing.T) { + t.Run("no candidates", func(t *testing.T) { + swap(t, &buildRuntimeCandidatesFn, func(_ RuntimeKind) []runtimeCandidate { + return nil + }) + + if _, _, _, err := connectRuntime(RuntimeAuto, nil); err == nil { + t.Fatal("expected error with no candidates") + } + }) + + t.Run("candidate failure accumulates attempts", func(t *testing.T) { + swap(t, &buildRuntimeCandidatesFn, func(_ RuntimeKind) []runtimeCandidate { + return []runtimeCandidate{{label: "first"}} + }) + swap(t, &tryRuntimeCandidateFn, func(_ []client.Opt) (dockerClient, systemtypes.Info, error) { + return nil, systemtypes.Info{}, errors.New("no socket") + }) + + if _, _, _, err := connectRuntime(RuntimeAuto, nil); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("preference 
mismatch returns error", func(t *testing.T) { + fake := &fakeDockerClient{daemonHost: "unix:///run/podman/podman.sock"} + swap(t, &buildRuntimeCandidatesFn, func(_ RuntimeKind) []runtimeCandidate { + return []runtimeCandidate{{label: "podman"}} + }) + swap(t, &tryRuntimeCandidateFn, func(_ []client.Opt) (dockerClient, systemtypes.Info, error) { + return fake, systemtypes.Info{ServerVersion: "4.6.1"}, nil + }) + + _, _, _, err := connectRuntime(RuntimeDocker, nil) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "detected podman runtime") { + t.Fatalf("expected mismatch error, got %v", err) + } + }) + + t.Run("success with logger", func(t *testing.T) { + fake := &fakeDockerClient{daemonHost: "unix:///var/run/docker.sock"} + swap(t, &buildRuntimeCandidatesFn, func(_ RuntimeKind) []runtimeCandidate { + return []runtimeCandidate{{label: "docker"}} + }) + swap(t, &tryRuntimeCandidateFn, func(_ []client.Opt) (dockerClient, systemtypes.Info, error) { + return fake, systemtypes.Info{ServerVersion: "24.0.0"}, nil + }) + + logger := zerolog.Nop() + cli, info, runtimeKind, err := connectRuntime(RuntimeAuto, &logger) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cli != fake { + t.Fatalf("expected client to be returned") + } + if info.ServerVersion != "24.0.0" { + t.Fatalf("expected info to be returned") + } + if runtimeKind != RuntimeDocker { + t.Fatalf("expected runtime docker, got %v", runtimeKind) + } + }) +} diff --git a/internal/dockeragent/swarm_coverage_test.go b/internal/dockeragent/swarm_coverage_test.go new file mode 100644 index 000000000..53e239a9f --- /dev/null +++ b/internal/dockeragent/swarm_coverage_test.go @@ -0,0 +1,309 @@ +package dockeragent + +import ( + "context" + "errors" + "testing" + "time" + + swarmtypes "github.com/docker/docker/api/types/swarm" + systemtypes "github.com/docker/docker/api/types/system" + agentsdocker "github.com/rcourtman/pulse-go-rewrite/pkg/agents/docker" +) + +func 
TestMapSwarmService(t *testing.T) { + now := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC) + updateAt := now.Add(2 * time.Minute) + + svc := &swarmtypes.Service{ + ID: "svc1", + Spec: swarmtypes.ServiceSpec{ + Annotations: swarmtypes.Annotations{ + Name: "web", + Labels: map[string]string{ + "com.docker.stack.namespace": "stack", + }, + }, + TaskTemplate: swarmtypes.TaskSpec{ + ContainerSpec: &swarmtypes.ContainerSpec{ + Image: "nginx:latest", + }, + }, + }, + ServiceStatus: &swarmtypes.ServiceStatus{ + DesiredTasks: 3, + RunningTasks: 2, + CompletedTasks: 1, + }, + UpdateStatus: &swarmtypes.UpdateStatus{ + State: swarmtypes.UpdateStateCompleted, + Message: "done", + CompletedAt: &updateAt, + }, + Endpoint: swarmtypes.Endpoint{ + Ports: []swarmtypes.PortConfig{ + {Name: "http", Protocol: swarmtypes.PortConfigProtocolTCP, TargetPort: 80, PublishedPort: 8080, PublishMode: swarmtypes.PortConfigPublishModeIngress}, + }, + }, + Meta: swarmtypes.Meta{ + CreatedAt: now, + UpdatedAt: updateAt, + }, + } + + got := mapSwarmService(svc) + if got.ID != "svc1" || got.Name != "web" || got.Mode == "" { + t.Fatalf("unexpected service mapping: %+v", got) + } + if got.Stack != "stack" { + t.Fatalf("expected stack label to be mapped, got %q", got.Stack) + } + if got.Image != "nginx:latest" { + t.Fatalf("expected image to be mapped, got %q", got.Image) + } + if got.DesiredTasks != 3 || got.RunningTasks != 2 || got.CompletedTasks != 1 { + t.Fatalf("unexpected task counts: %+v", got) + } + if got.UpdateStatus == nil || got.UpdateStatus.State != string(swarmtypes.UpdateStateCompleted) { + t.Fatalf("expected update status to be mapped, got %+v", got.UpdateStatus) + } + if got.EndpointPorts == nil || len(got.EndpointPorts) != 1 { + t.Fatalf("expected endpoint ports to be mapped") + } + if got.CreatedAt == nil || got.UpdatedAt == nil { + t.Fatalf("expected timestamps to be mapped") + } +} + +func TestMapSwarmTask(t *testing.T) { + now := time.Date(2024, 2, 3, 4, 5, 6, 0, time.UTC) + 
containerStart := now.Add(-2 * time.Minute) + containerFinish := now.Add(-time.Minute) + + containers := map[string]agentsdocker.Container{ + "container-full": { + ID: "container-full", + Name: "web.1", + StartedAt: &containerStart, + FinishedAt: func() *time.Time { + val := containerFinish + return &val + }(), + }, + } + + t.Run("running task", func(t *testing.T) { + task := &swarmtypes.Task{ + ID: "task1", + ServiceID: "svc1", + Slot: 1, + NodeID: "node1", + Status: swarmtypes.TaskStatus{ + State: swarmtypes.TaskStateRunning, + Timestamp: now, + ContainerStatus: &swarmtypes.ContainerStatus{ + ContainerID: "container-full", + }, + }, + Meta: swarmtypes.Meta{ + CreatedAt: now.Add(-time.Minute), + }, + } + svc := &swarmtypes.Service{Spec: swarmtypes.ServiceSpec{Annotations: swarmtypes.Annotations{Name: "web"}}} + + got := mapSwarmTask(task, svc, containers) + if got.ServiceName != "web" { + t.Fatalf("expected service name, got %q", got.ServiceName) + } + if got.ContainerName != "web.1" { + t.Fatalf("expected container name, got %q", got.ContainerName) + } + if got.StartedAt == nil || !got.StartedAt.Equal(now) { + t.Fatalf("expected started at timestamp from task") + } + }) + + t.Run("completed task uses container timestamps", func(t *testing.T) { + task := &swarmtypes.Task{ + ID: "task2", + ServiceID: "svc2", + Status: swarmtypes.TaskStatus{ + State: swarmtypes.TaskStateCompleted, + ContainerStatus: &swarmtypes.ContainerStatus{ + ContainerID: "container-full", + }, + }, + Meta: swarmtypes.Meta{ + CreatedAt: now.Add(-time.Hour), + }, + } + + got := mapSwarmTask(task, nil, containers) + if got.CompletedAt == nil || !got.CompletedAt.Equal(containerFinish) { + t.Fatalf("expected completed at from container timestamp") + } + if got.StartedAt == nil || !got.StartedAt.Equal(containerStart) { + t.Fatalf("expected started at from container timestamp") + } + }) +} + +func TestCollectSwarmDataFromManager(t *testing.T) { + agent := &Agent{ + docker: &fakeDockerClient{ + 
serviceListFn: func(_ context.Context, _ swarmtypes.ServiceListOptions) ([]swarmtypes.Service, error) { + return []swarmtypes.Service{ + {ID: "svc1", Spec: swarmtypes.ServiceSpec{Annotations: swarmtypes.Annotations{Name: "alpha"}}}, + {ID: "svc2", Spec: swarmtypes.ServiceSpec{Annotations: swarmtypes.Annotations{Name: "beta"}}}, + }, nil + }, + taskListFn: func(_ context.Context, opts swarmtypes.TaskListOptions) ([]swarmtypes.Task, error) { + if got := opts.Filters.Get("node"); len(got) != 1 || got[0] != "node1" { + t.Fatalf("expected node filter to include node1, got %v", got) + } + return []swarmtypes.Task{ + {ID: "task1", ServiceID: "svc1", Status: swarmtypes.TaskStatus{State: swarmtypes.TaskStateRunning}}, + }, nil + }, + }, + } + + info := systemtypes.Info{ + Swarm: systemtypes.SwarmInfo{ + NodeID: "node1", + }, + } + + services, tasks, err := agent.collectSwarmDataFromManager(context.Background(), info, swarmScopeNode, nil, true, true) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(tasks) != 1 { + t.Fatalf("expected 1 task, got %d", len(tasks)) + } + if len(services) != 1 { + t.Fatalf("expected filtered services, got %d", len(services)) + } +} + +func TestCollectSwarmData(t *testing.T) { + t.Run("unsupported swarm returns nils", func(t *testing.T) { + agent := &Agent{supportsSwarm: false} + services, tasks, info := agent.collectSwarmData(context.Background(), systemtypes.Info{}, nil) + if services != nil || tasks != nil || info != nil { + t.Fatal("expected nil outputs when swarm unsupported") + } + }) + + t.Run("inactive swarm returns info only", func(t *testing.T) { + agent := &Agent{supportsSwarm: true, cfg: Config{SwarmScope: swarmScopeNode}} + info := systemtypes.Info{ + Swarm: systemtypes.SwarmInfo{ + NodeID: "node1", + LocalNodeState: swarmtypes.LocalNodeStatePending, + }, + } + + services, tasks, swarmInfo := agent.collectSwarmData(context.Background(), info, nil) + if services != nil || tasks != nil { + t.Fatal("expected nil 
services/tasks for inactive swarm") + } + if swarmInfo == nil || swarmInfo.NodeID != "node1" { + t.Fatal("expected swarm info to be returned") + } + }) + + t.Run("manager error falls back to containers", func(t *testing.T) { + agent := &Agent{ + supportsSwarm: true, + cfg: Config{ + IncludeServices: true, + IncludeTasks: true, + SwarmScope: swarmScopeAuto, + }, + docker: &fakeDockerClient{ + serviceListFn: func(context.Context, swarmtypes.ServiceListOptions) ([]swarmtypes.Service, error) { + return nil, errors.New("boom") + }, + }, + } + + containers := []agentsdocker.Container{ + { + ID: "container1", + Name: "web.1", + Image: "nginx:latest", + State: "running", + Labels: map[string]string{ + "com.docker.swarm.service.id": "svc1", + "com.docker.swarm.service.name": "web", + "com.docker.swarm.task.id": "task1", + "com.docker.swarm.task.slot": "1", + "com.docker.swarm.task.message": "ok", + "com.docker.swarm.task.error": "", + "com.docker.stack.namespace": "stack", + "com.docker.swarm.node.id": "node1", + "com.docker.swarm.node.name": "node1", + "com.docker.swarm.task.desired-state": "running", + }, + }, + } + + info := systemtypes.Info{ + Swarm: systemtypes.SwarmInfo{ + NodeID: "node1", + ControlAvailable: true, + LocalNodeState: swarmtypes.LocalNodeStateActive, + }, + } + + services, tasks, swarmInfo := agent.collectSwarmData(context.Background(), info, containers) + if len(tasks) != 1 || len(services) != 1 { + t.Fatalf("expected derived tasks/services, got %d/%d", len(tasks), len(services)) + } + if swarmInfo == nil || swarmInfo.Scope != swarmScopeNode { + t.Fatalf("expected effective scope node, got %+v", swarmInfo) + } + }) + + t.Run("manager success uses manager data", func(t *testing.T) { + agent := &Agent{ + supportsSwarm: true, + cfg: Config{ + IncludeServices: true, + IncludeTasks: true, + SwarmScope: swarmScopeCluster, + }, + docker: &fakeDockerClient{ + serviceListFn: func(context.Context, swarmtypes.ServiceListOptions) ([]swarmtypes.Service, error) { + 
return []swarmtypes.Service{ + {ID: "svc1", Spec: swarmtypes.ServiceSpec{Annotations: swarmtypes.Annotations{Name: "zeta"}}}, + {ID: "svc2", Spec: swarmtypes.ServiceSpec{Annotations: swarmtypes.Annotations{Name: "alpha"}}}, + }, nil + }, + taskListFn: func(context.Context, swarmtypes.TaskListOptions) ([]swarmtypes.Task, error) { + return []swarmtypes.Task{ + {ID: "task2", ServiceID: "svc2", Slot: 2}, + {ID: "task1", ServiceID: "svc2", Slot: 1}, + }, nil + }, + }, + } + + info := systemtypes.Info{ + Swarm: systemtypes.SwarmInfo{ + NodeID: "node1", + ControlAvailable: true, + LocalNodeState: swarmtypes.LocalNodeStateActive, + }, + } + + services, tasks, swarmInfo := agent.collectSwarmData(context.Background(), info, nil) + if len(tasks) != 2 || len(services) != 2 { + t.Fatalf("expected manager tasks/services, got %d/%d", len(tasks), len(services)) + } + if swarmInfo == nil || swarmInfo.Scope != swarmScopeCluster { + t.Fatalf("unexpected swarm info: %+v", swarmInfo) + } + }) +} diff --git a/internal/dockeragent/system_coverage_test.go b/internal/dockeragent/system_coverage_test.go new file mode 100644 index 000000000..3df79578c --- /dev/null +++ b/internal/dockeragent/system_coverage_test.go @@ -0,0 +1,145 @@ +package dockeragent + +import ( + "errors" + "io" + "os" + "path/filepath" + "strings" + "testing" +) + +func TestReadProcUptime(t *testing.T) { + tests := []struct { + name string + reader func() (io.ReadCloser, error) + wantError bool + }{ + { + name: "open error", + reader: func() (io.ReadCloser, error) { + return nil, errors.New("open failed") + }, + wantError: true, + }, + { + name: "empty file", + reader: func() (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("")), nil + }, + wantError: true, + }, + { + name: "invalid contents", + reader: func() (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(" ")), nil + }, + wantError: true, + }, + { + name: "parse error", + reader: func() (io.ReadCloser, error) { + return 
io.NopCloser(strings.NewReader("notnum 0")), nil + }, + wantError: true, + }, + { + name: "success", + reader: func() (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("123.45 0.00")), nil + }, + wantError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + swap(t, &openProcUptime, tt.reader) + + value, err := readProcUptime() + if tt.wantError { + if err == nil { + t.Fatal("expected error") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if value <= 0 { + t.Fatalf("expected uptime > 0, got %f", value) + } + }) + } +} + +func TestReadSystemUptime(t *testing.T) { + t.Run("error returns zero", func(t *testing.T) { + swap(t, &openProcUptime, func() (io.ReadCloser, error) { + return nil, errors.New("boom") + }) + + if got := readSystemUptime(); got != 0 { + t.Fatalf("expected 0, got %d", got) + } + }) + + t.Run("success returns seconds", func(t *testing.T) { + swap(t, &openProcUptime, func() (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("42.3 0.00")), nil + }) + + if got := readSystemUptime(); got != 42 { + t.Fatalf("expected 42, got %d", got) + } + }) +} + +func TestReadMachineID(t *testing.T) { + t.Run("success", func(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "machine-id") + if err := os.WriteFile(path, []byte("abc123\n"), 0600); err != nil { + t.Fatalf("write machine-id: %v", err) + } + swap(t, &machineIDPaths, []string{path}) + + got, err := readMachineID() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != "abc123" { + t.Fatalf("expected abc123, got %q", got) + } + }) + + t.Run("not found", func(t *testing.T) { + swap(t, &machineIDPaths, []string{filepath.Join(t.TempDir(), "missing")}) + + if _, err := readMachineID(); err == nil { + t.Fatal("expected error for missing machine-id") + } + }) +} + +func TestIsUnraid(t *testing.T) { + t.Run("true when file exists", func(t *testing.T) { + dir := t.TempDir() + path := 
filepath.Join(dir, "unraid-version") + if err := os.WriteFile(path, []byte("6.12.0"), 0600); err != nil { + t.Fatalf("write unraid-version: %v", err) + } + swap(t, &unraidVersionPath, path) + + if !isUnraid() { + t.Fatal("expected isUnraid to be true") + } + }) + + t.Run("false when missing", func(t *testing.T) { + swap(t, &unraidVersionPath, filepath.Join(t.TempDir(), "missing")) + if isUnraid() { + t.Fatal("expected isUnraid to be false") + } + }) +} diff --git a/internal/dockeragent/systemd_test.go b/internal/dockeragent/systemd_test.go new file mode 100644 index 000000000..4f7a29564 --- /dev/null +++ b/internal/dockeragent/systemd_test.go @@ -0,0 +1,151 @@ +package dockeragent + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" +) + +func writeSystemctl(t *testing.T, script string) string { + t.Helper() + + dir := t.TempDir() + path := filepath.Join(dir, "systemctl") + content := "#!/bin/sh\n" + script + "\n" + if err := os.WriteFile(path, []byte(content), 0755); err != nil { + t.Fatalf("write systemctl: %v", err) + } + + prevPath := os.Getenv("PATH") + if err := os.Setenv("PATH", dir); err != nil { + t.Fatalf("set PATH: %v", err) + } + t.Cleanup(func() { + _ = os.Setenv("PATH", prevPath) + }) + + return path +} + +func TestDisableSystemdService(t *testing.T) { + t.Run("no systemctl", func(t *testing.T) { + prev := os.Getenv("PATH") + _ = os.Setenv("PATH", "") + t.Cleanup(func() { + _ = os.Setenv("PATH", prev) + }) + + if err := disableSystemdService(context.Background(), "pulse-docker-agent"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("success", func(t *testing.T) { + writeSystemctl(t, "exit 0") + if err := disableSystemdService(context.Background(), "pulse-docker-agent"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("not found exit code", func(t *testing.T) { + writeSystemctl(t, "echo 'unit not-found' >&2\nexit 5") + if err := disableSystemdService(context.Background(), 
"pulse-docker-agent"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("access denied", func(t *testing.T) { + writeSystemctl(t, "echo 'access denied' >&2\nexit 1") + err := disableSystemdService(context.Background(), "pulse-docker-agent") + if err == nil || !strings.Contains(err.Error(), "access denied") { + t.Fatalf("expected access denied error, got %v", err) + } + }) + + t.Run("other error", func(t *testing.T) { + writeSystemctl(t, "echo 'boom' >&2\nexit 2") + if err := disableSystemdService(context.Background(), "pulse-docker-agent"); err == nil { + t.Fatal("expected error") + } + }) +} + +func TestStopSystemdService(t *testing.T) { + t.Run("no systemctl", func(t *testing.T) { + prev := os.Getenv("PATH") + _ = os.Setenv("PATH", "") + t.Cleanup(func() { + _ = os.Setenv("PATH", prev) + }) + + if err := stopSystemdService(context.Background(), "pulse-docker-agent"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("success", func(t *testing.T) { + writeSystemctl(t, "exit 0") + if err := stopSystemdService(context.Background(), "pulse-docker-agent"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("not found exit code", func(t *testing.T) { + writeSystemctl(t, "echo 'could not be found' >&2\nexit 5") + if err := stopSystemdService(context.Background(), "pulse-docker-agent"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("permission denied", func(t *testing.T) { + writeSystemctl(t, "echo 'permission denied' >&2\nexit 1") + err := stopSystemdService(context.Background(), "pulse-docker-agent") + if err == nil || !strings.Contains(err.Error(), "access denied") { + t.Fatalf("expected access denied error, got %v", err) + } + }) + + t.Run("other error", func(t *testing.T) { + writeSystemctl(t, "echo 'boom' >&2\nexit 2") + if err := stopSystemdService(context.Background(), "pulse-docker-agent"); err == nil { + t.Fatal("expected error") + } + }) +} + +func 
TestRemoveFileIfExists(t *testing.T) { + t.Run("missing file", func(t *testing.T) { + if err := removeFileIfExists(filepath.Join(t.TempDir(), "missing")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("removes file", func(t *testing.T) { + path := filepath.Join(t.TempDir(), "file") + if err := os.WriteFile(path, []byte("data"), 0600); err != nil { + t.Fatalf("write file: %v", err) + } + if err := removeFileIfExists(path); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Fatalf("expected file to be removed") + } + }) + + t.Run("remove error", func(t *testing.T) { + dir := filepath.Join(t.TempDir(), "dir") + if err := os.MkdirAll(dir, 0700); err != nil { + t.Fatalf("mkdir: %v", err) + } + nested := filepath.Join(dir, "file") + if err := os.WriteFile(nested, []byte("data"), 0600); err != nil { + t.Fatalf("write nested file: %v", err) + } + if err := removeFileIfExists(dir); err == nil { + t.Fatal("expected error") + } + }) +} diff --git a/internal/dockeragent/test_helpers_test.go b/internal/dockeragent/test_helpers_test.go new file mode 100644 index 000000000..6db707cf6 --- /dev/null +++ b/internal/dockeragent/test_helpers_test.go @@ -0,0 +1,167 @@ +package dockeragent + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "testing" + + containertypes "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + swarmtypes "github.com/docker/docker/api/types/swarm" + systemtypes "github.com/docker/docker/api/types/system" + "github.com/opencontainers/image-spec/specs-go/v1" +) + +type fakeDockerClient struct { + daemonHost string + infoFunc func(ctx context.Context) (systemtypes.Info, error) + containerListFunc func(ctx context.Context, opts containertypes.ListOptions) ([]containertypes.Summary, error) + containerInspectWithRawFn func(ctx context.Context, id string, size bool) 
(containertypes.InspectResponse, []byte, error) + containerStatsOneShotFn func(ctx context.Context, id string) (containertypes.StatsResponseReader, error) + containerInspectFn func(ctx context.Context, id string) (containertypes.InspectResponse, error) + imagePullFn func(ctx context.Context, ref string, opts image.PullOptions) (io.ReadCloser, error) + containerStopFn func(ctx context.Context, id string, opts containertypes.StopOptions) error + containerRenameFn func(ctx context.Context, id, newName string) error + containerCreateFn func(ctx context.Context, config *containertypes.Config, hostConfig *containertypes.HostConfig, networkingConfig *network.NetworkingConfig, platform *v1.Platform, containerName string) (containertypes.CreateResponse, error) + networkConnectFn func(ctx context.Context, netName, containerID string, endpoint *network.EndpointSettings) error + containerStartFn func(ctx context.Context, id string, opts containertypes.StartOptions) error + containerRemoveFn func(ctx context.Context, id string, opts containertypes.RemoveOptions) error + serviceListFn func(ctx context.Context, opts swarmtypes.ServiceListOptions) ([]swarmtypes.Service, error) + taskListFn func(ctx context.Context, opts swarmtypes.TaskListOptions) ([]swarmtypes.Task, error) + closeFn func() error +} + +func (f *fakeDockerClient) Info(ctx context.Context) (systemtypes.Info, error) { + if f.infoFunc == nil { + return systemtypes.Info{}, errors.New("unexpected Info call") + } + return f.infoFunc(ctx) +} + +func (f *fakeDockerClient) DaemonHost() string { + return f.daemonHost +} + +func (f *fakeDockerClient) ContainerList(ctx context.Context, opts containertypes.ListOptions) ([]containertypes.Summary, error) { + if f.containerListFunc == nil { + return nil, errors.New("unexpected ContainerList call") + } + return f.containerListFunc(ctx, opts) +} + +func (f *fakeDockerClient) ContainerInspectWithRaw(ctx context.Context, id string, size bool) (containertypes.InspectResponse, []byte, 
error) { + if f.containerInspectWithRawFn == nil { + return containertypes.InspectResponse{}, nil, errors.New("unexpected ContainerInspectWithRaw call") + } + return f.containerInspectWithRawFn(ctx, id, size) +} + +func (f *fakeDockerClient) ContainerStatsOneShot(ctx context.Context, id string) (containertypes.StatsResponseReader, error) { + if f.containerStatsOneShotFn == nil { + return containertypes.StatsResponseReader{}, errors.New("unexpected ContainerStatsOneShot call") + } + return f.containerStatsOneShotFn(ctx, id) +} + +func (f *fakeDockerClient) ContainerInspect(ctx context.Context, id string) (containertypes.InspectResponse, error) { + if f.containerInspectFn == nil { + return containertypes.InspectResponse{}, errors.New("unexpected ContainerInspect call") + } + return f.containerInspectFn(ctx, id) +} + +func (f *fakeDockerClient) ImagePull(ctx context.Context, ref string, opts image.PullOptions) (io.ReadCloser, error) { + if f.imagePullFn == nil { + return nil, errors.New("unexpected ImagePull call") + } + return f.imagePullFn(ctx, ref, opts) +} + +func (f *fakeDockerClient) ContainerStop(ctx context.Context, id string, opts containertypes.StopOptions) error { + if f.containerStopFn == nil { + return errors.New("unexpected ContainerStop call") + } + return f.containerStopFn(ctx, id, opts) +} + +func (f *fakeDockerClient) ContainerRename(ctx context.Context, id, newName string) error { + if f.containerRenameFn == nil { + return errors.New("unexpected ContainerRename call") + } + return f.containerRenameFn(ctx, id, newName) +} + +func (f *fakeDockerClient) ContainerCreate(ctx context.Context, config *containertypes.Config, hostConfig *containertypes.HostConfig, networkingConfig *network.NetworkingConfig, platform *v1.Platform, containerName string) (containertypes.CreateResponse, error) { + if f.containerCreateFn == nil { + return containertypes.CreateResponse{}, errors.New("unexpected ContainerCreate call") + } + return f.containerCreateFn(ctx, config, 
hostConfig, networkingConfig, platform, containerName) +} + +func (f *fakeDockerClient) NetworkConnect(ctx context.Context, netName, containerID string, endpoint *network.EndpointSettings) error { + if f.networkConnectFn == nil { + return errors.New("unexpected NetworkConnect call") + } + return f.networkConnectFn(ctx, netName, containerID, endpoint) +} + +func (f *fakeDockerClient) ContainerStart(ctx context.Context, id string, opts containertypes.StartOptions) error { + if f.containerStartFn == nil { + return errors.New("unexpected ContainerStart call") + } + return f.containerStartFn(ctx, id, opts) +} + +func (f *fakeDockerClient) ContainerRemove(ctx context.Context, id string, opts containertypes.RemoveOptions) error { + if f.containerRemoveFn == nil { + return errors.New("unexpected ContainerRemove call") + } + return f.containerRemoveFn(ctx, id, opts) +} + +func (f *fakeDockerClient) ServiceList(ctx context.Context, opts swarmtypes.ServiceListOptions) ([]swarmtypes.Service, error) { + if f.serviceListFn == nil { + return nil, errors.New("unexpected ServiceList call") + } + return f.serviceListFn(ctx, opts) +} + +func (f *fakeDockerClient) TaskList(ctx context.Context, opts swarmtypes.TaskListOptions) ([]swarmtypes.Task, error) { + if f.taskListFn == nil { + return nil, errors.New("unexpected TaskList call") + } + return f.taskListFn(ctx, opts) +} + +func (f *fakeDockerClient) Close() error { + if f.closeFn == nil { + return nil + } + return f.closeFn() +} + +func statsReader(t *testing.T, stats containertypes.StatsResponse) containertypes.StatsResponseReader { + t.Helper() + + payload, err := json.Marshal(stats) + if err != nil { + t.Fatalf("marshal stats: %v", err) + } + + return containertypes.StatsResponseReader{ + Body: io.NopCloser(bytes.NewReader(payload)), + } +} + +func swap[T any](t *testing.T, target *T, value T) { + t.Helper() + prev := *target + *target = value + t.Cleanup(func() { + *target = prev + }) +} diff --git a/scripts/install.sh 
b/scripts/install.sh index 177931f84..f4005cc10 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -1170,10 +1170,36 @@ fi # 5. Linux (Systemd) if command -v systemctl >/dev/null 2>&1; then UNIT="/etc/systemd/system/${AGENT_NAME}.service" + TOKEN_DIR="/var/lib/pulse-agent" + TOKEN_FILE="${TOKEN_DIR}/token" log_info "Configuring Systemd service at $UNIT..." - # Build command line args - build_exec_args + # Write token to secure file (not visible in ps or service file) + mkdir -p "$TOKEN_DIR" + echo -n "$PULSE_TOKEN" > "$TOKEN_FILE" + chmod 600 "$TOKEN_FILE" + chown root:root "$TOKEN_FILE" + log_info "Token stored securely at $TOKEN_FILE (mode 600)" + + # Build command line args WITHOUT the token (token is read from file) + EXEC_ARGS="--url ${PULSE_URL} --interval ${INTERVAL}" + # Always pass enable-host flag since agent defaults to true + if [[ "$ENABLE_HOST" == "true" ]]; then + EXEC_ARGS="$EXEC_ARGS --enable-host" + else + EXEC_ARGS="$EXEC_ARGS --enable-host=false" + fi + if [[ "$ENABLE_DOCKER" == "true" ]]; then EXEC_ARGS="$EXEC_ARGS --enable-docker"; fi + if [[ "$ENABLE_KUBERNETES" == "true" ]]; then EXEC_ARGS="$EXEC_ARGS --enable-kubernetes"; fi + if [[ "$ENABLE_PROXMOX" == "true" ]]; then EXEC_ARGS="$EXEC_ARGS --enable-proxmox"; fi + if [[ -n "$PROXMOX_TYPE" ]]; then EXEC_ARGS="$EXEC_ARGS --proxmox-type ${PROXMOX_TYPE}"; fi + if [[ "$INSECURE" == "true" ]]; then EXEC_ARGS="$EXEC_ARGS --insecure"; fi + if [[ "$ENABLE_COMMANDS" == "true" ]]; then EXEC_ARGS="$EXEC_ARGS --enable-commands"; fi + if [[ -n "$AGENT_ID" ]]; then EXEC_ARGS="$EXEC_ARGS --agent-id ${AGENT_ID}"; fi + # Add disk exclude patterns + for pattern in "${DISK_EXCLUDES[@]}"; do + EXEC_ARGS="$EXEC_ARGS --disk-exclude '${pattern}'" + done cat > "$UNIT" <