Pulse/internal/updates/sse.go
Claude 0af921dc23 Refactor update service to eliminate polling and race conditions
This commit implements a comprehensive refactoring of the update system
to address race conditions, redundant polling, and rate limiting issues.

Backend changes:
- Add job queue system to ensure only ONE update runs at a time
- Implement Server-Sent Events (SSE) for real-time update progress
- Add rate limiting to /api/updates/status (5-second minimum per client)
- Create SSE broadcaster for push-based status updates
- Integrate job queue with update manager for atomic operations
- Add comprehensive unit tests for queue and SSE components

Frontend changes:
- Update UpdateProgressModal to use SSE as primary mechanism
- Implement automatic fallback to polling when SSE unavailable
- Maintain backward compatibility with existing update flow
- Clean up SSE connections on component unmount

API changes:
- Add new endpoint: GET /api/updates/stream (SSE)
- Enhance /api/updates/status with client-based rate limiting
- Return cached status with appropriate headers when rate limited

Benefits:
- Eliminates 429 rate limit errors during updates
- Only one update job can run at a time (prevents race conditions)
- Real-time updates via SSE reduce unnecessary polling
- Graceful degradation to polling when SSE unavailable
- Better resource utilization and reduced server load

Testing:
- All existing tests pass
- New unit tests for queue and SSE functionality
- Integration tests verify complete update flow
2025-11-11 09:33:05 +00:00

281 lines
6.2 KiB
Go

