safing-portmaster/spn/terminal/session.go
Daniel Hååvi 80664d1a27
Restructure modules (#1572)
* Move portbase into monorepo

* Add new simple module mgr

* [WIP] Switch to new simple module mgr

* Add StateMgr and more worker variants

* [WIP] Switch more modules

* [WIP] Switch more modules

* [WIP] Switch more modules

* [WIP] switch all SPN modules

* [WIP] switch all service modules

* [WIP] Convert all workers to the new module system

* [WIP] add new task system to module manager

* [WIP] Add second take for scheduling workers

* [WIP] Add FIXME for bugs in new scheduler

* [WIP] Add minor improvements to scheduler

* [WIP] Add new worker scheduler

* [WIP] Fix more bugs related to new module system

* [WIP] Fix start handling of the new module system

* [WIP] Improve startup process

* [WIP] Fix minor issues

* [WIP] Fix missing subsystem in settings

* [WIP] Initialize managers in constructor

* [WIP] Move module event initialization to constructors

* [WIP] Fix setting for enabling and disabling the SPN module

* [WIP] Move API registration into module construction

* [WIP] Update states mgr for all modules

* [WIP] Add CmdLine operation support

* Add state helper methods to module group and instance

* Add notification and module status handling to status package

* Fix starting issues

* Remove pilot widget and update security lock to new status data

* Remove debug logs

* Improve http server shutdown

* Add workaround for cleanly shutting down firewall+netquery

* Improve logging

* Add syncing states with notifications for new module system

* Improve starting, stopping, shutdown; resolve FIXMEs/TODOs

* [WIP] Fix most unit tests

* Review new module system and fix minor issues

* Push shutdown and restart events again via API

* Set sleep mode via interface

* Update example/template module

* [WIP] Fix spn/cabin unit test

* Remove deprecated UI elements

* Make log output more similar for the logging transition phase

* Switch spn hub and observer cmds to new module system

* Fix log sources

* Make worker mgr less error prone

* Fix tests and minor issues

* Fix observation hub

* Improve shutdown and restart handling

* Split up big connection.go source file

* Move varint and dsd packages to structures repo

* Improve expansion test

* Fix linter warnings

* Fix interception module on windows

* Fix linter errors

---------

Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
2024-08-09 18:15:48 +03:00

166 lines
4.1 KiB
Go

package terminal
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/safing/portmaster/base/log"
)
// Rate limiting and concurrency parameters used by Session.
const (
// rateLimitMinOps is the operation count a session must exceed before the
// operations-per-second limit is evaluated at all.
rateLimitMinOps = 250
// rateLimitMaxOpsPerSecond is the average operations per second at or above
// which further operations are rate limited.
rateLimitMaxOpsPerSecond = 5
// rateLimitMinSuspicion is the suspicion score a session must exceed before
// any suspicion-based limit is evaluated.
rateLimitMinSuspicion = 25
// rateLimitMinPermaSuspicion is the suspicion score a session must exceed
// before it can be rate limited independently of the elapsed time.
rateLimitMinPermaSuspicion = rateLimitMinSuspicion * 100
// rateLimitMaxSuspicionPerSecond is the average suspicion score per second
// at or above which further operations are rate limited.
rateLimitMaxSuspicionPerSecond = 1
// concurrencyPoolSize is the capacity of a session's concurrency pool.
// Make this big enough to trigger suspicion limit in first blast.
concurrencyPoolSize = 30
)
// Session holds terminal metadata for operations.
// It carries the counters used for rate limiting and the pool used to bound
// concurrent executions.
type Session struct {
// NOTE(review): the embedded lock is not used by any method visible in this
// file (the counters below are atomics) — confirm whether callers rely on it.
sync.RWMutex
// Rate Limiting.
// started holds the unix timestamp in seconds when the session was started.
// It is set when the Session is created and may be treated as a constant.
started int64
// opCount is the amount of operations started (and not rate limited by suspicion).
opCount atomic.Int64
// suspicionScore holds a score of suspicious activity.
// Every suspicious operation is counted as at least 1.
// Rate limited operations because of suspicion are also counted as 1.
suspicionScore atomic.Int64
// concurrencyPool bounds the number of concurrent LimitConcurrency
// executions: every running execution holds one entry in the channel.
concurrencyPool chan struct{}
}
// SessionTerminal is an interface for terminals that support sessions.
type SessionTerminal interface {
// GetSession returns the session of the terminal.
GetSession() *Session
}
// SessionAddOn can be embedded by terminals to add support for sessions.
// The session is created lazily on the first call to GetSession.
type SessionAddOn struct {
// lock guards the session field.
lock sync.Mutex
// session holds the terminal session.
// It is nil until GetSession is called for the first time.
session *Session
}
// GetSession returns the terminal's session.
// The session is created on the first call; all later calls return that same
// session. The add-on's lock is held for the duration of the call.
func (t *SessionAddOn) GetSession() *Session {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Fast path: the session was already set up by an earlier call.
	if t.session != nil {
		return t.session
	}

	// First use: set up the session and remember it.
	t.session = NewSession()
	return t.session
}
// NewSession returns a new session with an empty concurrency pool and its
// start time set to one second before the current time.
func NewSession() *Session {
	session := &Session{
		concurrencyPool: make(chan struct{}, concurrencyPoolSize),
	}

	// Backdate the start by one second so that there is always at least a
	// 1 second difference to the current time.
	session.started = time.Now().Unix() - 1

	return session
}
// RateLimitInfo returns some basic information about the status of the rate limiter.
//
// The returned string has the form "<N>o/s <N>s/s <N>s": the average
// operations per second, the average suspicion score per second and the
// number of seconds the session has been active.
func (s *Session) RateLimitInfo() string {
	// The start time is taken from the wall clock. If the system clock is set
	// back after the session was created, the difference can become zero or
	// negative. Clamp it to one second so the divisions below cannot panic
	// and the reported averages stay meaningful.
	secondsActive := time.Now().Unix() - s.started
	if secondsActive < 1 {
		secondsActive = 1
	}

	return fmt.Sprintf(
		"%do/s %ds/s %ds",
		s.opCount.Load()/secondsActive,
		s.suspicionScore.Load()/secondsActive,
		secondsActive,
	)
}
// RateLimit enforces a rate and suspicion limit.
//
// It returns nil if the operation may proceed and ErrRateLimited otherwise.
// Operations rejected by the per-second suspicion limit add 1 to the suspicion
// score. Operations that pass the suspicion checks are counted in the
// operation count, whether or not they are then rejected by the rate limit.
func (s *Session) RateLimit() *Error {
	// The start time is taken from the wall clock. If the system clock is set
	// back after the session was created, the difference can become zero or
	// negative, which would either panic in the divisions below or produce
	// negative rates that disable both limits. Treat such a session as having
	// been active for one second, which keeps the limits in force.
	secondsActive := time.Now().Unix() - s.started
	if secondsActive < 1 {
		secondsActive = 1
	}

	// Check the suspicion limit.
	score := s.suspicionScore.Load()
	if score > rateLimitMinSuspicion {
		scorePerSecond := score / secondsActive
		if scorePerSecond >= rateLimitMaxSuspicionPerSecond {
			// Add current try to suspicion score.
			s.suspicionScore.Add(1)
			return ErrRateLimited
		}

		// Permanently rate limit if suspicion goes over the perma min limit and
		// the suspicion score is greater than 80% of the operation count.
		if score > rateLimitMinPermaSuspicion &&
			score*5 > s.opCount.Load()*4 { // Think: 80*5 == 100*4
			return ErrRateLimited
		}
	}

	// Check the rate limit.
	count := s.opCount.Add(1)
	if count > rateLimitMinOps {
		opsPerSecond := count / secondsActive
		if opsPerSecond >= rateLimitMaxOpsPerSecond {
			return ErrRateLimited
		}
	}

	return nil
}
// Suspicion Factors.
// These are weights for the suspicion score, ordered from least to most
// severe. Presumably they are meant to be passed to
// Session.ReportSuspiciousActivity — no caller is visible in this file.
const (
SusFactorCommon = 1
SusFactorWeirdButOK = 5
SusFactorQuiteUnusual = 10
SusFactorMustBeMalicious = 100
)
// ReportSuspiciousActivity reports suspicious activity of the terminal.
// It atomically adds factor to the session's suspicion score, which is
// evaluated by RateLimit. The factor is not validated: a negative value
// lowers the score.
func (s *Session) ReportSuspiciousActivity(factor int64) {
s.suspicionScore.Add(factor)
}
// LimitConcurrency runs f while holding one entry in the session's
// concurrency pool, so that no more executions run at the same time than the
// pool has capacity for.
// If the pool is full, the call blocks until an entry becomes available or
// ctx is done. In the latter case f is not executed and the context error is
// returned. Otherwise nil is returned after f has finished.
func (s *Session) LimitConcurrency(ctx context.Context, f func()) error {
	// Acquire an entry in the pool, or give up when the context ends.
	select {
	case s.concurrencyPool <- struct{}{}:
		// Entry acquired, go ahead.
	case <-ctx.Done():
		return ctx.Err()
	}

	// Give our entry back once f is done.
	release := func() {
		select {
		case <-s.concurrencyPool:
			// Entry returned.
		default:
			// An empty pool is not expected here, as we still hold an entry.
			// Do not block on it, just report.
			log.Warningf("spn/session: failed to drain own entry from concurrency pool")
		}
	}
	defer release()

	f()
	return nil
}