diff --git a/plugins/host_kvstore.go b/plugins/host_kvstore.go index 248e43c4d..c3f6ec734 100644 --- a/plugins/host_kvstore.go +++ b/plugins/host_kvstore.go @@ -9,6 +9,7 @@ import ( "path/filepath" "slices" "strings" + "sync" "time" "github.com/dustin/go-humanize" @@ -35,6 +36,8 @@ type kvstoreServiceImpl struct { pluginName string db *sql.DB maxSize int64 + cancel context.CancelFunc + wg sync.WaitGroup } // newKVStoreService creates a new kvstoreServiceImpl instance with its own SQLite database. @@ -74,12 +77,15 @@ func newKVStoreService(ctx context.Context, pluginName string, perm *KVStorePerm log.Debug("Initialized plugin kvstore", "plugin", pluginName, "path", dbPath, "maxSize", humanize.Bytes(uint64(maxSize))) + cleanupCtx, cancel := context.WithCancel(ctx) svc := &kvstoreServiceImpl{ pluginName: pluginName, db: db, maxSize: maxSize, + cancel: cancel, } - go svc.cleanupLoop(ctx) + svc.wg.Add(1) + go svc.cleanupLoop(cleanupCtx) return svc, nil } @@ -335,6 +341,7 @@ func (s *kvstoreServiceImpl) GetMany(ctx context.Context, keys []string) (map[st // cleanupLoop periodically removes expired keys from the database. // It stops when the provided context is cancelled. func (s *kvstoreServiceImpl) cleanupLoop(ctx context.Context) { + defer s.wg.Done() ticker := time.NewTicker(cleanupInterval) defer ticker.Stop() for { @@ -359,17 +366,12 @@ func (s *kvstoreServiceImpl) cleanupExpired(ctx context.Context) { } } -// Close runs a final cleanup and closes the SQLite database connection. -// The cleanup goroutine is stopped by the context passed to newKVStoreService. +// Close stops the cleanup goroutine and closes the SQLite database connection. func (s *kvstoreServiceImpl) Close() error { - if s.db != nil { - log.Debug("Closing plugin kvstore", "plugin", s.pluginName) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - s.cleanupExpired(ctx) - return s.db.Close() - } - return nil + log.Debug("Closing plugin kvstore", "plugin", s.pluginName) + s.cancel() + s.wg.Wait() + return s.db.Close() } // Compile-time verification diff --git a/plugins/host_kvstore_test.go b/plugins/host_kvstore_test.go index 4928825ef..e5d467f79 100644 --- a/plugins/host_kvstore_test.go +++ b/plugins/host_kvstore_test.go @@ -445,6 +445,36 @@ var _ = Describe("KVStoreService", func() { }) }) + Describe("Close", func() { + It("does not race with cleanupLoop goroutine", func() { + // Create a service with a dedicated context so we can verify + // that Close() properly waits for the cleanup goroutine. + closeCtx, closeCancel := context.WithCancel(ctx) + defer closeCancel() + + maxSize := "1KB" + svc, err := newKVStoreService(closeCtx, "test_close_race", &KVStorePermission{MaxSize: &maxSize}) + Expect(err).ToNot(HaveOccurred()) + + // Insert an expired key so cleanup has work to do + _, err = svc.db.Exec(` + INSERT INTO kvstore (key, value, size, expires_at) + VALUES ('cleanup_race', 'old', 3, datetime('now', '-1 seconds')) + `) + Expect(err).ToNot(HaveOccurred()) + + // Close should not panic or produce "database is closed" errors. + // Before the fix, the cleanup goroutine could race with db.Close(). + err = svc.Close() + Expect(err).ToNot(HaveOccurred()) + + // Verify the database is actually closed (further queries should fail) + _, err = svc.db.Exec(`SELECT 1`) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("database is closed")) + }) + }) + Describe("SetWithTTL", func() { It("stores value that is retrievable before expiry", func() { err := service.SetWithTTL(ctx, "ttl_key", []byte("ttl_value"), 3600)