mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-04-28 03:20:11 +00:00
Refactor Docker agent: metrics collection, security checks, and batch updates
- Separated metrics collection into internal/dockeragent/collect.go - Added agent self-update pre-flight check (--self-test) - Implemented signed binary verification with key rotation for updates - Added batch update support to frontend with parallel processing - Cleaned up agent.go and added startup cleanup for backup containers - Updated documentation for Docker features and agent security
This commit is contained in:
parent
d38a37fe3d
commit
d07b471e40
15 changed files with 1628 additions and 1112 deletions
|
|
@ -77,6 +77,8 @@ Access the dashboard at `http://<your-ip>:7655`.
|
|||
- **[API Reference](docs/API.md)**: Integrate Pulse with your own tools.
|
||||
- **[Architecture](ARCHITECTURE.md)**: High-level system design and data flow.
|
||||
- **[Troubleshooting](docs/TROUBLESHOOTING.md)**: Solutions to common issues.
|
||||
- **[Agent Security](docs/AGENT_SECURITY.md)**: Details on signed updates and verification.
|
||||
- **[Docker Monitoring](docs/DOCKER.md)**: Setup and management of Docker agents.
|
||||
|
||||
## 🌐 Community Integrations
|
||||
|
||||
|
|
|
|||
|
|
@ -61,6 +61,12 @@ func main() {
|
|||
logger := zerolog.New(os.Stdout).Level(cfg.LogLevel).With().Timestamp().Logger()
|
||||
cfg.Logger = &logger
|
||||
|
||||
// 2a. Handle Self-Test
|
||||
if cfg.SelfTest {
|
||||
logger.Info().Msg("Self-test passed: config loaded and logger initialized")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// 3. Check if running as Windows service
|
||||
ranAsService, err := runAsWindowsService(cfg, logger)
|
||||
if err != nil {
|
||||
|
|
@ -358,6 +364,7 @@ type Config struct {
|
|||
|
||||
// Network configuration
|
||||
ReportIP string // IP address to report (for multi-NIC systems)
|
||||
SelfTest bool // Perform self-test and exit
|
||||
|
||||
// Health/metrics server
|
||||
HealthAddr string
|
||||
|
|
@ -460,6 +467,7 @@ func loadConfig() Config {
|
|||
kubeMaxPodsFlag := flag.Int("kube-max-pods", defaultInt(envKubeMaxPods, 200), "Max pods included in report")
|
||||
reportIPFlag := flag.String("report-ip", envReportIP, "IP address to report (for multi-NIC systems)")
|
||||
showVersion := flag.Bool("version", false, "Print the agent version and exit")
|
||||
selfTest := flag.Bool("self-test", false, "Perform self-test and exit (used during auto-update)")
|
||||
|
||||
var tagFlags multiValue
|
||||
flag.Var(&tagFlags, "tag", "Tag to apply (repeatable)")
|
||||
|
|
@ -537,6 +545,7 @@ func loadConfig() Config {
|
|||
KubeMaxPods: *kubeMaxPodsFlag,
|
||||
DiskExclude: diskExclude,
|
||||
ReportIP: strings.TrimSpace(*reportIPFlag),
|
||||
SelfTest: *selfTest,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
29
docs/AGENT_SECURITY.md
Normal file
29
docs/AGENT_SECURITY.md
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
# Agent Security
|
||||
|
||||
Pulse agents incorporate several security mechanisms to ensure that the code running on your infrastructure is authentic and untampered with.
|
||||
|
||||
## Self-Update Security
|
||||
|
||||
The agent's self-update mechanism is critical for security and stability. To prevent supply chain attacks or compromised update servers from distributing malicious or broken agents, Pulse employs a rigorous verification process.
|
||||
|
||||
### 1. Cryptographic Signature Verification
|
||||
All agent binaries are signed using **Ed25519** signatures. The agent contains a hardcoded list of trusted public keys. Before an update is applied, the agent verifies the digital signature of the downloaded binary against these trusted keys.
|
||||
|
||||
- **Key Rotation**: The agent supports multiple trusted keys, allowing for seamless key rotation. A new key can be introduced in one version, and the old key retired in a future version.
|
||||
- **Fail-Safe**: If a binary is not signed or the signature is invalid, the update is strictly rejected.
|
||||
|
||||
### 2. Checksum Verification
|
||||
In addition to the cryptographic signature, the agent verifies the SHA-256 checksum of the downloaded binary to ensure integrity and prevent transmission errors.
|
||||
|
||||
### 3. Pre-Flight Checks
|
||||
To prevent "brick-updates"—bad updates that crash immediately and require manual recovery—the agent performs a pre-flight check before replacing the running executable.
|
||||
1. Download new binary.
|
||||
2. Verify signature and checksum.
|
||||
3. Make executable.
|
||||
4. **Execute with `--self-test`**: The agent attempts to run the new binary with a special flag that loads the configuration and verifies basic functionality.
|
||||
5. If the self-test fails (exit code != 0), the update is aborted.
|
||||
|
||||
## API Security
|
||||
|
||||
- **Token Authentication**: All agent-to-server communication requires a valid API token.
|
||||
- **TLS**: Encrypted by default (unless specifically disabled).
|
||||
|
|
@ -110,6 +110,7 @@ Pulse can detect and apply updates to your Docker containers directly from the U
|
|||
1. **Update Detection**: Pulse compares the local image digest with the latest digest from the container registry
|
||||
2. **Visual Indicator**: Containers with available updates show a blue upward arrow icon
|
||||
3. **One-Click Update**: Click the update button, confirm, and Pulse handles the rest
|
||||
4. **Batch Updates**: Use the **"Update All"** button in the filter bar to update multiple containers safely in sequence
|
||||
|
||||
### Updating a Container
|
||||
|
||||
|
|
@ -123,6 +124,15 @@ Pulse can detect and apply updates to your Docker containers directly from the U
|
|||
- Start a new container with the same configuration
|
||||
- Clean up the backup after 5 minutes
|
||||
|
||||
### Batch Updates
|
||||
|
||||
When multiple containers have updates available, an **"Update All"** button appears in the filter bar.
|
||||
1. Click **"Update All"**
|
||||
2. Confirm the action in the toast notification
|
||||
3. Pulse queues the updates and processes them in parallel batches (default 5 concurrent updates)
|
||||
4. A progress indicator shows the status of the batch operation
|
||||
5. Failed updates are pushed to the end of the queue and reported in the final summary
|
||||
|
||||
### Safety Features
|
||||
|
||||
- **Automatic Backup**: The old container is renamed, not deleted, until the update succeeds
|
||||
|
|
|
|||
|
|
@ -16,8 +16,46 @@ interface DockerFilterProps {
|
|||
onReset?: () => void;
|
||||
activeHostName?: string;
|
||||
onClearHost?: () => void;
|
||||
updateAvailableCount?: number;
|
||||
onUpdateAll?: () => void;
|
||||
}
|
||||
|
||||
const UpdateAllButton: Component<{ count: number; onUpdate: () => void }> = (props) => {
|
||||
const [confirming, setConfirming] = createSignal(false);
|
||||
|
||||
return (
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => {
|
||||
if (confirming()) {
|
||||
props.onUpdate();
|
||||
setConfirming(false);
|
||||
} else {
|
||||
setConfirming(true);
|
||||
// Auto-reset confirmation after 3s
|
||||
setTimeout(() => setConfirming(false), 3000);
|
||||
}
|
||||
}}
|
||||
class={`flex items-center gap-1.5 px-3 py-1 text-xs font-medium rounded-lg transition-colors ${confirming()
|
||||
? 'bg-amber-100 text-amber-800 dark:bg-amber-900/60 dark:text-amber-200 hover:bg-amber-200'
|
||||
: 'bg-blue-100 text-blue-800 dark:bg-blue-900/40 dark:text-blue-200 hover:bg-blue-200'
|
||||
}`}
|
||||
title={confirming() ? "Click again to confirm" : `Update ${props.count} containers`}
|
||||
>
|
||||
<Show when={!confirming()} fallback={
|
||||
<svg class="h-3.5 w-3.5" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M5 13l4 4L19 7" />
|
||||
</svg>
|
||||
}>
|
||||
<svg class="h-3.5 w-3.5" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 16v1a3 3 0 003 3h10a3 3 0 003-3v-1m-4-8l-4-4m0 0L8 8m4-4v12" />
|
||||
</svg>
|
||||
</Show>
|
||||
<span>{confirming() ? 'Confirm Update All?' : `Update All (${props.count})`}</span>
|
||||
</button>
|
||||
);
|
||||
};
|
||||
|
||||
export const DockerFilter: Component<DockerFilterProps> = (props) => {
|
||||
const historyManager = createSearchHistoryManager(STORAGE_KEYS.DOCKER_SEARCH_HISTORY);
|
||||
const [searchHistory, setSearchHistory] = createSignal<string[]>([]);
|
||||
|
|
@ -406,6 +444,14 @@ export const DockerFilter: Component<DockerFilterProps> = (props) => {
|
|||
{/* Metrics View Toggle */}
|
||||
<MetricsViewToggle />
|
||||
|
||||
<Show when={props.updateAvailableCount && props.updateAvailableCount > 1}>
|
||||
<div class="h-5 w-px bg-gray-200 dark:bg-gray-600 hidden sm:block" aria-hidden="true"></div>
|
||||
<UpdateAllButton
|
||||
count={props.updateAvailableCount!}
|
||||
onUpdate={props.onUpdateAll!}
|
||||
/>
|
||||
</Show>
|
||||
|
||||
<Show when={hasActiveFilters()}>
|
||||
<div class="h-5 w-px bg-gray-200 dark:bg-gray-600 hidden sm:block" aria-hidden="true"></div>
|
||||
<button
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ import { DockerHostMetadataAPI, type DockerHostMetadata } from '@/api/dockerHost
|
|||
import { logger } from '@/utils/logger';
|
||||
import { STORAGE_KEYS } from '@/utils/localStorage';
|
||||
import { DEGRADED_HEALTH_STATUSES, OFFLINE_HEALTH_STATUSES } from '@/utils/status';
|
||||
import { MonitoringAPI } from '@/api/monitoring';
|
||||
import { showSuccess, showError, showToast } from '@/utils/toast';
|
||||
|
||||
type DockerMetadataRecord = Record<string, DockerMetadata>;
|
||||
type DockerHostMetadataRecord = Record<string, DockerHostMetadata>;
|
||||
|
|
@ -349,6 +351,78 @@ export const DockerHosts: Component<DockerHostsProps> = (props) => {
|
|||
setSelectedHostId((current) => (current === hostId ? null : hostId));
|
||||
};
|
||||
|
||||
const updateableContainers = createMemo(() => {
|
||||
const containers: { hostId: string; containerId: string; containerName: string }[] = [];
|
||||
sortedHosts().forEach((host) => {
|
||||
if (!hostMatchesStatus(host)) return;
|
||||
host.containers?.forEach((c) => {
|
||||
if (c.updateStatus?.updateAvailable) {
|
||||
containers.push({
|
||||
hostId: host.id,
|
||||
containerId: c.id,
|
||||
containerName: c.name || c.id,
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
return containers;
|
||||
});
|
||||
|
||||
const handleUpdateAll = async () => {
|
||||
const targets = updateableContainers();
|
||||
if (targets.length === 0) return;
|
||||
|
||||
// Use a unique ID for this batch operation to update the toast
|
||||
const toastId = `batch-update-${Date.now()}`;
|
||||
|
||||
// Initial toast
|
||||
showToast({
|
||||
id: toastId,
|
||||
title: 'Batch Update Started',
|
||||
message: `Preparing to update ${targets.length} containers...`,
|
||||
tone: 'info',
|
||||
duration: 10000,
|
||||
});
|
||||
|
||||
let successCount = 0;
|
||||
let failCount = 0;
|
||||
|
||||
// Process in chunks of 5 to avoid overloading the browser/network
|
||||
const chunkSize = 5;
|
||||
for (let i = 0; i < targets.length; i += chunkSize) {
|
||||
const chunk = targets.slice(i, i + chunkSize);
|
||||
|
||||
// Update progress toast (if we had a way to update existing toasts, which we might not have in this simple util, but we can emit new ones or just rely on the final report)
|
||||
// For now, let's just process.
|
||||
|
||||
await Promise.all(chunk.map(async (target) => {
|
||||
try {
|
||||
await MonitoringAPI.updateDockerContainer(
|
||||
target.hostId,
|
||||
target.containerId,
|
||||
target.containerName,
|
||||
);
|
||||
successCount++;
|
||||
} catch (err) {
|
||||
failCount++;
|
||||
logger.error(`Failed to trigger update for ${target.containerName}`, err);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
if (failCount === 0) {
|
||||
showSuccess(`Successfully queued updates for all ${targets.length} containers.`);
|
||||
} else if (successCount === 0) {
|
||||
showError(`Failed to queue any updates. Check console for details.`);
|
||||
} else {
|
||||
showToast({
|
||||
title: 'Batch Update Completed',
|
||||
message: `Queued ${successCount} updates. ${failCount} failed.`,
|
||||
tone: 'warning',
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const renderFilter = () => (
|
||||
<DockerFilter
|
||||
search={search}
|
||||
|
|
@ -363,6 +437,8 @@ export const DockerHosts: Component<DockerHostsProps> = (props) => {
|
|||
searchInputRef={(el) => {
|
||||
searchInputRef = el;
|
||||
}}
|
||||
updateAvailableCount={updateableContainers().length}
|
||||
onUpdateAll={handleUpdateAll}
|
||||
/>
|
||||
);
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
89
internal/dockeragent/agent_image_digest_test.go
Normal file
89
internal/dockeragent/agent_image_digest_test.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package dockeragent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/docker/api/types/image"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
func TestAgent_getImageRepoDigest_Error(t *testing.T) {
|
||||
agent := &Agent{
|
||||
docker: &fakeDockerClient{
|
||||
imageInspectWithRawFn: func(ctx context.Context, imageID string) (image.InspectResponse, []byte, error) {
|
||||
return image.InspectResponse{}, nil, errors.New("inspect failed")
|
||||
},
|
||||
},
|
||||
logger: zerolog.New(io.Discard),
|
||||
}
|
||||
|
||||
if got := agent.getImageRepoDigest(context.Background(), "image-id", "nginx:latest"); got != "" {
|
||||
t.Fatalf("expected empty digest on error, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_getImageRepoDigest_NoRepoDigests(t *testing.T) {
|
||||
agent := &Agent{
|
||||
docker: &fakeDockerClient{
|
||||
imageInspectWithRawFn: func(ctx context.Context, imageID string) (image.InspectResponse, []byte, error) {
|
||||
return image.InspectResponse{RepoDigests: nil}, nil, nil
|
||||
},
|
||||
},
|
||||
logger: zerolog.New(io.Discard),
|
||||
}
|
||||
|
||||
if got := agent.getImageRepoDigest(context.Background(), "image-id", "nginx:latest"); got != "" {
|
||||
t.Fatalf("expected empty digest for no RepoDigests, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_getImageRepoDigest_Match(t *testing.T) {
|
||||
agent := &Agent{
|
||||
docker: &fakeDockerClient{
|
||||
imageInspectWithRawFn: func(ctx context.Context, imageID string) (image.InspectResponse, []byte, error) {
|
||||
return image.InspectResponse{RepoDigests: []string{"docker.io/library/nginx@sha256:abc"}}, nil, nil
|
||||
},
|
||||
},
|
||||
logger: zerolog.New(io.Discard),
|
||||
}
|
||||
|
||||
if got := agent.getImageRepoDigest(context.Background(), "image-id", "nginx:latest"); got != "sha256:abc" {
|
||||
t.Fatalf("expected matching digest, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_getImageRepoDigest_FallbackToFirst(t *testing.T) {
|
||||
agent := &Agent{
|
||||
docker: &fakeDockerClient{
|
||||
imageInspectWithRawFn: func(ctx context.Context, imageID string) (image.InspectResponse, []byte, error) {
|
||||
return image.InspectResponse{RepoDigests: []string{
|
||||
"docker.io/library/redis@sha256:first",
|
||||
"docker.io/library/nginx@sha256:second",
|
||||
}}, nil, nil
|
||||
},
|
||||
},
|
||||
logger: zerolog.New(io.Discard),
|
||||
}
|
||||
|
||||
if got := agent.getImageRepoDigest(context.Background(), "image-id", "custom:latest"); got != "sha256:first" {
|
||||
t.Fatalf("expected fallback digest, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_getImageRepoDigest_InvalidRepoDigest(t *testing.T) {
|
||||
agent := &Agent{
|
||||
docker: &fakeDockerClient{
|
||||
imageInspectWithRawFn: func(ctx context.Context, imageID string) (image.InspectResponse, []byte, error) {
|
||||
return image.InspectResponse{RepoDigests: []string{"invalid-digest"}}, nil, nil
|
||||
},
|
||||
},
|
||||
logger: zerolog.New(io.Discard),
|
||||
}
|
||||
|
||||
if got := agent.getImageRepoDigest(context.Background(), "image-id", "nginx:latest"); got != "" {
|
||||
t.Fatalf("expected empty digest for invalid repo digest, got %q", got)
|
||||
}
|
||||
}
|
||||
53
internal/dockeragent/cleanup.go
Normal file
53
internal/dockeragent/cleanup.go
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
package dockeragent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types/container"
|
||||
)
|
||||
|
||||
// cleanupOrphanedBackups searches for and removes any Pulse backup containers
|
||||
// (created during updates) that are older than 1 hour.
|
||||
func (a *Agent) cleanupOrphanedBackups(ctx context.Context) {
|
||||
a.logger.Debug().Msg("Checking for orphaned backup containers")
|
||||
|
||||
// List all containers (including stopped ones)
|
||||
list, err := a.docker.ContainerList(ctx, container.ListOptions{
|
||||
All: true,
|
||||
})
|
||||
if err != nil {
|
||||
a.logger.Warn().Err(err).Msg("Failed to list containers for cleanup")
|
||||
return
|
||||
}
|
||||
|
||||
for _, c := range list {
|
||||
// Check if it's a backup container
|
||||
// Name format: originalName + "_pulse_backup_" + timestamp
|
||||
isBackup := false
|
||||
for _, name := range c.Names {
|
||||
if strings.Contains(name, "_pulse_backup_") {
|
||||
isBackup = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !isBackup {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check age
|
||||
created := time.Unix(c.Created, 0)
|
||||
if time.Since(created) > 1*time.Hour {
|
||||
a.logger.Info().
|
||||
Str("container", c.Names[0]).
|
||||
Time("created", created).
|
||||
Msg("Removing orphaned backup container")
|
||||
|
||||
if err := a.docker.ContainerRemove(ctx, c.ID, container.RemoveOptions{Force: true}); err != nil {
|
||||
a.logger.Warn().Err(err).Str("id", c.ID).Msg("Failed to remove orphaned backup container")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
802
internal/dockeragent/collect.go
Normal file
802
internal/dockeragent/collect.go
Normal file
|
|
@ -0,0 +1,802 @@
|
|||
package dockeragent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/big"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
containertypes "github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
agentsdocker "github.com/rcourtman/pulse-go-rewrite/pkg/agents/docker"
|
||||
)
|
||||
|
||||
// buildReport gathers all system and container metrics into a single report
|
||||
func (a *Agent) buildReport(ctx context.Context) (agentsdocker.Report, error) {
|
||||
info, err := a.docker.Info(ctx)
|
||||
if err != nil {
|
||||
return agentsdocker.Report{}, fmt.Errorf("failed to query docker info: %w", err)
|
||||
}
|
||||
|
||||
a.runtimeVer = info.ServerVersion
|
||||
if a.daemonHost == "" {
|
||||
a.daemonHost = a.docker.DaemonHost()
|
||||
}
|
||||
|
||||
newRuntime := detectRuntime(info, a.daemonHost, RuntimeAuto)
|
||||
if newRuntime != a.runtime {
|
||||
if a.runtime != "" {
|
||||
a.logger.Info().
|
||||
Str("runtime_previous", string(a.runtime)).
|
||||
Str("runtime_current", string(newRuntime)).
|
||||
Msg("Detected container runtime change")
|
||||
}
|
||||
a.runtime = newRuntime
|
||||
a.supportsSwarm = newRuntime == RuntimeDocker
|
||||
if newRuntime == RuntimePodman {
|
||||
if a.cfg.IncludeServices {
|
||||
a.logger.Warn().Msg("Podman runtime detected during report; disabling Swarm service collection")
|
||||
}
|
||||
if a.cfg.IncludeTasks {
|
||||
a.logger.Warn().Msg("Podman runtime detected during report; disabling Swarm task collection")
|
||||
}
|
||||
a.cfg.IncludeServices = false
|
||||
a.cfg.IncludeTasks = false
|
||||
}
|
||||
a.cfg.Runtime = string(newRuntime)
|
||||
}
|
||||
|
||||
a.cpuCount = info.NCPU
|
||||
|
||||
agentID := a.cfg.AgentID
|
||||
if agentID == "" {
|
||||
// Use cached daemon ID from init rather than info.ID from current call.
|
||||
// Podman can return different/empty IDs across calls, causing token
|
||||
// binding conflicts on the server.
|
||||
agentID = a.daemonID
|
||||
}
|
||||
if agentID == "" {
|
||||
agentID = a.machineID
|
||||
}
|
||||
if agentID == "" {
|
||||
agentID = a.hostName
|
||||
}
|
||||
a.hostID = agentID
|
||||
|
||||
hostName := a.hostName
|
||||
if hostName == "" {
|
||||
hostName = info.Name
|
||||
}
|
||||
|
||||
uptime := readSystemUptime()
|
||||
|
||||
metricsCtx, metricsCancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
snapshot, err := hostmetricsCollect(metricsCtx, nil)
|
||||
metricsCancel()
|
||||
if err != nil {
|
||||
return agentsdocker.Report{}, fmt.Errorf("collect host metrics: %w", err)
|
||||
}
|
||||
|
||||
collectContainers := a.cfg.IncludeContainers
|
||||
if !collectContainers && (a.cfg.IncludeServices || a.cfg.IncludeTasks) && !info.Swarm.ControlAvailable {
|
||||
collectContainers = true
|
||||
}
|
||||
|
||||
var containers []agentsdocker.Container
|
||||
if collectContainers {
|
||||
var err error
|
||||
containers, err = a.collectContainers(ctx)
|
||||
if err != nil {
|
||||
return agentsdocker.Report{}, err
|
||||
}
|
||||
}
|
||||
|
||||
services, tasks, swarmInfo := a.collectSwarmData(ctx, info, containers)
|
||||
|
||||
report := agentsdocker.Report{
|
||||
Agent: agentsdocker.AgentInfo{
|
||||
ID: agentID,
|
||||
Version: a.agentVersion,
|
||||
Type: a.cfg.AgentType,
|
||||
IntervalSeconds: int(a.cfg.Interval / time.Second),
|
||||
},
|
||||
Host: agentsdocker.HostInfo{
|
||||
Hostname: hostName,
|
||||
Name: info.Name,
|
||||
MachineID: a.machineID,
|
||||
OS: info.OperatingSystem,
|
||||
Runtime: string(a.runtime),
|
||||
RuntimeVersion: a.runtimeVer,
|
||||
KernelVersion: info.KernelVersion,
|
||||
Architecture: info.Architecture,
|
||||
DockerVersion: info.ServerVersion,
|
||||
TotalCPU: info.NCPU,
|
||||
TotalMemoryBytes: info.MemTotal,
|
||||
UptimeSeconds: uptime,
|
||||
CPUUsagePercent: safeFloat(snapshot.CPUUsagePercent),
|
||||
LoadAverage: append([]float64(nil), snapshot.LoadAverage...),
|
||||
Memory: snapshot.Memory,
|
||||
Disks: append([]agentsdocker.Disk(nil), snapshot.Disks...),
|
||||
Network: append([]agentsdocker.NetworkInterface(nil), snapshot.Network...),
|
||||
},
|
||||
Timestamp: time.Now().UTC(),
|
||||
}
|
||||
|
||||
if swarmInfo != nil {
|
||||
report.Host.Swarm = swarmInfo
|
||||
}
|
||||
|
||||
if a.cfg.IncludeContainers {
|
||||
report.Containers = containers
|
||||
}
|
||||
if a.cfg.IncludeServices && len(services) > 0 {
|
||||
report.Services = services
|
||||
}
|
||||
if a.cfg.IncludeTasks && len(tasks) > 0 {
|
||||
report.Tasks = tasks
|
||||
}
|
||||
|
||||
if report.Agent.IntervalSeconds <= 0 {
|
||||
report.Agent.IntervalSeconds = int(30 * time.Second / time.Second)
|
||||
}
|
||||
|
||||
return report, nil
|
||||
}
|
||||
|
||||
func (a *Agent) collectContainers(ctx context.Context) ([]agentsdocker.Container, error) {
|
||||
options := containertypes.ListOptions{All: true}
|
||||
if len(a.stateFilters) > 0 {
|
||||
filterArgs := filters.NewArgs()
|
||||
for _, state := range a.stateFilters {
|
||||
filterArgs.Add("status", state)
|
||||
}
|
||||
options.Filters = filterArgs
|
||||
}
|
||||
|
||||
list, err := a.docker.ContainerList(ctx, options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list containers: %w", err)
|
||||
}
|
||||
|
||||
containers := make([]agentsdocker.Container, 0, len(list))
|
||||
active := make(map[string]struct{}, len(list))
|
||||
for _, summary := range list {
|
||||
if len(a.allowedStates) > 0 {
|
||||
if _, ok := a.allowedStates[strings.ToLower(summary.State)]; !ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
active[summary.ID] = struct{}{}
|
||||
|
||||
container, err := a.collectContainer(ctx, summary)
|
||||
if err != nil {
|
||||
a.logger.Warn().Str("container", strings.Join(summary.Names, ",")).Err(err).Msg("Failed to collect container stats")
|
||||
continue
|
||||
}
|
||||
containers = append(containers, container)
|
||||
}
|
||||
a.pruneStaleCPUSamples(active)
|
||||
return containers, nil
|
||||
}
|
||||
|
||||
func (a *Agent) pruneStaleCPUSamples(active map[string]struct{}) {
|
||||
if len(a.prevContainerCPU) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for id := range a.prevContainerCPU {
|
||||
if _, ok := active[id]; !ok {
|
||||
delete(a.prevContainerCPU, id)
|
||||
// Reset stats failure counter when containers are removed,
|
||||
// though it's global per agent so not strictly necessary but good hygiene
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) collectContainer(ctx context.Context, summary containertypes.Summary) (agentsdocker.Container, error) {
|
||||
const perContainerTimeout = 15 * time.Second
|
||||
|
||||
containerCtx, cancel := context.WithTimeout(ctx, perContainerTimeout)
|
||||
defer cancel()
|
||||
|
||||
requestSize := a.cfg.CollectDiskMetrics
|
||||
inspect, _, err := a.docker.ContainerInspectWithRaw(containerCtx, summary.ID, requestSize)
|
||||
if err != nil {
|
||||
return agentsdocker.Container{}, fmt.Errorf("inspect: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
cpuPercent float64
|
||||
memUsage int64
|
||||
memLimit int64
|
||||
memPercent float64
|
||||
blockIO *agentsdocker.ContainerBlockIO
|
||||
)
|
||||
|
||||
if inspect.State.Running || inspect.State.Paused {
|
||||
statsResp, err := a.docker.ContainerStatsOneShot(containerCtx, summary.ID)
|
||||
if err != nil {
|
||||
return agentsdocker.Container{}, fmt.Errorf("stats: %w", err)
|
||||
}
|
||||
defer statsResp.Body.Close()
|
||||
|
||||
var stats containertypes.StatsResponse
|
||||
if err := json.NewDecoder(statsResp.Body).Decode(&stats); err != nil {
|
||||
return agentsdocker.Container{}, fmt.Errorf("decode stats: %w", err)
|
||||
}
|
||||
|
||||
cpuPercent = a.calculateContainerCPUPercent(summary.ID, stats)
|
||||
memUsage, memLimit, memPercent = calculateMemoryUsage(stats)
|
||||
blockIO = summarizeBlockIO(stats)
|
||||
} else {
|
||||
delete(a.prevContainerCPU, summary.ID)
|
||||
}
|
||||
|
||||
createdAt := time.Unix(summary.Created, 0)
|
||||
|
||||
startedAt := parseTime(inspect.State.StartedAt)
|
||||
finishedAt := parseTime(inspect.State.FinishedAt)
|
||||
|
||||
uptimeSeconds := int64(0)
|
||||
if !startedAt.IsZero() && inspect.State.Running {
|
||||
uptimeSeconds = int64(time.Since(startedAt).Seconds())
|
||||
if uptimeSeconds < 0 {
|
||||
uptimeSeconds = 0
|
||||
}
|
||||
}
|
||||
|
||||
health := ""
|
||||
if inspect.State.Health != nil {
|
||||
health = inspect.State.Health.Status
|
||||
}
|
||||
|
||||
ports := make([]agentsdocker.ContainerPort, len(summary.Ports))
|
||||
for i, port := range summary.Ports {
|
||||
ports[i] = agentsdocker.ContainerPort{
|
||||
PrivatePort: int(port.PrivatePort),
|
||||
PublicPort: int(port.PublicPort),
|
||||
Protocol: port.Type,
|
||||
IP: port.IP,
|
||||
}
|
||||
}
|
||||
|
||||
labels := make(map[string]string, len(summary.Labels))
|
||||
for k, v := range summary.Labels {
|
||||
labels[k] = v
|
||||
}
|
||||
|
||||
networks := make([]agentsdocker.ContainerNetwork, 0)
|
||||
if inspect.NetworkSettings != nil {
|
||||
for name, cfg := range inspect.NetworkSettings.Networks {
|
||||
networks = append(networks, agentsdocker.ContainerNetwork{
|
||||
Name: name,
|
||||
IPv4: cfg.IPAddress,
|
||||
IPv6: cfg.GlobalIPv6Address,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var startedPtr, finishedPtr *time.Time
|
||||
if !startedAt.IsZero() {
|
||||
started := startedAt
|
||||
startedPtr = &started
|
||||
}
|
||||
if !finishedAt.IsZero() && !inspect.State.Running {
|
||||
finished := finishedAt
|
||||
finishedPtr = &finished
|
||||
}
|
||||
|
||||
var writableLayerBytes int64
|
||||
if inspect.SizeRw != nil {
|
||||
writableLayerBytes = *inspect.SizeRw
|
||||
}
|
||||
|
||||
var rootFsBytes int64
|
||||
if inspect.SizeRootFs != nil {
|
||||
rootFsBytes = *inspect.SizeRootFs
|
||||
}
|
||||
|
||||
var mounts []agentsdocker.ContainerMount
|
||||
if len(inspect.Mounts) > 0 {
|
||||
mounts = make([]agentsdocker.ContainerMount, 0, len(inspect.Mounts))
|
||||
for _, mount := range inspect.Mounts {
|
||||
mounts = append(mounts, agentsdocker.ContainerMount{
|
||||
Type: string(mount.Type),
|
||||
Source: mount.Source,
|
||||
Destination: mount.Destination,
|
||||
Mode: mount.Mode,
|
||||
RW: mount.RW,
|
||||
Propagation: string(mount.Propagation),
|
||||
Name: mount.Name,
|
||||
Driver: mount.Driver,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
container := agentsdocker.Container{
|
||||
ID: summary.ID,
|
||||
Name: trimLeadingSlash(summary.Names),
|
||||
Image: summary.Image,
|
||||
ImageDigest: summary.ImageID, // sha256:... digest of the image
|
||||
CreatedAt: createdAt,
|
||||
State: summary.State,
|
||||
Status: summary.Status,
|
||||
Health: health,
|
||||
CPUPercent: cpuPercent,
|
||||
MemoryUsageBytes: memUsage,
|
||||
MemoryLimitBytes: memLimit,
|
||||
MemoryPercent: memPercent,
|
||||
UptimeSeconds: uptimeSeconds,
|
||||
RestartCount: inspect.RestartCount,
|
||||
ExitCode: inspect.State.ExitCode,
|
||||
StartedAt: startedPtr,
|
||||
FinishedAt: finishedPtr,
|
||||
Ports: ports,
|
||||
Labels: labels,
|
||||
Env: maskSensitiveEnvVars(inspect.Config.Env),
|
||||
Networks: networks,
|
||||
WritableLayerBytes: writableLayerBytes,
|
||||
RootFilesystemBytes: rootFsBytes,
|
||||
BlockIO: blockIO,
|
||||
Mounts: mounts,
|
||||
}
|
||||
|
||||
if a.runtime == RuntimePodman {
|
||||
if meta := extractPodmanMetadata(labels); meta != nil {
|
||||
container.Podman = meta
|
||||
}
|
||||
}
|
||||
|
||||
// Check for image updates if registry checker is enabled
|
||||
if a.registryChecker != nil && a.registryChecker.Enabled() {
|
||||
// Get the actual manifest digest (RepoDigest) from the image for accurate comparison.
|
||||
// The ImageID is a local content-addressable ID that differs from the registry manifest digest.
|
||||
digestForComparison := a.getImageRepoDigest(containerCtx, summary.ImageID, summary.Image)
|
||||
if digestForComparison == "" {
|
||||
// Fall back to ImageID if we can't get RepoDigest (shouldn't compare as equal)
|
||||
digestForComparison = summary.ImageID
|
||||
}
|
||||
result := a.registryChecker.CheckImageUpdate(ctx, container.Image, digestForComparison)
|
||||
if result != nil {
|
||||
container.UpdateStatus = &agentsdocker.UpdateStatus{
|
||||
UpdateAvailable: result.UpdateAvailable,
|
||||
CurrentDigest: result.CurrentDigest,
|
||||
LatestDigest: result.LatestDigest,
|
||||
LastChecked: result.CheckedAt,
|
||||
Error: result.Error,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if requestSize {
|
||||
a.logger.Debug().
|
||||
Str("container", container.Name).
|
||||
Int64("writableLayerBytes", writableLayerBytes).
|
||||
Int64("rootFilesystemBytes", rootFsBytes).
|
||||
Int("mountCount", len(mounts)).
|
||||
Msg("Collected container disk metrics")
|
||||
}
|
||||
|
||||
return container, nil
|
||||
}
|
||||
|
||||
// getImageRepoDigest retrieves the RepoDigest for an image, which is the actual
|
||||
// manifest digest from the registry. This is necessary because Docker's ImageID
|
||||
// is a local content-addressable hash that differs from the registry manifest digest.
|
||||
// For multi-arch images, the registry returns a manifest list digest, while Docker
|
||||
// stores the platform-specific image config digest locally.
|
||||
func (a *Agent) getImageRepoDigest(ctx context.Context, imageID, imageName string) string {
|
||||
imageInspect, _, err := a.docker.ImageInspectWithRaw(ctx, imageID)
|
||||
if err != nil {
|
||||
a.logger.Debug().
|
||||
Err(err).
|
||||
Str("imageID", imageID).
|
||||
Str("imageName", imageName).
|
||||
Msg("Failed to inspect image for RepoDigest")
|
||||
return ""
|
||||
}
|
||||
|
||||
if len(imageInspect.RepoDigests) == 0 {
|
||||
// Locally built images won't have RepoDigests
|
||||
return ""
|
||||
}
|
||||
|
||||
// Try to find a RepoDigest that matches the image reference
|
||||
// RepoDigests format: "registry/repo@sha256:..."
|
||||
for _, repoDigest := range imageInspect.RepoDigests {
|
||||
// Extract just the digest part (after @)
|
||||
if idx := strings.LastIndex(repoDigest, "@"); idx >= 0 {
|
||||
repoRef := repoDigest[:idx] // e.g., "docker.io/library/nginx"
|
||||
digest := repoDigest[idx+1:] // e.g., "sha256:abc..."
|
||||
|
||||
// Check if this RepoDigest matches our image reference
|
||||
// Normalize both for comparison
|
||||
if matchesImageReference(imageName, repoRef) {
|
||||
return digest
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no exact match, return the first RepoDigest's digest
|
||||
// This handles cases where the image was pulled with a different tag
|
||||
if idx := strings.LastIndex(imageInspect.RepoDigests[0], "@"); idx >= 0 {
|
||||
return imageInspect.RepoDigests[0][idx+1:]
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
// matchesImageReference checks if a RepoDigest repository matches an image reference.
|
||||
// It handles Docker Hub's various naming conventions.
|
||||
func matchesImageReference(imageName, repoRef string) bool {
|
||||
// Normalize image name by removing tag
|
||||
if idx := strings.LastIndex(imageName, ":"); idx >= 0 {
|
||||
// Make sure it's a tag, not a port (check if there's a / after it)
|
||||
if !strings.Contains(imageName[idx:], "/") {
|
||||
imageName = imageName[:idx]
|
||||
}
|
||||
}
|
||||
|
||||
// Direct match
|
||||
if imageName == repoRef {
|
||||
return true
|
||||
}
|
||||
|
||||
// Docker Hub library images: "nginx" == "docker.io/library/nginx"
|
||||
if repoRef == "docker.io/library/"+imageName {
|
||||
return true
|
||||
}
|
||||
|
||||
// Docker Hub with namespace: "myuser/myapp" == "docker.io/myuser/myapp"
|
||||
if repoRef == "docker.io/"+imageName {
|
||||
return true
|
||||
}
|
||||
|
||||
// Registry prefix matching (e.g., "ghcr.io/user/repo" matches "ghcr.io/user/repo")
|
||||
// Already handled by direct match above
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func extractPodmanMetadata(labels map[string]string) *agentsdocker.PodmanContainer {
|
||||
if len(labels) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
meta := &agentsdocker.PodmanContainer{}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.podman.annotations.pod.name"]); v != "" {
|
||||
meta.PodName = v
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.podman.annotations.pod.id"]); v != "" {
|
||||
meta.PodID = v
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.podman.annotations.pod.infra"]); v != "" {
|
||||
if parsed, err := strconv.ParseBool(v); err == nil {
|
||||
meta.Infra = parsed
|
||||
} else if strings.EqualFold(v, "yes") || strings.EqualFold(v, "true") {
|
||||
meta.Infra = true
|
||||
}
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.podman.compose.project"]); v != "" {
|
||||
meta.ComposeProject = v
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.podman.compose.service"]); v != "" {
|
||||
meta.ComposeService = v
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.podman.compose.working_dir"]); v != "" {
|
||||
meta.ComposeWorkdir = v
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.podman.compose.config-hash"]); v != "" {
|
||||
meta.ComposeConfig = v
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.containers.autoupdate"]); v != "" {
|
||||
meta.AutoUpdatePolicy = v
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.containers.autoupdate.restart"]); v != "" {
|
||||
meta.AutoUpdateRestart = v
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(labels["io.podman.annotations.userns"]); v != "" {
|
||||
meta.UserNS = v
|
||||
} else if v := strings.TrimSpace(labels["io.containers.userns"]); v != "" {
|
||||
meta.UserNS = v
|
||||
}
|
||||
|
||||
if meta.PodName == "" && meta.PodID == "" && meta.ComposeProject == "" && meta.AutoUpdatePolicy == "" && meta.UserNS == "" && !meta.Infra {
|
||||
return nil
|
||||
}
|
||||
|
||||
return meta
|
||||
}
|
||||
|
||||
func (a *Agent) calculateContainerCPUPercent(id string, stats containertypes.StatsResponse) float64 {
|
||||
current := cpuSample{
|
||||
totalUsage: stats.CPUStats.CPUUsage.TotalUsage,
|
||||
systemUsage: stats.CPUStats.SystemUsage,
|
||||
onlineCPUs: stats.CPUStats.OnlineCPUs,
|
||||
read: stats.Read,
|
||||
}
|
||||
|
||||
// Try to use PreCPUStats if available
|
||||
percent := calculateCPUPercent(stats, a.cpuCount)
|
||||
if percent > 0 {
|
||||
a.prevContainerCPU[id] = current
|
||||
a.logger.Debug().
|
||||
Str("container_id", id[:12]).
|
||||
Float64("cpu_percent", percent).
|
||||
Msg("CPU calculated from PreCPUStats")
|
||||
return percent
|
||||
}
|
||||
|
||||
// PreCPUStats not available or invalid, use manual tracking
|
||||
a.preCPUStatsFailures++
|
||||
if a.preCPUStatsFailures == 10 {
|
||||
a.logger.Warn().
|
||||
Str("runtime", string(a.runtime)).
|
||||
Msg("PreCPUStats consistently unavailable from Docker API - using manual CPU tracking (this is normal for one-shot stats)")
|
||||
}
|
||||
prev, ok := a.prevContainerCPU[id]
|
||||
if !ok {
|
||||
// First time seeing this container - store current sample and return 0
|
||||
// On next collection cycle we'll have a previous sample to compare against
|
||||
a.prevContainerCPU[id] = current
|
||||
a.logger.Debug().
|
||||
Str("container_id", id[:12]).
|
||||
Uint64("total_usage", current.totalUsage).
|
||||
Uint64("system_usage", current.systemUsage).
|
||||
Msg("First CPU sample collected, no previous data for delta calculation")
|
||||
return 0
|
||||
}
|
||||
|
||||
// We have a previous sample - update it after calculation
|
||||
a.prevContainerCPU[id] = current
|
||||
|
||||
var totalDelta float64
|
||||
if current.totalUsage >= prev.totalUsage {
|
||||
totalDelta = float64(current.totalUsage - prev.totalUsage)
|
||||
} else {
|
||||
// Counter likely reset (container restart); fall back to current reading.
|
||||
totalDelta = float64(current.totalUsage)
|
||||
}
|
||||
|
||||
if totalDelta <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
onlineCPUs := current.onlineCPUs
|
||||
if onlineCPUs == 0 {
|
||||
onlineCPUs = prev.onlineCPUs
|
||||
}
|
||||
if onlineCPUs == 0 && a.cpuCount > 0 {
|
||||
onlineCPUs = uint32(a.cpuCount)
|
||||
}
|
||||
if onlineCPUs == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
var systemDelta float64
|
||||
if current.systemUsage >= prev.systemUsage {
|
||||
systemDelta = float64(current.systemUsage - prev.systemUsage)
|
||||
}
|
||||
// If systemUsage went backward (counter reset), leave systemDelta as 0
|
||||
// to fall through to time-based calculation below
|
||||
|
||||
if systemDelta > 0 {
|
||||
cpuPercent := safeFloat((totalDelta / systemDelta) * float64(onlineCPUs) * 100.0)
|
||||
a.logger.Debug().
|
||||
Str("container_id", id[:12]).
|
||||
Float64("cpu_percent", cpuPercent).
|
||||
Float64("total_delta", totalDelta).
|
||||
Float64("system_delta", systemDelta).
|
||||
Uint32("online_cpus", onlineCPUs).
|
||||
Msg("CPU calculated from system delta")
|
||||
return cpuPercent
|
||||
}
|
||||
|
||||
// Fall back to time-based calculation
|
||||
if !prev.read.IsZero() && !current.read.IsZero() {
|
||||
elapsed := current.read.Sub(prev.read).Seconds()
|
||||
if elapsed > 0 {
|
||||
denominator := elapsed * float64(onlineCPUs) * 1e9
|
||||
if denominator > 0 {
|
||||
cpuPercent := (totalDelta / denominator) * 100.0
|
||||
result := safeFloat(cpuPercent)
|
||||
a.logger.Debug().
|
||||
Str("container_id", id[:12]).
|
||||
Float64("cpu_percent", result).
|
||||
Float64("total_delta", totalDelta).
|
||||
Float64("elapsed_seconds", elapsed).
|
||||
Uint32("online_cpus", onlineCPUs).
|
||||
Msg("CPU calculated from time-based delta")
|
||||
return result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
a.logger.Debug().
|
||||
Str("container_id", id[:12]).
|
||||
Float64("total_delta", totalDelta).
|
||||
Float64("system_delta", systemDelta).
|
||||
Bool("prev_read_zero", prev.read.IsZero()).
|
||||
Bool("current_read_zero", current.read.IsZero()).
|
||||
Msg("CPU calculation failed: no valid delta method available")
|
||||
return 0
|
||||
}
|
||||
|
||||
func calculateCPUPercent(stats containertypes.StatsResponse, hostCPUs int) float64 {
|
||||
totalDelta := float64(stats.CPUStats.CPUUsage.TotalUsage - stats.PreCPUStats.CPUUsage.TotalUsage)
|
||||
systemDelta := float64(stats.CPUStats.SystemUsage - stats.PreCPUStats.SystemUsage)
|
||||
|
||||
if totalDelta <= 0 || systemDelta <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
onlineCPUs := stats.CPUStats.OnlineCPUs
|
||||
if onlineCPUs == 0 {
|
||||
onlineCPUs = uint32(len(stats.CPUStats.CPUUsage.PercpuUsage))
|
||||
}
|
||||
if onlineCPUs == 0 && hostCPUs > 0 {
|
||||
onlineCPUs = uint32(hostCPUs)
|
||||
}
|
||||
|
||||
if onlineCPUs == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
return safeFloat((totalDelta / systemDelta) * float64(onlineCPUs) * 100.0)
|
||||
}
|
||||
|
||||
func calculateMemoryUsage(stats containertypes.StatsResponse) (usage int64, limit int64, percent float64) {
|
||||
usage = int64(stats.MemoryStats.Usage)
|
||||
if cache, ok := stats.MemoryStats.Stats["cache"]; ok {
|
||||
usage -= int64(cache)
|
||||
}
|
||||
if usage < 0 {
|
||||
usage = int64(stats.MemoryStats.Usage)
|
||||
}
|
||||
|
||||
limit = int64(stats.MemoryStats.Limit)
|
||||
if limit > 0 {
|
||||
percent = (float64(usage) / float64(limit)) * 100.0
|
||||
}
|
||||
|
||||
return usage, limit, safeFloat(percent)
|
||||
}
|
||||
|
||||
func safeFloat(val float64) float64 {
|
||||
if math.IsNaN(val) || math.IsInf(val, 0) {
|
||||
return 0
|
||||
}
|
||||
return val
|
||||
}
|
||||
|
||||
func parseTime(value string) time.Time {
|
||||
if value == "" || value == "0001-01-01T00:00:00Z" {
|
||||
return time.Time{}
|
||||
}
|
||||
if strings.Contains(value, ".") {
|
||||
if t, err := time.Parse(time.RFC3339Nano, value); err == nil {
|
||||
return t
|
||||
}
|
||||
} else {
|
||||
if t, err := time.Parse(time.RFC3339, value); err == nil {
|
||||
return t
|
||||
}
|
||||
}
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
func trimLeadingSlash(names []string) string {
|
||||
if len(names) == 0 {
|
||||
return ""
|
||||
}
|
||||
name := names[0]
|
||||
return strings.TrimPrefix(name, "/")
|
||||
}
|
||||
|
||||
func randomDuration(max time.Duration) time.Duration {
|
||||
if max <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
n, err := randIntFn(rand.Reader, big.NewInt(int64(max)))
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return time.Duration(n.Int64())
|
||||
}
|
||||
|
||||
func summarizeBlockIO(stats containertypes.StatsResponse) *agentsdocker.ContainerBlockIO {
|
||||
// BlkioStats structure varies by cgroup version
|
||||
// Cgroup v1: IoServiceBytesRecursive []BlkioStatEntry
|
||||
// Cgroup v2: IoServiceBytesRecursive is empty? No, Docker maps it?
|
||||
// Docker API guarantees IoServiceBytesRecursive is populated?
|
||||
// It seems to try to handle both.
|
||||
|
||||
if len(stats.BlkioStats.IoServiceBytesRecursive) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var readBytes, writeBytes uint64
|
||||
|
||||
for _, entry := range stats.BlkioStats.IoServiceBytesRecursive {
|
||||
op := strings.ToLower(entry.Op)
|
||||
switch op {
|
||||
case "read":
|
||||
readBytes += entry.Value
|
||||
case "write":
|
||||
writeBytes += entry.Value
|
||||
}
|
||||
}
|
||||
|
||||
if readBytes == 0 && writeBytes == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &agentsdocker.ContainerBlockIO{
|
||||
ReadBytes: readBytes,
|
||||
WriteBytes: writeBytes,
|
||||
}
|
||||
}
|
||||
|
||||
// sensitiveEnvPatterns are substrings that, when found in an env var name (case-insensitive),
|
||||
// indicate the value should be masked for security.
|
||||
var sensitiveEnvPatterns = []string{
|
||||
"password", "passwd", "secret", "key", "token", "credential", "auth",
|
||||
"api_key", "apikey", "private", "access_token", "refresh_token",
|
||||
"database_url", "connection_string", "encryption",
|
||||
}
|
||||
|
||||
// maskSensitiveEnvVars returns a copy of the environment variables with sensitive values masked.
|
||||
// Environment variables whose names contain sensitive keywords will have their values replaced with "***".
|
||||
func maskSensitiveEnvVars(envVars []string) []string {
|
||||
if len(envVars) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
result := make([]string, 0, len(envVars))
|
||||
for _, env := range envVars {
|
||||
parts := strings.SplitN(env, "=", 2)
|
||||
if len(parts) != 2 {
|
||||
result = append(result, env)
|
||||
continue
|
||||
}
|
||||
|
||||
name := parts[0]
|
||||
value := parts[1]
|
||||
|
||||
// Check if the environment variable name contains a sensitive pattern
|
||||
lowerName := strings.ToLower(name)
|
||||
isSensitive := false
|
||||
for _, pattern := range sensitiveEnvPatterns {
|
||||
if strings.Contains(lowerName, pattern) {
|
||||
isSensitive = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if isSensitive && value != "" {
|
||||
result = append(result, name+"=***")
|
||||
} else {
|
||||
result = append(result, env)
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
|
@ -209,14 +209,47 @@ func (a *Agent) updateContainer(ctx context.Context, containerID string) Contain
|
|||
return result
|
||||
}
|
||||
|
||||
a.logger.Info().Str("container", result.ContainerName).Msg("New container started successfully")
|
||||
a.logger.Info().Str("container", result.ContainerName).Msg("New container started, verifying stability...")
|
||||
|
||||
// 9. Get the new image digest
|
||||
newInspect, err := a.docker.ContainerInspect(ctx, newContainerID)
|
||||
if err == nil {
|
||||
result.NewImageDigest = newInspect.Image
|
||||
// 9. Verify container stability
|
||||
// Wait a few seconds to ensure it doesn't crash immediately
|
||||
sleepFn(5 * time.Second)
|
||||
|
||||
verifyInspect, err := a.docker.ContainerInspect(ctx, newContainerID)
|
||||
if err != nil {
|
||||
result.Error = fmt.Sprintf("Failed to inspect new container during verification: %v", err)
|
||||
a.logger.Error().Err(err).Str("container", result.ContainerName).Msg("Failed to verify container stability")
|
||||
// Rollback
|
||||
_ = a.docker.ContainerRemove(ctx, newContainerID, container.RemoveOptions{Force: true})
|
||||
_ = a.docker.ContainerRename(ctx, backupName, result.ContainerName)
|
||||
_ = a.docker.ContainerStart(ctx, containerID, container.StartOptions{})
|
||||
return result
|
||||
}
|
||||
|
||||
// Check if running
|
||||
if !verifyInspect.State.Running {
|
||||
result.Error = fmt.Sprintf("New container crashed immediately (exit code %d): %s", verifyInspect.State.ExitCode, verifyInspect.State.Error)
|
||||
a.logger.Error().Str("container", result.ContainerName).Int("exitCode", verifyInspect.State.ExitCode).Msg("New container crashed, rolling back")
|
||||
// Rollback
|
||||
_ = a.docker.ContainerRemove(ctx, newContainerID, container.RemoveOptions{Force: true})
|
||||
_ = a.docker.ContainerRename(ctx, backupName, result.ContainerName)
|
||||
_ = a.docker.ContainerStart(ctx, containerID, container.StartOptions{})
|
||||
return result
|
||||
}
|
||||
|
||||
// Check health if available
|
||||
if verifyInspect.State.Health != nil && verifyInspect.State.Health.Status == "unhealthy" {
|
||||
result.Error = "New container reported unhealthy status"
|
||||
a.logger.Error().Str("container", result.ContainerName).Msg("New container unhealthy, rolling back")
|
||||
// Rollback
|
||||
_ = a.docker.ContainerRemove(ctx, newContainerID, container.RemoveOptions{Force: true})
|
||||
_ = a.docker.ContainerRename(ctx, backupName, result.ContainerName)
|
||||
_ = a.docker.ContainerStart(ctx, containerID, container.StartOptions{})
|
||||
return result
|
||||
}
|
||||
|
||||
result.NewImageDigest = verifyInspect.Image
|
||||
|
||||
// 10. Schedule cleanup of backup container after a delay
|
||||
// This gives time to verify the new container is working
|
||||
go func() {
|
||||
|
|
|
|||
|
|
@ -63,4 +63,7 @@ var (
|
|||
selfUpdateFunc = func(a *Agent, ctx context.Context) error {
|
||||
return a.selfUpdate(ctx)
|
||||
}
|
||||
execCommandContextFn = func(ctx context.Context, name string, arg ...string) *exec.Cmd {
|
||||
return exec.CommandContext(ctx, name, arg...)
|
||||
}
|
||||
)
|
||||
|
|
|
|||
378
internal/dockeragent/self_update.go
Normal file
378
internal/dockeragent/self_update.go
Normal file
|
|
@ -0,0 +1,378 @@
|
|||
package dockeragent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/rcourtman/pulse-go-rewrite/internal/utils"
|
||||
)
|
||||
|
||||
// checkForUpdates checks if a newer version is available and performs self-update if needed
|
||||
func (a *Agent) checkForUpdates(ctx context.Context) {
|
||||
// Skip updates if disabled via config
|
||||
if a.cfg.DisableAutoUpdate {
|
||||
a.logger.Info().Msg("Skipping update check - auto-update disabled")
|
||||
return
|
||||
}
|
||||
|
||||
// Skip updates in development mode to prevent update loops
|
||||
if Version == "dev" {
|
||||
a.logger.Debug().Msg("Skipping update check - running in development mode")
|
||||
return
|
||||
}
|
||||
|
||||
a.logger.Debug().Msg("Checking for agent updates")
|
||||
|
||||
target := a.primaryTarget()
|
||||
if target.URL == "" {
|
||||
a.logger.Debug().Msg("Skipping update check - no Pulse target configured")
|
||||
return
|
||||
}
|
||||
|
||||
// Get current version from server
|
||||
url := fmt.Sprintf("%s/api/agent/version", target.URL)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
a.logger.Warn().Err(err).Msg("Failed to create version check request")
|
||||
return
|
||||
}
|
||||
|
||||
if target.Token != "" {
|
||||
req.Header.Set("X-API-Token", target.Token)
|
||||
req.Header.Set("Authorization", "Bearer "+target.Token)
|
||||
}
|
||||
|
||||
client := a.httpClientFor(target)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
a.logger.Warn().Err(err).Msg("Failed to check for updates")
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
a.logger.Warn().Int("status", resp.StatusCode).Msg("Version endpoint returned non-200 status")
|
||||
return
|
||||
}
|
||||
|
||||
var versionResp struct {
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&versionResp); err != nil {
|
||||
a.logger.Warn().Err(err).Msg("Failed to decode version response")
|
||||
return
|
||||
}
|
||||
|
||||
// Skip updates if server is also in development mode
|
||||
if versionResp.Version == "dev" {
|
||||
a.logger.Debug().Msg("Skipping update - server is in development mode")
|
||||
return
|
||||
}
|
||||
|
||||
// Compare versions - normalize by stripping "v" prefix for comparison.
|
||||
// Server returns version without prefix (e.g., "4.33.1"), but agent's
|
||||
// Version may include it (e.g., "v4.33.1") depending on build.
|
||||
if utils.NormalizeVersion(versionResp.Version) == utils.NormalizeVersion(Version) {
|
||||
a.logger.Debug().Str("version", Version).Msg("Agent is up to date")
|
||||
return
|
||||
}
|
||||
|
||||
a.logger.Info().
|
||||
Str("currentVersion", Version).
|
||||
Str("availableVersion", versionResp.Version).
|
||||
Msg("New agent version available, performing self-update")
|
||||
|
||||
// Perform self-update
|
||||
if err := selfUpdateFunc(a, ctx); err != nil {
|
||||
a.logger.Error().Err(err).Msg("Failed to self-update agent")
|
||||
return
|
||||
}
|
||||
|
||||
a.logger.Info().Msg("Agent updated successfully, restarting...")
|
||||
}
|
||||
|
||||
// isUnraid checks if we're running on Unraid by looking for /etc/unraid-version
|
||||
func isUnraid() bool {
|
||||
_, err := osStatFn(unraidVersionPath)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// resolveSymlink resolves symlinks to get the real path of a file.
|
||||
// This is needed for self-update because os.Rename() fails across filesystems.
|
||||
func resolveSymlink(path string) (string, error) {
|
||||
return filepath.EvalSymlinks(path)
|
||||
}
|
||||
|
||||
// verifyELFMagic checks that the file is a valid ELF binary
|
||||
func verifyELFMagic(path string) error {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
magic := make([]byte, 4)
|
||||
if _, err := io.ReadFull(f, magic); err != nil {
|
||||
return fmt.Errorf("failed to read magic bytes: %w", err)
|
||||
}
|
||||
|
||||
// ELF magic: 0x7f 'E' 'L' 'F'
|
||||
if magic[0] == 0x7f && magic[1] == 'E' && magic[2] == 'L' && magic[3] == 'F' {
|
||||
return nil
|
||||
}
|
||||
return errors.New("not a valid ELF binary")
|
||||
}
|
||||
|
||||
func determineSelfUpdateArch() string {
|
||||
switch goArch {
|
||||
case "amd64":
|
||||
return "linux-amd64"
|
||||
case "arm64":
|
||||
return "linux-arm64"
|
||||
case "arm":
|
||||
return "linux-armv7"
|
||||
}
|
||||
|
||||
out, err := unameMachine()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
normalized := strings.ToLower(strings.TrimSpace(out))
|
||||
switch normalized {
|
||||
case "x86_64", "amd64":
|
||||
return "linux-amd64"
|
||||
case "aarch64", "arm64":
|
||||
return "linux-arm64"
|
||||
case "armv7l", "armhf", "armv7":
|
||||
return "linux-armv7"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
// selfUpdate downloads the new agent binary and replaces the current one
|
||||
func (a *Agent) selfUpdate(ctx context.Context) error {
|
||||
target := a.primaryTarget()
|
||||
if target.URL == "" {
|
||||
return errors.New("no Pulse target configured for self-update")
|
||||
}
|
||||
|
||||
// Get path to current executable
|
||||
execPath, err := osExecutableFn()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get executable path: %w", err)
|
||||
}
|
||||
|
||||
// Resolve symlinks to get the real path for atomic rename
|
||||
// os.Rename() fails across filesystems, so we need the actual target path
|
||||
realExecPath, err := resolveSymlink(execPath)
|
||||
if err != nil {
|
||||
a.logger.Debug().Err(err).Str("path", execPath).Msg("Failed to resolve symlinks, using original path")
|
||||
realExecPath = execPath
|
||||
} else if realExecPath != execPath {
|
||||
a.logger.Debug().
|
||||
Str("symlink", execPath).
|
||||
Str("target", realExecPath).
|
||||
Msg("Resolved symlink for self-update")
|
||||
}
|
||||
|
||||
downloadBase := strings.TrimRight(target.URL, "/") + "/download/pulse-docker-agent"
|
||||
archParam := determineSelfUpdateArch()
|
||||
|
||||
type downloadCandidate struct {
|
||||
url string
|
||||
arch string
|
||||
}
|
||||
|
||||
candidates := make([]downloadCandidate, 0, 2)
|
||||
if archParam != "" {
|
||||
candidates = append(candidates, downloadCandidate{
|
||||
url: fmt.Sprintf("%s?arch=%s", downloadBase, archParam),
|
||||
arch: archParam,
|
||||
})
|
||||
}
|
||||
candidates = append(candidates, downloadCandidate{url: downloadBase})
|
||||
|
||||
client := a.httpClientFor(target)
|
||||
var resp *http.Response
|
||||
lastErr := errors.New("failed to download new binary")
|
||||
|
||||
for _, candidate := range candidates {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, candidate.url, nil)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("failed to create download request: %w", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if target.Token != "" {
|
||||
req.Header.Set("X-API-Token", target.Token)
|
||||
req.Header.Set("Authorization", "Bearer "+target.Token)
|
||||
}
|
||||
|
||||
response, err := client.Do(req)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("failed to download new binary: %w", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
lastErr = fmt.Errorf("download failed with status: %s", response.Status)
|
||||
response.Body.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
resp = response
|
||||
if candidate.arch != "" {
|
||||
a.logger.Debug().
|
||||
Str("arch", candidate.arch).
|
||||
Msg("Self-update: downloaded architecture-specific agent binary")
|
||||
} else if archParam != "" {
|
||||
a.logger.Debug().Msg("Self-update: falling back to server default agent binary")
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if resp == nil {
|
||||
return lastErr
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
checksumHeader := strings.TrimSpace(resp.Header.Get("X-Checksum-Sha256"))
|
||||
|
||||
// Create temporary file in the same directory as the target binary
|
||||
// to ensure atomic rename works (os.Rename fails across filesystems)
|
||||
targetDir := filepath.Dir(realExecPath)
|
||||
tmpFile, err := osCreateTempFn(targetDir, "pulse-docker-agent-*.tmp")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create temp file in %s: %w", targetDir, err)
|
||||
}
|
||||
tmpPath := tmpFile.Name()
|
||||
defer osRemoveFn(tmpPath) // Clean up if something goes wrong
|
||||
|
||||
// Write downloaded binary to temp file with size limit (100 MB max)
|
||||
const maxBinarySize = 100 * 1024 * 1024
|
||||
hasher := sha256.New()
|
||||
limitedReader := io.LimitReader(resp.Body, maxBinarySize+1)
|
||||
written, err := io.Copy(tmpFile, io.TeeReader(limitedReader, hasher))
|
||||
if err != nil {
|
||||
tmpFile.Close()
|
||||
return fmt.Errorf("failed to write downloaded binary: %w", err)
|
||||
}
|
||||
if written > maxBinarySize {
|
||||
tmpFile.Close()
|
||||
return fmt.Errorf("downloaded binary exceeds maximum size (%d bytes)", maxBinarySize)
|
||||
}
|
||||
if err := closeFileFn(tmpFile); err != nil {
|
||||
return fmt.Errorf("failed to close temp file: %w", err)
|
||||
}
|
||||
|
||||
// Verify it's a valid ELF binary (basic sanity check for Linux)
|
||||
if err := verifyELFMagic(tmpPath); err != nil {
|
||||
return fmt.Errorf("downloaded file is not a valid executable: %w", err)
|
||||
}
|
||||
|
||||
// Verify checksum (mandatory for security)
|
||||
downloadChecksum := hex.EncodeToString(hasher.Sum(nil))
|
||||
if checksumHeader == "" {
|
||||
return fmt.Errorf("server did not provide checksum header (X-Checksum-Sha256); refusing update for security")
|
||||
}
|
||||
|
||||
expected := strings.ToLower(strings.TrimSpace(checksumHeader))
|
||||
actual := strings.ToLower(downloadChecksum)
|
||||
if expected != actual {
|
||||
return fmt.Errorf("checksum verification failed: expected %s, got %s", expected, actual)
|
||||
}
|
||||
a.logger.Debug().Str("checksum", downloadChecksum).Msg("Self-update: checksum verified")
|
||||
|
||||
// Verify cryptographic signature (X-Signature-Ed25519 header)
|
||||
signatureHeader := strings.TrimSpace(resp.Header.Get("X-Signature-Ed25519"))
|
||||
if signatureHeader != "" {
|
||||
if err := verifyFileSignature(tmpPath, signatureHeader); err != nil {
|
||||
return fmt.Errorf("signature verification failed: %w", err)
|
||||
}
|
||||
a.logger.Info().Msg("Self-update: cryptographic signature verified")
|
||||
} else {
|
||||
// For now, only warn if missing. In strict mode, we would error here.
|
||||
a.logger.Warn().Msg("Self-update: server did not provide cryptographic signature (X-Signature-Ed25519)")
|
||||
}
|
||||
|
||||
// Make temp file executable
|
||||
if err := osChmodFn(tmpPath, 0755); err != nil {
|
||||
return fmt.Errorf("failed to make temp file executable: %w", err)
|
||||
}
|
||||
|
||||
// Pre-flight check: Run the new binary with --self-test
|
||||
// We use the same environment as the current process
|
||||
// We also need to supply minimal required flags if they are mandatory (like token)
|
||||
// However, we just grab current args and replace the executable path,
|
||||
// and force --self-test
|
||||
a.logger.Debug().Msg("Self-update: running pre-flight check on new binary...")
|
||||
|
||||
// Construct args for self-test. We need a valid token source for config load to pass.
|
||||
// Since we are running on the same host, we can pass the token file or env.
|
||||
// Simplest approach: pass --self-test and --token=dummy (if validation requires it)
|
||||
// But our config loader checks token presence.
|
||||
// Let's use the actual token configured to be safe.
|
||||
checkCmd := execCommandContextFn(ctx, tmpPath, "--self-test", "--token", a.cfg.APIToken)
|
||||
if output, err := checkCmd.CombinedOutput(); err != nil {
|
||||
a.logger.Error().Str("output", string(output)).Err(err).Msg("Self-update: pre-flight check failed")
|
||||
return fmt.Errorf("new binary failed self-test: %w", err)
|
||||
}
|
||||
a.logger.Debug().Msg("Self-update: pre-flight check passed")
|
||||
|
||||
// Create backup of current binary (use realExecPath for atomic operations)
|
||||
backupPath := realExecPath + ".backup"
|
||||
if err := osRenameFn(realExecPath, backupPath); err != nil {
|
||||
return fmt.Errorf("failed to backup current binary: %w", err)
|
||||
}
|
||||
|
||||
// Move new binary to current location
|
||||
if err := osRenameFn(tmpPath, realExecPath); err != nil {
|
||||
// Restore backup on failure
|
||||
_ = osRenameFn(backupPath, realExecPath)
|
||||
return fmt.Errorf("failed to replace binary: %w", err)
|
||||
}
|
||||
|
||||
// Remove backup on success
|
||||
_ = osRemoveFn(backupPath)
|
||||
|
||||
// On Unraid, also update the persistent copy on the flash drive
|
||||
if isUnraid() {
|
||||
persistPath := unraidPersistPath
|
||||
if _, err := osStatFn(persistPath); err == nil {
|
||||
a.logger.Debug().Str("path", persistPath).Msg("Updating Unraid persistent binary")
|
||||
if newBinary, err := osReadFileFn(execPath); err == nil {
|
||||
tmpPersist := persistPath + ".tmp"
|
||||
if err := osWriteFileFn(tmpPersist, newBinary, 0644); err != nil {
|
||||
a.logger.Warn().Err(err).Msg("Failed to write Unraid persistent binary")
|
||||
} else if err := osRenameFn(tmpPersist, persistPath); err != nil {
|
||||
a.logger.Warn().Err(err).Msg("Failed to rename Unraid persistent binary")
|
||||
_ = osRemoveFn(tmpPersist)
|
||||
} else {
|
||||
a.logger.Info().Str("path", persistPath).Msg("Updated Unraid persistent binary")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Restart agent with same arguments
|
||||
args := os.Args
|
||||
env := os.Environ()
|
||||
|
||||
if err := syscallExecFn(execPath, args, env); err != nil {
|
||||
return fmt.Errorf("failed to restart agent: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
|
@ -1065,6 +1066,11 @@ func TestSelfUpdate(t *testing.T) {
|
|||
swap(t, &syscallExecFn, func(string, []string, []string) error {
|
||||
return nil
|
||||
})
|
||||
// Mock pre-flight check to succeed
|
||||
swap(t, &execCommandContextFn, func(ctx context.Context, name string, arg ...string) *exec.Cmd {
|
||||
// echo returns 0 exit code
|
||||
return exec.Command("echo", "ok")
|
||||
})
|
||||
|
||||
if err := agent.selfUpdate(context.Background()); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
|
|
@ -1108,6 +1114,10 @@ func TestSelfUpdate(t *testing.T) {
|
|||
swap(t, &syscallExecFn, func(string, []string, []string) error {
|
||||
return errors.New("exec failed")
|
||||
})
|
||||
// Mock pre-flight check to succeed
|
||||
swap(t, &execCommandContextFn, func(ctx context.Context, name string, arg ...string) *exec.Cmd {
|
||||
return exec.Command("echo", "ok")
|
||||
})
|
||||
|
||||
if err := agent.selfUpdate(context.Background()); err == nil {
|
||||
t.Fatal("expected error from exec")
|
||||
|
|
|
|||
79
internal/dockeragent/signature.go
Normal file
79
internal/dockeragent/signature.go
Normal file
|
|
@ -0,0 +1,79 @@
|
|||
package dockeragent
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"crypto/x509"
|
||||
"encoding/base64"
|
||||
"encoding/pem"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// trustedPublicKeysPEM contains a list of trusted release public keys.
|
||||
// In a real build, these would be injected via ldflags.
|
||||
// Using a list allows for key rotation (start signing with new key, retire old key later).
|
||||
var trustedPublicKeysPEM = []string{
|
||||
`-----BEGIN PUBLIC KEY-----
|
||||
MCowBQYDK2VwAyEAlbXZQRx8jgMzwpXbbjOGcnA+9TG0lms/auxbPzY+Tdo=
|
||||
-----END PUBLIC KEY-----`,
|
||||
}
|
||||
|
||||
// verifySignature checks if the provided binary data matches the signature
|
||||
// using ANY of the trusted Ed25519 public keys.
|
||||
func verifySignature(binaryData []byte, signatureBase64 string) error {
|
||||
if signatureBase64 == "" {
|
||||
return errors.New("missing signature")
|
||||
}
|
||||
|
||||
// Decode the signature once
|
||||
sigBytes, err := base64.StdEncoding.DecodeString(signatureBase64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid base64 signature: %w", err)
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
|
||||
// Try each trusted key
|
||||
for _, keyPEM := range trustedPublicKeysPEM {
|
||||
block, _ := pem.Decode([]byte(keyPEM))
|
||||
if block == nil || block.Type != "PUBLIC KEY" {
|
||||
lastErr = errors.New("failed to decode one of the trusted public keys")
|
||||
continue
|
||||
}
|
||||
|
||||
pub, err := x509.ParsePKIXPublicKey(block.Bytes)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("failed to parse trusted public key: %w", err)
|
||||
continue
|
||||
}
|
||||
|
||||
edPub, ok := pub.(ed25519.PublicKey)
|
||||
if !ok {
|
||||
lastErr = errors.New("trusted key is not an Ed25519 public key")
|
||||
continue
|
||||
}
|
||||
|
||||
// If verification succeeds, we are valid!
|
||||
if ed25519.Verify(edPub, binaryData, sigBytes) {
|
||||
return nil
|
||||
}
|
||||
|
||||
lastErr = errors.New("signature verification failed for key")
|
||||
}
|
||||
|
||||
// If we're here, no key verified the signature
|
||||
if lastErr != nil {
|
||||
return fmt.Errorf("cryptographic signature verification failed against all trusted keys")
|
||||
}
|
||||
return errors.New("no trusted keys available for verification")
|
||||
}
|
||||
|
||||
// verifyFileSignature reads the file and verifies its signature.
|
||||
func verifyFileSignature(path string, signatureBase64 string) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read file for verification: %w", err)
|
||||
}
|
||||
return verifySignature(data, signatureBase64)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue