safing-portmaster/base/database/storage/hashmap/map.go
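// Package hashmap implements an in-memory storage backend for the Portmaster
// database system. All records are kept in a plain Go map guarded by a
// read/write mutex; nothing is persisted to disk.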
package hashmap

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/safing/portmaster/base/database/iterator"
	"github.com/safing/portmaster/base/database/query"
	"github.com/safing/portmaster/base/database/record"
	"github.com/safing/portmaster/base/database/storage"
)

// HashMap is an in-memory storage backend that keeps all records in a Go map.
type HashMap struct {
	name   string
	db     map[string]record.Record
	dbLock sync.RWMutex
}

func init() {
	_ = storage.Register("hashmap", NewHashMap)
}

// NewHashMap creates a hashmap database.
func NewHashMap(name, location string) (storage.Interface, error) {
	return &HashMap{
		name: name,
		db:   make(map[string]record.Record),
	}, nil
}

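// Usage sketch: the init function above registers this backend under the type
// name "hashmap", so the database layer can create it by that name. It can
// also be constructed directly; the location argument is ignored because all
// records live in memory only (illustrative call, not from the original source):
//
//	hm, err := NewHashMap("test", "")
//	if err != nil {
//		// handle creation error
//	}
//	defer func() { _ = hm.Shutdown() }()
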
// Get returns a database record.
func (hm *HashMap) Get(key string) (record.Record, error) {
	hm.dbLock.RLock()
	defer hm.dbLock.RUnlock()

	r, ok := hm.db[key]
	if !ok {
		return nil, storage.ErrNotFound
	}
	return r, nil
}

// GetMeta returns the metadata of a database record.
func (hm *HashMap) GetMeta(key string) (*record.Meta, error) {
	// TODO: Replace with more performant variant.
	r, err := hm.Get(key)
	if err != nil {
		return nil, err
	}
	return r.Meta(), nil
}

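// Note: GetMeta currently fetches the whole record and then returns only its
// metadata. A more performant variant, as the TODO above suggests, could read
// the metadata directly under the read lock instead of going through Get.
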
// Put stores a record in the database.
func (hm *HashMap) Put(r record.Record) (record.Record, error) {
	hm.dbLock.Lock()
	defer hm.dbLock.Unlock()

	hm.db[r.DatabaseKey()] = r
	return r, nil
}

// PutMany stores many records in the database.
func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
	hm.dbLock.Lock()
	defer hm.dbLock.Unlock()

	// we could lock for every record, but we want to have the same behaviour
	// as the other storage backends, especially for testing.
	batch := make(chan record.Record, 100)
	errs := make(chan error, 1)

	// start handler
	go func() {
		for r := range batch {
			hm.batchPutOrDelete(shadowDelete, r)
		}
		errs <- nil
	}()

	return batch, errs
}

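// Usage sketch for the batch interface above: the caller sends records into
// the write channel, closes it when done, and then reads exactly one value
// from the error channel (always nil for this backend). Record construction
// is elided because it depends on the caller's record.Record implementation:
//
//	batch, errs := hm.PutMany(false)
//	for _, r := range recordsToWrite { // recordsToWrite: assumed []record.Record
//		batch <- r
//	}
//	close(batch)
//	if err := <-errs; err != nil {
//		// handle batch error
//	}
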
func (hm *HashMap) batchPutOrDelete(shadowDelete bool, r record.Record) {
	r.Lock()
	defer r.Unlock()

	hm.dbLock.Lock()
	defer hm.dbLock.Unlock()

	if !shadowDelete && r.Meta().IsDeleted() {
		delete(hm.db, r.DatabaseKey())
	} else {
		hm.db[r.DatabaseKey()] = r
	}
}

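// In batch writes, a record that is marked as deleted is removed from the map
// right away when shadowDelete is disabled; otherwise the record is stored
// with its deletion marker intact so the deletion remains observable until it
// is purged by MaintainRecordStates.
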
// Delete deletes a record from the database.
func (hm *HashMap) Delete(key string) error {
	hm.dbLock.Lock()
	defer hm.dbLock.Unlock()

	delete(hm.db, key)
	return nil
}

// Query returns an iterator for the supplied query.
func (hm *HashMap) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
	_, err := q.Check()
	if err != nil {
		return nil, fmt.Errorf("invalid query: %w", err)
	}

	queryIter := iterator.New()
	go hm.queryExecutor(queryIter, q, local, internal)
	return queryIter, nil
}

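// Usage sketch for consuming a query result: the iterator's Next channel
// yields matching records until the executor calls Finish. Reading looks
// roughly like this; how the final error is retrieved after the loop depends
// on the iterator package and is not shown here:
//
//	it, err := hm.Query(q, true, true) // q: assumed *query.Query
//	if err != nil {
//		// handle invalid query
//	}
//	for r := range it.Next {
//		_ = r // process record
//	}
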
func (hm *HashMap) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) {
	hm.dbLock.RLock()
	defer hm.dbLock.RUnlock()

	var err error

mapLoop:
	for key, record := range hm.db {
		record.Lock()
		if !q.MatchesKey(key) ||
			!q.MatchesRecord(record) ||
			!record.Meta().CheckValidity() ||
			!record.Meta().CheckPermission(local, internal) {
			record.Unlock()
			continue
		}
		record.Unlock()

		select {
		case <-queryIter.Done:
			break mapLoop
		case queryIter.Next <- record:
		default:
			select {
			case <-queryIter.Done:
				break mapLoop
			case queryIter.Next <- record:
			case <-time.After(1 * time.Second):
				err = errors.New("query timeout")
				break mapLoop
			}
		}
	}

	queryIter.Finish(err)
}

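// Note on the nested select above: the executor first attempts a non-blocking
// send of the matching record. If the consumer is not keeping up, it falls
// back to a blocking send that also watches for cancellation via Done and
// gives up with a "query timeout" error after one second, so a stalled
// consumer cannot hold the map's read lock indefinitely.
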
// ReadOnly returns whether the database is read only.
func (hm *HashMap) ReadOnly() bool {
	return false
}

// Injected returns whether the database is injected.
func (hm *HashMap) Injected() bool {
	return false
}

// MaintainRecordStates maintains record states in the database.
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
	hm.dbLock.Lock()
	defer hm.dbLock.Unlock()

	now := time.Now().Unix()
	purgeThreshold := purgeDeletedBefore.Unix()

	for key, record := range hm.db {
		// check if context is cancelled
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		meta := record.Meta()
		switch {
		case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now:
			if shadowDelete {
				// mark as deleted
				record.Lock()
				meta.Deleted = meta.Expires
				record.Unlock()
				continue
			}
			// Immediately delete expired entries if shadowDelete is disabled.
			fallthrough
		case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold):
			// delete from storage
			delete(hm.db, key)
		}
	}

	return nil
}

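// The maintenance pass above covers two cases: an expired but not yet deleted
// record is either tombstoned (shadowDelete) or removed immediately, and an
// already deleted record is purged once its deletion timestamp falls before
// purgeDeletedBefore (or right away when shadowDelete is disabled). For
// example, with shadowDelete enabled a record that expired an hour ago is
// first marked with Deleted = Expires, and is physically removed on a later
// run once purgeDeletedBefore has moved past that timestamp.
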
// Shutdown shuts down the database.
func (hm *HashMap) Shutdown() error {
	return nil
}