safing-portmaster/spn/terminal/control_flow.go
package terminal

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/safing/portmaster/service/mgr"
	"github.com/safing/structures/varint"
)

// FlowControl defines the flow control interface.
type FlowControl interface {
	Deliver(msg *Msg) *Error
	Receive() <-chan *Msg
	Send(msg *Msg, timeout time.Duration) *Error
	ReadyToSend() <-chan struct{}
	Flush(timeout time.Duration)
	StartWorkers(m *mgr.Manager, terminalName string)
	RecvQueueLen() int
	SendQueueLen() int
}
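
// exampleSendAll is an illustrative sketch (not part of the original file) of
// the send-side contract of FlowControl: wait for free space, send with a
// timeout, and flush before tearing down. The function name and the 10 second
// timeout are hypothetical.
func exampleSendAll(fc FlowControl, msgs []*Msg) {
	for _, msg := range msgs {
		// Block until the flow control signals free space on the other end.
		<-fc.ReadyToSend()
		if err := fc.Send(msg, 10*time.Second); err != nil {
			return
		}
	}
	// Best effort: wait for queued messages to be written out.
	fc.Flush(10 * time.Second)
}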

// FlowControlType represents a flow control type.
type FlowControlType uint8

// Flow Control Types.
const (
	FlowControlDefault FlowControlType = 0
	FlowControlDFQ     FlowControlType = 1
	FlowControlNone    FlowControlType = 2

	defaultFlowControl = FlowControlDFQ
)

// DefaultSize returns the default flow control size.
func (fct FlowControlType) DefaultSize() uint32 {
	if fct == FlowControlDefault {
		fct = defaultFlowControl
	}

	switch fct {
	case FlowControlDFQ:
		return 50000
	case FlowControlNone:
		return 10000
	case FlowControlDefault:
		fallthrough
	default:
		return 0
	}
}

// Flow Queue Configuration.
const (
	// DefaultQueueSize is the default size of the send and recv queues.
	DefaultQueueSize = 50000
	// MaxQueueSize is the maximum size a flow queue may be configured with.
	MaxQueueSize = 1000000
	// forceReportBelowPercent forces a space report when the reported recv
	// space falls below this share of the queue capacity.
	forceReportBelowPercent = 0.75
)

// DuplexFlowQueue is a duplex flow control mechanism using queues.
type DuplexFlowQueue struct {
	// ctx is the context of the Terminal that is using the DFQ.
	ctx context.Context

	// submitUpstream is used to submit messages to the upstream channel.
	submitUpstream func(msg *Msg, timeout time.Duration)

	// sendQueue holds the messages that are waiting to be sent.
	sendQueue chan *Msg
	// prioMsgs holds the number of messages to send with high priority.
	prioMsgs *int32
	// sendSpace indicates the number of free slots in the recvQueue on the
	// other end.
	sendSpace *int32
	// readyToSend is used to notify sending components that there is free space.
	readyToSend chan struct{}
	// wakeSender is used to wake a sender in case the sendSpace was zero and the
	// sender is waiting for available space.
	wakeSender chan struct{}

	// recvQueue holds the messages that are waiting to be processed.
	recvQueue chan *Msg
	// reportedSpace indicates the number of free slots that the other end knows
	// about.
	reportedSpace *int32
	// spaceReportLock locks the calculation of space to report.
	spaceReportLock sync.Mutex
	// forceSpaceReport forces the sender to send a space report.
	forceSpaceReport chan struct{}

	// flush is used to send a finish function to the handler, which will write
	// all pending messages and then call the given function.
	flush chan func()
}

// NewDuplexFlowQueue returns a new duplex flow queue.
func NewDuplexFlowQueue(
	ctx context.Context,
	queueSize uint32,
	submitUpstream func(msg *Msg, timeout time.Duration),
) *DuplexFlowQueue {
	dfq := &DuplexFlowQueue{
		ctx:              ctx,
		submitUpstream:   submitUpstream,
		sendQueue:        make(chan *Msg, queueSize),
		prioMsgs:         new(int32),
		sendSpace:        new(int32),
		readyToSend:      make(chan struct{}),
		wakeSender:       make(chan struct{}, 1),
		recvQueue:        make(chan *Msg, queueSize),
		reportedSpace:    new(int32),
		forceSpaceReport: make(chan struct{}, 1),
		flush:            make(chan func()),
	}
	atomic.StoreInt32(dfq.sendSpace, int32(queueSize))
	atomic.StoreInt32(dfq.reportedSpace, int32(queueSize))

	return dfq
}

// StartWorkers starts the necessary workers to operate the flow queue.
func (dfq *DuplexFlowQueue) StartWorkers(m *mgr.Manager, terminalName string) {
	m.Go(terminalName+" flow queue", dfq.FlowHandler)
}
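
// exampleWireUp is an illustrative sketch (hypothetical wiring, not from the
// original file): a flow owner creates the DFQ with a submitUpstream callback
// that hands framed messages to the transport, then starts the flow worker on
// its manager. The toWire channel is an assumption standing in for the real
// transport.
func exampleWireUp(ctx context.Context, m *mgr.Manager, toWire chan<- *Msg) *DuplexFlowQueue {
	dfq := NewDuplexFlowQueue(ctx, DefaultQueueSize, func(msg *Msg, timeout time.Duration) {
		toWire <- msg // hypothetical transport channel
	})
	dfq.StartWorkers(m, "example terminal")
	return dfq
}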

// shouldReportRecvSpace returns whether the receive space should be reported.
func (dfq *DuplexFlowQueue) shouldReportRecvSpace() bool {
	return atomic.LoadInt32(dfq.reportedSpace) < int32(float32(cap(dfq.recvQueue))*forceReportBelowPercent)
}

// decrementReportedRecvSpace decreases the reported recv space by 1 and
// returns whether the receive space should be reported.
func (dfq *DuplexFlowQueue) decrementReportedRecvSpace() (shouldReportRecvSpace bool) {
	return atomic.AddInt32(dfq.reportedSpace, -1) < int32(float32(cap(dfq.recvQueue))*forceReportBelowPercent)
}

// getSendSpace returns the current send space.
func (dfq *DuplexFlowQueue) getSendSpace() int32 {
	return atomic.LoadInt32(dfq.sendSpace)
}

// decrementSendSpace decreases the send space by 1 and returns the new value.
func (dfq *DuplexFlowQueue) decrementSendSpace() int32 {
	return atomic.AddInt32(dfq.sendSpace, -1)
}

// addToSendSpace increases the send space by n and wakes a possibly waiting
// sender.
func (dfq *DuplexFlowQueue) addToSendSpace(n int32) {
	// Add the new space to the send space.
	atomic.AddInt32(dfq.sendSpace, n)

	// Wake the sender in case it is waiting.
	select {
	case dfq.wakeSender <- struct{}{}:
	default:
	}
}

// reportableRecvSpace returns how much free space can be reported to the other
// end. The returned number must be communicated to the other end and must not
// be ignored.
func (dfq *DuplexFlowQueue) reportableRecvSpace() int32 {
	// Changes to the recvQueue during calculation are no problem.
	// We don't want to report space twice though!
	dfq.spaceReportLock.Lock()
	defer dfq.spaceReportLock.Unlock()

	// Calculate the reportable receive space.
	reportedSpace := atomic.LoadInt32(dfq.reportedSpace)
	toReport := int32(cap(dfq.recvQueue)-len(dfq.recvQueue)) - reportedSpace

	// Never report values below zero.
	// This can happen, as dfq.reportedSpace is decreased after a message is
	// submitted to dfq.recvQueue by dfq.Deliver(). This race condition can only
	// lower the space to report, not increase it. A simple check here solves
	// this problem and keeps performance high.
	// Also, don't report a value of 1, as the benefit is minimal and this might
	// be commonly triggered due to the buffer of the force report channel.
	if toReport <= 1 {
		return 0
	}

	// Add the space to report to dfq.reportedSpace and return it.
	atomic.AddInt32(dfq.reportedSpace, toReport)
	return toReport
}
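
// exampleReportableSpace is an illustrative worked example of the calculation
// above (the concrete numbers are assumptions): with a recv queue capacity of
// 50000, 100 queued messages, and 49000 slots already reported, the newly
// reportable space is 50000-100-49000 = 900 slots.
func exampleReportableSpace() int32 {
	capacity, queued, reported := int32(50000), int32(100), int32(49000)
	return capacity - queued - reported // 900
}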

// FlowHandler handles all flow queue internals and must be started as a worker
// in the module where it is used.
func (dfq *DuplexFlowQueue) FlowHandler(_ *mgr.WorkerCtx) error {
	// The upstreamSender is started by the terminal module, but is tied to the
	// flow owner instead. Make sure that the flow owner's module depends on the
	// terminal module so that it is shut down earlier.

	var sendSpaceDepleted bool
	var flushFinished func()

	// Drain all queues when shutting down.
	defer func() {
		for {
			select {
			case msg := <-dfq.sendQueue:
				msg.Finish()
			case msg := <-dfq.recvQueue:
				msg.Finish()
			default:
				return
			}
		}
	}()

sending:
	for {
		// If the send space is depleted, wait to be woken.
		if sendSpaceDepleted {
			select {
			case <-dfq.wakeSender:
				if dfq.getSendSpace() > 0 {
					sendSpaceDepleted = false
				} else {
					continue sending
				}
			case <-dfq.forceSpaceReport:
				// Forced reporting of space.
				// We do not need to check if there is enough sending space, as there
				// is no data included.
				spaceToReport := dfq.reportableRecvSpace()
				if spaceToReport > 0 {
					msg := NewMsg(varint.Pack64(uint64(spaceToReport)))
					dfq.submitUpstream(msg, 0)
				}
				continue sending
			case <-dfq.ctx.Done():
				return nil
			}
		}

		// Get a message from the send queue.
		select {
		case dfq.readyToSend <- struct{}{}:
			// Notify that we are ready to send.

		case msg := <-dfq.sendQueue:
			// Send a message from the queue.
			// If nil, the queue is being shut down.
			if msg == nil {
				return nil
			}

			// Check if we are handling a high priority message or waiting for one.
			// Mark any msgs as high priority when there is one in the pipeline.
			remainingPrioMsgs := atomic.AddInt32(dfq.prioMsgs, -1)
			switch {
			case remainingPrioMsgs >= 0:
				msg.Unit.MakeHighPriority()
			case remainingPrioMsgs < -30_000:
				// Prevent wrap to positive.
				// Compatible with int16 or bigger.
				atomic.StoreInt32(dfq.prioMsgs, 0)
			}

			// Wait for a processing slot.
			msg.Unit.WaitForSlot()

			// Prepend the available receiving space.
			msg.Data.Prepend(varint.Pack64(uint64(dfq.reportableRecvSpace())))
			// Submit for sending upstream.
			dfq.submitUpstream(msg, 0)

			// Decrease the send space and set the flag if depleted.
			if dfq.decrementSendSpace() <= 0 {
				sendSpaceDepleted = true
			}

			// Check if the send queue is empty now and signal flushers.
			if flushFinished != nil && len(dfq.sendQueue) == 0 {
				flushFinished()
				flushFinished = nil
			}

		case <-dfq.forceSpaceReport:
			// Forced reporting of space.
			// We do not need to check if there is enough sending space, as there
			// is no data included.
			spaceToReport := dfq.reportableRecvSpace()
			if spaceToReport > 0 {
				msg := NewMsg(varint.Pack64(uint64(spaceToReport)))
				dfq.submitUpstream(msg, 0)
			}

		case newFlushFinishedFn := <-dfq.flush:
			// Signal immediately if the send queue is empty.
			if len(dfq.sendQueue) == 0 {
				newFlushFinishedFn()
			} else {
				// If there already is a flush finished function, stack them.
				if flushFinished != nil {
					stackedFlushFinishFn := flushFinished
					flushFinished = func() {
						stackedFlushFinishFn()
						newFlushFinishedFn()
					}
				} else {
					flushFinished = newFlushFinishedFn
				}
			}

		case <-dfq.ctx.Done():
			return nil
		}
	}
}
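
// exampleFrame is an illustrative sketch (an assumption derived from
// FlowHandler above, not an authoritative wire spec): every outgoing frame
// starts with the varint-encoded receive space being reported, followed by
// the payload, which is empty for a pure space report.
func exampleFrame(spaceToReport uint64, payload []byte) []byte {
	return append(varint.Pack64(spaceToReport), payload...)
}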

// Flush waits for all waiting data to be sent.
func (dfq *DuplexFlowQueue) Flush(timeout time.Duration) {
	// Create a channel and function for notifying.
	wait := make(chan struct{})
	finished := func() {
		close(wait)
	}

	// Request the flush and return when stopping.
	select {
	case dfq.flush <- finished:
	case <-dfq.ctx.Done():
		return
	case <-TimedOut(timeout):
		return
	}

	// Wait for the flush to finish and return when stopping.
	select {
	case <-wait:
	case <-dfq.ctx.Done():
	case <-TimedOut(timeout):
	}
}

// ready is a closed channel that can always be read from immediately. It is
// returned by ReadyToSend when there already is send space available.
var ready = make(chan struct{})

func init() {
	close(ready)
}

// ReadyToSend returns a channel that can be read when data can be sent.
func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{} {
	if atomic.LoadInt32(dfq.sendSpace) > 0 {
		return ready
	}
	return dfq.readyToSend
}

// Send adds the given message to the send queue.
func (dfq *DuplexFlowQueue) Send(msg *Msg, timeout time.Duration) *Error {
	select {
	case dfq.sendQueue <- msg:
		if msg.Unit.IsHighPriority() {
			// Reset prioMsgs to the current queue size, so that all waiting
			// messages, including the one we just added, are handled as high
			// priority.
			atomic.StoreInt32(dfq.prioMsgs, int32(len(dfq.sendQueue)))
		}
		return nil

	case <-TimedOut(timeout):
		msg.Finish()
		return ErrTimeout

	case <-dfq.ctx.Done():
		msg.Finish()
		return ErrStopping
	}
}
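
// exampleSendUrgent is an illustrative sketch (an interpretation of the
// prioMsgs handling above, not part of the original file): marking a message
// as high priority before Send makes FlowHandler escalate it together with
// the messages already waiting in the queue ahead of it.
func exampleSendUrgent(dfq *DuplexFlowQueue, msg *Msg) *Error {
	msg.Unit.MakeHighPriority()
	return dfq.Send(msg, 1*time.Second)
}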

// Receive returns a channel from which messages in the recv queue can be read.
func (dfq *DuplexFlowQueue) Receive() <-chan *Msg {
	// If the reported recv space is nearing its end, force a report.
	if dfq.shouldReportRecvSpace() {
		select {
		case dfq.forceSpaceReport <- struct{}{}:
		default:
		}
	}

	return dfq.recvQueue
}

// Deliver submits a message for receiving from upstream.
func (dfq *DuplexFlowQueue) Deliver(msg *Msg) *Error {
	// Reject nil messages and messages without data.
	// Note: Do not call Finish on a nil msg, as that would panic.
	if msg == nil {
		return ErrMalformedData.With("no data")
	}
	if msg.Data == nil {
		msg.Finish()
		return ErrMalformedData.With("no data")
	}

	// Get and add the newly reported space.
	addSpace, err := msg.Data.GetNextN16()
	if err != nil {
		msg.Finish()
		return ErrMalformedData.With("failed to parse reported space: %w", err)
	}
	if addSpace > 0 {
		dfq.addToSendSpace(int32(addSpace))
	}

	// Abort processing if the message only contained a space update.
	if !msg.Data.HoldsData() {
		msg.Finish()
		return nil
	}

	select {
	case dfq.recvQueue <- msg:
		// If the recv queue accepted the message, decrement the recv space.
		shouldReportRecvSpace := dfq.decrementReportedRecvSpace()
		// If the reported recv space is nearing its end, force a report, if the
		// sender worker is idle.
		if shouldReportRecvSpace {
			select {
			case dfq.forceSpaceReport <- struct{}{}:
			default:
			}
		}

		return nil
	default:
		// If the recv queue is full, return an error.
		// The whole point of the flow queue is to guarantee that this never happens.
		msg.Finish()
		return ErrQueueOverflow
	}
}
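
// exampleReceiveLoop is an illustrative sketch (hypothetical receive loop,
// not part of the original file): frames from the transport go through
// Deliver, and payload messages are then read from Receive. Frames carrying
// only a space report never reach the recv queue. The fromWire channel is an
// assumption standing in for the real transport.
func exampleReceiveLoop(ctx context.Context, dfq *DuplexFlowQueue, fromWire <-chan *Msg) {
	for {
		select {
		case msg := <-fromWire:
			if err := dfq.Deliver(msg); err != nil {
				return
			}
		case msg := <-dfq.Receive():
			// Process the payload, then release the scheduling unit.
			msg.Finish()
		case <-ctx.Done():
			return
		}
	}
}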

// FlowStats returns a k=v formatted string of internal stats.
func (dfq *DuplexFlowQueue) FlowStats() string {
	return fmt.Sprintf(
		"sq=%d rq=%d sends=%d reps=%d",
		len(dfq.sendQueue),
		len(dfq.recvQueue),
		atomic.LoadInt32(dfq.sendSpace),
		atomic.LoadInt32(dfq.reportedSpace),
	)
}

// RecvQueueLen returns the current length of the receive queue.
func (dfq *DuplexFlowQueue) RecvQueueLen() int {
	return len(dfq.recvQueue)
}

// SendQueueLen returns the current length of the send queue.
func (dfq *DuplexFlowQueue) SendQueueLen() int {
	return len(dfq.sendQueue)
}