* Move portbase into monorepo
* Add new simple module mgr
* [WIP] Switch to new simple module mgr
* Add StateMgr and more worker variants
* [WIP] Switch more modules
* [WIP] Switch more modules
* [WIP] Switch more modules
* [WIP] Switch all SPN modules
* [WIP] Switch all service modules
* [WIP] Convert all workers to the new module system
* [WIP] Add new task system to module manager
* [WIP] Add second take for scheduling workers
* [WIP] Add FIXME for bugs in new scheduler
* [WIP] Add minor improvements to scheduler
* [WIP] Add new worker scheduler
* [WIP] Fix more bugs related to new module system
* [WIP] Fix start handling of the new module system
* [WIP] Improve startup process
* [WIP] Fix minor issues
* [WIP] Fix missing subsystem in settings
* [WIP] Initialize managers in constructor
* [WIP] Move module event initialization to constructors
* [WIP] Fix setting for enabling and disabling the SPN module
* [WIP] Move API registration into module construction
* [WIP] Update states mgr for all modules
* [WIP] Add CmdLine operation support
* Add state helper methods to module group and instance
* Add notification and module status handling to status package
* Fix starting issues
* Remove pilot widget and update security lock to new status data
* Remove debug logs
* Improve http server shutdown
* Add workaround for cleanly shutting down firewall+netquery
* Improve logging
* Add syncing states with notifications for new module system
* Improve starting, stopping, shutdown; resolve FIXMEs/TODOs
* [WIP] Fix most unit tests
* Review new module system and fix minor issues
* Push shutdown and restart events again via API
* Set sleep mode via interface
* Update example/template module
* [WIP] Fix spn/cabin unit test
* Remove deprecated UI elements
* Make log output more similar for the logging transition phase
* Switch spn hub and observer cmds to new module system
* Fix log sources
* Make worker mgr less error prone
* Fix tests and minor issues
* Fix observation hub
* Improve shutdown and restart handling
* Split up big connection.go source file
* Move varint and dsd packages to structures repo
* Improve expansion test
* Fix linter warnings
* Fix interception module on windows
* Fix linter errors

---------

Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
454 lines
12 KiB
Go
package terminal

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/safing/portmaster/service/mgr"
	"github.com/safing/structures/varint"
)

// FlowControl defines the flow control interface.
type FlowControl interface {
	Deliver(msg *Msg) *Error
	Receive() <-chan *Msg
	Send(msg *Msg, timeout time.Duration) *Error
	ReadyToSend() <-chan struct{}
	Flush(timeout time.Duration)
	StartWorkers(m *mgr.Manager, terminalName string)
	RecvQueueLen() int
	SendQueueLen() int
}
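
// A minimal receive-loop sketch for a FlowControl implementation
// (illustrative only; "fc", "ctx" and "process" are hypothetical and not
// part of this package):
//
//	for {
//		select {
//		case msg := <-fc.Receive():
//			process(msg) // hypothetical message handler
//		case <-ctx.Done():
//			return
//		}
//	}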

// FlowControlType represents a flow control type.
type FlowControlType uint8

// Flow Control Types.
const (
	FlowControlDefault FlowControlType = 0
	FlowControlDFQ     FlowControlType = 1
	FlowControlNone    FlowControlType = 2

	defaultFlowControl = FlowControlDFQ
)

// DefaultSize returns the default flow control size.
func (fct FlowControlType) DefaultSize() uint32 {
	if fct == FlowControlDefault {
		fct = defaultFlowControl
	}

	switch fct {
	case FlowControlDFQ:
		return 50000
	case FlowControlNone:
		return 10000
	case FlowControlDefault:
		fallthrough
	default:
		return 0
	}
}
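
// For example (values taken directly from the switch above):
//
//	FlowControlDFQ.DefaultSize()     // 50000
//	FlowControlNone.DefaultSize()    // 10000
//	FlowControlDefault.DefaultSize() // 50000, as the default is DFQ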

// Flow Queue Configuration.
const (
	DefaultQueueSize        = 50000
	MaxQueueSize            = 1000000
	forceReportBelowPercent = 0.75
)

// DuplexFlowQueue is a duplex flow control mechanism using queues.
type DuplexFlowQueue struct {
	// ctx is the context of the Terminal that is using the DFQ.
	ctx context.Context

	// submitUpstream is used to submit messages to the upstream channel.
	submitUpstream func(msg *Msg, timeout time.Duration)

	// sendQueue holds the messages that are waiting to be sent.
	sendQueue chan *Msg
	// prioMsgs holds the number of messages to send with high priority.
	prioMsgs *int32
	// sendSpace indicates the number of free slots in the recvQueue on the other end.
	sendSpace *int32
	// readyToSend is used to notify sending components that there is free space.
	readyToSend chan struct{}
	// wakeSender is used to wake a sender in case the sendSpace was zero and the
	// sender is waiting for available space.
	wakeSender chan struct{}

	// recvQueue holds the messages that are waiting to be processed.
	recvQueue chan *Msg
	// reportedSpace indicates the number of free slots that the other end knows
	// about.
	reportedSpace *int32
	// spaceReportLock locks the calculation of space to report.
	spaceReportLock sync.Mutex
	// forceSpaceReport forces the sender to send a space report.
	forceSpaceReport chan struct{}

	// flush is used to send a finish function to the handler, which will write
	// all pending messages and then call the provided function.
	flush chan func()
}

// NewDuplexFlowQueue returns a new duplex flow queue.
func NewDuplexFlowQueue(
	ctx context.Context,
	queueSize uint32,
	submitUpstream func(msg *Msg, timeout time.Duration),
) *DuplexFlowQueue {
	dfq := &DuplexFlowQueue{
		ctx:              ctx,
		submitUpstream:   submitUpstream,
		sendQueue:        make(chan *Msg, queueSize),
		prioMsgs:         new(int32),
		sendSpace:        new(int32),
		readyToSend:      make(chan struct{}),
		wakeSender:       make(chan struct{}, 1),
		recvQueue:        make(chan *Msg, queueSize),
		reportedSpace:    new(int32),
		forceSpaceReport: make(chan struct{}, 1),
		flush:            make(chan func()),
	}
	atomic.StoreInt32(dfq.sendSpace, int32(queueSize))
	atomic.StoreInt32(dfq.reportedSpace, int32(queueSize))

	return dfq
}
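
// A minimal construction sketch (illustrative; "module" and "submit" are
// hypothetical stand-ins for the flow owner's mgr.Manager and its upstream
// submit function):
//
//	dfq := NewDuplexFlowQueue(ctx, DefaultQueueSize, submit)
//	dfq.StartWorkers(module, "example terminal")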

// StartWorkers starts the necessary workers to operate the flow queue.
func (dfq *DuplexFlowQueue) StartWorkers(m *mgr.Manager, terminalName string) {
	m.Go(terminalName+" flow queue", dfq.FlowHandler)
}

// shouldReportRecvSpace returns whether the receive space should be reported.
func (dfq *DuplexFlowQueue) shouldReportRecvSpace() bool {
	return atomic.LoadInt32(dfq.reportedSpace) < int32(float32(cap(dfq.recvQueue))*forceReportBelowPercent)
}

// decrementReportedRecvSpace decreases the reported recv space by 1 and
// returns whether the receive space should be reported.
func (dfq *DuplexFlowQueue) decrementReportedRecvSpace() (shouldReportRecvSpace bool) {
	return atomic.AddInt32(dfq.reportedSpace, -1) < int32(float32(cap(dfq.recvQueue))*forceReportBelowPercent)
}

// getSendSpace returns the current send space.
func (dfq *DuplexFlowQueue) getSendSpace() int32 {
	return atomic.LoadInt32(dfq.sendSpace)
}

// decrementSendSpace decreases the send space by 1 and returns it.
func (dfq *DuplexFlowQueue) decrementSendSpace() int32 {
	return atomic.AddInt32(dfq.sendSpace, -1)
}

// addToSendSpace increases the send space by n and wakes the sender in case
// it is waiting for available space.
func (dfq *DuplexFlowQueue) addToSendSpace(n int32) {
	// Add the new space to the send space.
	atomic.AddInt32(dfq.sendSpace, n)
	// Wake the sender in case it is waiting.
	select {
	case dfq.wakeSender <- struct{}{}:
	default:
	}
}

// reportableRecvSpace returns how much free space can be reported to the other
// end. The returned number must be communicated to the other end and must not
// be ignored.
func (dfq *DuplexFlowQueue) reportableRecvSpace() int32 {
	// Changes to the recvQueue during calculation are no problem.
	// We don't want to report space twice though!
	dfq.spaceReportLock.Lock()
	defer dfq.spaceReportLock.Unlock()

	// Calculate reportable receive space and add it to the reported space.
	reportedSpace := atomic.LoadInt32(dfq.reportedSpace)
	toReport := int32(cap(dfq.recvQueue)-len(dfq.recvQueue)) - reportedSpace

	// Never report values below zero.
	// This can happen, as dfq.reportedSpace is decreased after a container is
	// submitted to dfq.recvQueue by dfq.Deliver(). This race condition can only
	// lower the space to report, not increase it. A simple check here solves
	// this problem and keeps performance high.
	// Also, don't report values of 1, as the benefit is minimal and this might
	// be commonly triggered due to the buffer of the force report channel.
	if toReport <= 1 {
		return 0
	}

	// Add the space to report to dfq.reportedSpace and return it.
	atomic.AddInt32(dfq.reportedSpace, toReport)
	return toReport
}
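
// A worked example of the space accounting above (illustrative numbers):
// with cap(recvQueue)=50000, len(recvQueue)=10 and reportedSpace=49900, the
// actual free space is 49990, so toReport = 49990 - 49900 = 90 and
// reportedSpace rises back to 49990.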

// FlowHandler handles all flow queue internals and must be started as a worker
// in the module where it is used.
func (dfq *DuplexFlowQueue) FlowHandler(_ *mgr.WorkerCtx) error {
	// The upstreamSender is started by the terminal module, but is tied to the
	// flow owner instead. Make sure that the flow owner's module depends on the
	// terminal module so that it is shut down earlier.

	var sendSpaceDepleted bool
	var flushFinished func()

	// Drain all queues when shutting down.
	defer func() {
		for {
			select {
			case msg := <-dfq.sendQueue:
				msg.Finish()
			case msg := <-dfq.recvQueue:
				msg.Finish()
			default:
				return
			}
		}
	}()

sending:
	for {
		// If the send space is depleted, wait to be woken.
		if sendSpaceDepleted {
			select {
			case <-dfq.wakeSender:
				if dfq.getSendSpace() > 0 {
					sendSpaceDepleted = false
				} else {
					continue sending
				}

			case <-dfq.forceSpaceReport:
				// Forced reporting of space.
				// We do not need to check if there is enough sending space, as there is
				// no data included.
				spaceToReport := dfq.reportableRecvSpace()
				if spaceToReport > 0 {
					msg := NewMsg(varint.Pack64(uint64(spaceToReport)))
					dfq.submitUpstream(msg, 0)
				}
				continue sending

			case <-dfq.ctx.Done():
				return nil
			}
		}

		// Get message from send queue.
		select {
		case dfq.readyToSend <- struct{}{}:
			// Notify that we are ready to send.

		case msg := <-dfq.sendQueue:
			// Send message from queue.

			// If nil, the queue is being shut down.
			if msg == nil {
				return nil
			}

			// Check if we are handling a high priority message or waiting for one.
			// Mark any msg as high priority while there is one in the pipeline.
			remainingPrioMsgs := atomic.AddInt32(dfq.prioMsgs, -1)
			switch {
			case remainingPrioMsgs >= 0:
				msg.Unit.MakeHighPriority()
			case remainingPrioMsgs < -30_000:
				// Prevent wrap to positive.
				// Compatible with int16 or bigger.
				atomic.StoreInt32(dfq.prioMsgs, 0)
			}

			// Wait for processing slot.
			msg.Unit.WaitForSlot()

			// Prepend available receiving space.
			msg.Data.Prepend(varint.Pack64(uint64(dfq.reportableRecvSpace())))
			// Submit for sending upstream.
			dfq.submitUpstream(msg, 0)
			// Decrease the send space and set the flag if depleted.
			if dfq.decrementSendSpace() <= 0 {
				sendSpaceDepleted = true
			}

			// Check if the send queue is empty now and signal flushers.
			if flushFinished != nil && len(dfq.sendQueue) == 0 {
				flushFinished()
				flushFinished = nil
			}

		case <-dfq.forceSpaceReport:
			// Forced reporting of space.
			// We do not need to check if there is enough sending space, as there is
			// no data included.
			spaceToReport := dfq.reportableRecvSpace()
			if spaceToReport > 0 {
				msg := NewMsg(varint.Pack64(uint64(spaceToReport)))
				dfq.submitUpstream(msg, 0)
			}

		case newFlushFinishedFn := <-dfq.flush:
			// Signal immediately if the send queue is empty.
			if len(dfq.sendQueue) == 0 {
				newFlushFinishedFn()
			} else {
				// If there already is a flush finished function, stack them.
				if flushFinished != nil {
					stackedFlushFinishFn := flushFinished
					flushFinished = func() {
						stackedFlushFinishFn()
						newFlushFinishedFn()
					}
				} else {
					flushFinished = newFlushFinishedFn
				}
			}

		case <-dfq.ctx.Done():
			return nil
		}
	}
}

// Flush waits for all waiting data to be sent.
func (dfq *DuplexFlowQueue) Flush(timeout time.Duration) {
	// Create channel and function for notifying.
	wait := make(chan struct{})
	finished := func() {
		close(wait)
	}
	// Request flush and return when stopping.
	select {
	case dfq.flush <- finished:
	case <-dfq.ctx.Done():
		return
	case <-TimedOut(timeout):
		return
	}
	// Wait for the flush to finish and return when stopping.
	select {
	case <-wait:
	case <-dfq.ctx.Done():
	case <-TimedOut(timeout):
	}
}
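
// A typical call site drains pending sends before teardown (the timeout
// value here is illustrative, not prescribed by this package):
//
//	dfq.Flush(10 * time.Second)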

// ready is closed in init and never written to, so receiving from it always
// succeeds immediately.
var ready = make(chan struct{})

func init() {
	close(ready)
}

// ReadyToSend returns a channel that can be read when data can be sent.
func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{} {
	if atomic.LoadInt32(dfq.sendSpace) > 0 {
		return ready
	}
	return dfq.readyToSend
}

// Send adds the given msg to the send queue.
func (dfq *DuplexFlowQueue) Send(msg *Msg, timeout time.Duration) *Error {
	select {
	case dfq.sendQueue <- msg:
		if msg.Unit.IsHighPriority() {
			// Reset prioMsgs to the current queue size, so that all waiting messages
			// and the one we just added are handled as high priority.
			atomic.StoreInt32(dfq.prioMsgs, int32(len(dfq.sendQueue)))
		}
		return nil

	case <-TimedOut(timeout):
		msg.Finish()
		return ErrTimeout

	case <-dfq.ctx.Done():
		msg.Finish()
		return ErrStopping
	}
}
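
// A minimal sending sketch combining ReadyToSend and Send (illustrative;
// "nextMsg", "ctx" and the timeout value are hypothetical):
//
//	select {
//	case <-dfq.ReadyToSend():
//		if err := dfq.Send(nextMsg(), time.Second); err != nil {
//			return err
//		}
//	case <-ctx.Done():
//		return nil
//	}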

// Receive returns a channel for receiving messages from the recv queue.
func (dfq *DuplexFlowQueue) Receive() <-chan *Msg {
	// If the reported recv space is nearing its end, force a report.
	if dfq.shouldReportRecvSpace() {
		select {
		case dfq.forceSpaceReport <- struct{}{}:
		default:
		}
	}

	return dfq.recvQueue
}

// Deliver submits a message for receiving from upstream.
func (dfq *DuplexFlowQueue) Deliver(msg *Msg) *Error {
	// Ignore messages without data.
	if msg == nil || msg.Data == nil {
		if msg != nil {
			msg.Finish()
		}
		return ErrMalformedData.With("no data")
	}

	// Get and add the newly reported space.
	addSpace, err := msg.Data.GetNextN16()
	if err != nil {
		msg.Finish()
		return ErrMalformedData.With("failed to parse reported space: %w", err)
	}
	if addSpace > 0 {
		dfq.addToSendSpace(int32(addSpace))
	}
	// Abort processing if the message only contained a space update.
	if !msg.Data.HoldsData() {
		msg.Finish()
		return nil
	}

	select {
	case dfq.recvQueue <- msg:
		// If the recv queue accepted the message, decrement the recv space.
		shouldReportRecvSpace := dfq.decrementReportedRecvSpace()
		// If the reported recv space is nearing its end, force a report, if the
		// sender worker is idle.
		if shouldReportRecvSpace {
			select {
			case dfq.forceSpaceReport <- struct{}{}:
			default:
			}
		}

		return nil
	default:
		// If the recv queue is full, return an error.
		// The whole point of the flow queue is to guarantee that this never happens.
		msg.Finish()
		return ErrQueueOverflow
	}
}
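
// Wire framing note: as implemented above, every message first carries a
// varint-encoded space report, optionally followed by payload data. A pure
// space report is therefore just the varint (mirroring FlowHandler):
//
//	msg := NewMsg(varint.Pack64(uint64(spaceToReport)))
//	dfq.submitUpstream(msg, 0)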

// FlowStats returns a k=v formatted string of internal stats.
func (dfq *DuplexFlowQueue) FlowStats() string {
	return fmt.Sprintf(
		"sq=%d rq=%d sends=%d reps=%d",
		len(dfq.sendQueue),
		len(dfq.recvQueue),
		atomic.LoadInt32(dfq.sendSpace),
		atomic.LoadInt32(dfq.reportedSpace),
	)
}

// RecvQueueLen returns the current length of the receive queue.
func (dfq *DuplexFlowQueue) RecvQueueLen() int {
	return len(dfq.recvQueue)
}

// SendQueueLen returns the current length of the send queue.
func (dfq *DuplexFlowQueue) SendQueueLen() int {
	return len(dfq.sendQueue)
}