package updates
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/rs/zerolog/log"
)
// SSEClient represents a single Server-Sent Events client connection
// tracked by an SSEBroadcaster.
type SSEClient struct {
// ID is the key under which the client is stored in the broadcaster's client map.
ID string
// Writer is the response writer that SSE frames are written to.
Writer http.ResponseWriter
// Flusher is Writer viewed as an http.Flusher; it is flushed after each successful write.
Flusher http.Flusher
// Done is closed when the broadcaster removes the client. Senders check it
// before writing and skip clients whose channel is already closed.
Done chan bool
// LastActive is set at registration and refreshed after a successful status
// send. The cleanup loop evicts clients whose value is more than 5 minutes old.
LastActive time.Time
}
// SSEBroadcaster manages Server-Sent Events connections for update progress.
// It keeps a registry of connected clients, a buffered queue of statuses
// waiting to be fanned out, and a copy of the most recent status.
type SSEBroadcaster struct {
// mu guards clients.
mu sync.RWMutex
// clients maps client ID to the registered client.
clients map[string]*SSEClient
// messageChan queues statuses from Broadcast for the broadcast loop.
messageChan chan UpdateStatus
// cachedStatus is the last status passed to Broadcast (initially "idle").
cachedStatus UpdateStatus
// cachedStatusTime records when cachedStatus was last set.
cachedStatusTime time.Time
// statusMu guards cachedStatus and cachedStatusTime.
statusMu sync.RWMutex
}
// NewSSEBroadcaster builds a broadcaster whose cached status starts out as
// "idle" and launches its two background goroutines: the broadcast loop that
// fans queued statuses out to clients, and the cleanup loop that evicts
// clients that have been inactive for too long.
func NewSSEBroadcaster() *SSEBroadcaster {
	initial := UpdateStatus{
		Status:    "idle",
		UpdatedAt: time.Now().Format(time.RFC3339),
	}

	broadcaster := new(SSEBroadcaster)
	broadcaster.clients = make(map[string]*SSEClient)
	broadcaster.messageChan = make(chan UpdateStatus, 100)
	broadcaster.cachedStatus = initial
	broadcaster.cachedStatusTime = time.Now()

	// Background workers: fan-out first, then stale-client eviction.
	go broadcaster.broadcastLoop()
	go broadcaster.cleanupLoop()

	return broadcaster
}
// AddClient registers a new SSE client under clientID and returns it.
//
// The response writer must implement http.Flusher; if it does not, an error
// is logged and nil is returned. After registration the currently cached
// status is sent to the new client asynchronously so it does not have to wait
// for the next broadcast.
func (b *SSEBroadcaster) AddClient(w http.ResponseWriter, clientID string) *SSEClient {
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Error().Msg("Streaming not supported by response writer")
		return nil
	}

	client := &SSEClient{
		ID:         clientID,
		Writer:     w,
		Flusher:    flusher,
		Done:       make(chan bool, 1),
		LastActive: time.Now(),
	}

	// Take the client count while still holding the lock: reading the map
	// after Unlock would race with concurrent AddClient/RemoveClient calls.
	b.mu.Lock()
	b.clients[clientID] = client
	total := len(b.clients)
	b.mu.Unlock()

	log.Info().
		Str("client_id", clientID).
		Int("total_clients", total).
		Msg("SSE client connected")

	// Send the current cached status immediately to the new client.
	b.statusMu.RLock()
	cachedStatus := b.cachedStatus
	b.statusMu.RUnlock()

	go b.sendToClient(client, cachedStatus)

	return client
}
// RemoveClient drops the client registered under clientID, closing its Done
// channel so pending senders skip it. Unknown IDs are ignored.
func (b *SSEBroadcaster) RemoveClient(clientID string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	client, registered := b.clients[clientID]
	if !registered {
		return
	}

	close(client.Done)
	delete(b.clients, clientID)

	remaining := len(b.clients)
	log.Info().
		Str("client_id", clientID).
		Int("total_clients", remaining).
		Msg("SSE client disconnected")
}
// Broadcast records status as the latest cached status and queues it for
// delivery to every connected client. Queuing never blocks: when the message
// buffer is full the status is dropped with a warning (the cache is still
// updated).
func (b *SSEBroadcaster) Broadcast(status UpdateStatus) {
	b.statusMu.Lock()
	b.cachedStatus, b.cachedStatusTime = status, time.Now()
	b.statusMu.Unlock()

	select {
	case b.messageChan <- status:
		return
	default:
	}

	// Buffer full: give up on this message rather than stall the caller.
	log.Warn().Msg("SSE message channel full, dropping message")
}
// GetCachedStatus returns the most recently broadcast status together with
// the time it was cached.
func (b *SSEBroadcaster) GetCachedStatus() (UpdateStatus, time.Time) {
	b.statusMu.RLock()
	status, cachedAt := b.cachedStatus, b.cachedStatusTime
	b.statusMu.RUnlock()

	return status, cachedAt
}
// GetClientCount reports how many clients are currently registered.
func (b *SSEBroadcaster) GetClientCount() int {
	b.mu.RLock()
	count := len(b.clients)
	b.mu.RUnlock()

	return count
}
// broadcastLoop drains the message channel until it is closed. For each
// queued status it snapshots the registered clients under the read lock and
// then starts one goroutine per client to deliver the status.
func (b *SSEBroadcaster) broadcastLoop() {
	for {
		status, open := <-b.messageChan
		if !open {
			return
		}

		// Snapshot the registry so no lock is held while sending.
		b.mu.RLock()
		targets := make([]*SSEClient, 0, len(b.clients))
		for id := range b.clients {
			targets = append(targets, b.clients[id])
		}
		b.mu.RUnlock()

		for i := range targets {
			go b.sendToClient(targets[i], status)
		}

		log.Debug().
			Str("status", status.Status).
			Int("progress", status.Progress).
			Int("clients", len(targets)).
			Msg("Broadcasted update status to SSE clients")
	}
}
// sendToClient writes status to a single client as one SSE "data:" frame
// (JSON payload followed by a blank line) and flushes it.
//
// Clients whose Done channel is already closed are skipped. If the write
// fails, or if writing/flushing panics, the client is removed from the
// broadcaster. On success the client's LastActive timestamp is refreshed.
func (b *SSEBroadcaster) sendToClient(client *SSEClient, status UpdateStatus) {
	// Skip clients that have already been removed.
	select {
	case <-client.Done:
		return
	default:
	}

	data, err := json.Marshal(status)
	if err != nil {
		log.Error().Err(err).Str("client_id", client.ID).Msg("Failed to marshal status for SSE")
		return
	}

	// The write or flush below may panic; treat that like a failed write and
	// drop the client instead of crashing.
	defer func() {
		if r := recover(); r != nil {
			log.Warn().
				Str("client_id", client.ID).
				Interface("panic", r).
				Msg("Recovered from panic while sending to SSE client")
			b.RemoveClient(client.ID)
		}
	}()

	// SSE frame format: data: {json}\n\n
	if _, err := fmt.Fprintf(client.Writer, "data: %s\n\n", data); err != nil {
		log.Debug().
			Err(err).
			Str("client_id", client.ID).
			Msg("Failed to write to SSE client, removing")
		b.RemoveClient(client.ID)
		return
	}

	// Flush immediately so the event reaches the client without buffering.
	client.Flusher.Flush()

	// cleanupLoop reads LastActive while holding b.mu, and several send
	// goroutines may finish for the same client at once, so the write must
	// take the same lock to avoid a data race.
	b.mu.Lock()
	client.LastActive = time.Now()
	b.mu.Unlock()
}
// cleanupLoop wakes up every 30 seconds and evicts every client whose
// LastActive timestamp is more than 5 minutes old, closing its Done channel
// and deleting it from the registry.
func (b *SSEBroadcaster) cleanupLoop() {
	const (
		sweepInterval = 30 * time.Second
		maxIdle       = 5 * time.Minute
	)

	ticker := time.NewTicker(sweepInterval)
	defer ticker.Stop()

	for range ticker.C {
		now := time.Now()

		b.mu.Lock()
		// Deleting entries while ranging over a map is permitted in Go.
		for id, client := range b.clients {
			if now.Sub(client.LastActive) <= maxIdle {
				continue
			}
			close(client.Done)
			delete(b.clients, id)
			log.Info().
				Str("client_id", id).
				Msg("Removed inactive SSE client")
		}
		b.mu.Unlock()
	}
}
// SendHeartbeat sends an SSE comment line (": heartbeat") to every connected
// client to keep connections alive.
//
// Each client is written to in its own goroutine. Clients whose Done channel
// is already closed are skipped; a client whose write fails or panics is
// removed. A successful heartbeat refreshes the client's LastActive
// timestamp so that the cleanup loop does not evict a healthy connection
// merely because no status updates were broadcast for a while.
func (b *SSEBroadcaster) SendHeartbeat() {
	// Snapshot the registry so no lock is held while writing.
	b.mu.RLock()
	clients := make([]*SSEClient, 0, len(b.clients))
	for _, client := range b.clients {
		clients = append(clients, client)
	}
	b.mu.RUnlock()

	for _, client := range clients {
		go func(c *SSEClient) {
			select {
			case <-c.Done:
				return
			default:
			}

			// The write or flush below may panic; drop the client instead
			// of crashing.
			defer func() {
				if r := recover(); r != nil {
					log.Warn().
						Str("client_id", c.ID).
						Interface("panic", r).
						Msg("Recovered from panic while sending heartbeat to SSE client")
					b.RemoveClient(c.ID)
				}
			}()

			// Send SSE comment as heartbeat.
			if _, err := fmt.Fprint(c.Writer, ": heartbeat\n\n"); err != nil {
				log.Debug().
					Err(err).
					Str("client_id", c.ID).
					Msg("Failed to write heartbeat to SSE client, removing")
				b.RemoveClient(c.ID)
				return
			}
			c.Flusher.Flush()

			// cleanupLoop reads LastActive under b.mu, so update it under
			// the same lock.
			b.mu.Lock()
			c.LastActive = time.Now()
			b.mu.Unlock()
		}(client)
	}
}
// Close shuts the broadcaster down: every registered client has its Done
// channel closed and is removed from the registry, then the message channel
// is closed, which ends the broadcast loop.
func (b *SSEBroadcaster) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()

	for id := range b.clients {
		close(b.clients[id].Done)
		delete(b.clients, id)
	}

	close(b.messageChan)

	log.Info().Msg("SSE broadcaster closed")
}