mirror of
https://github.com/safing/portmaster
synced 2025-04-21 03:19:10 +00:00
* Move portbase into monorepo * Add new simple module mgr * [WIP] Switch to new simple module mgr * Add StateMgr and more worker variants * [WIP] Switch more modules * [WIP] Switch more modules * [WIP] swtich more modules * [WIP] switch all SPN modules * [WIP] switch all service modules * [WIP] Convert all workers to the new module system * [WIP] add new task system to module manager * [WIP] Add second take for scheduling workers * [WIP] Add FIXME for bugs in new scheduler * [WIP] Add minor improvements to scheduler * [WIP] Add new worker scheduler * [WIP] Fix more bug related to new module system * [WIP] Fix start handing of the new module system * [WIP] Improve startup process * [WIP] Fix minor issues * [WIP] Fix missing subsystem in settings * [WIP] Initialize managers in constructor * [WIP] Move module event initialization to constrictors * [WIP] Fix setting for enabling and disabling the SPN module * [WIP] Move API registeration into module construction * [WIP] Update states mgr for all modules * [WIP] Add CmdLine operation support * Add state helper methods to module group and instance * Add notification and module status handling to status package * Fix starting issues * Remove pilot widget and update security lock to new status data * Remove debug logs * Improve http server shutdown * Add workaround for cleanly shutting down firewall+netquery * Improve logging * Add syncing states with notifications for new module system * Improve starting, stopping, shutdown; resolve FIXMEs/TODOs * [WIP] Fix most unit tests * Review new module system and fix minor issues * Push shutdown and restart events again via API * Set sleep mode via interface * Update example/template module * [WIP] Fix spn/cabin unit test * Remove deprecated UI elements * Make log output more similar for the logging transition phase * Switch spn hub and observer cmds to new module system * Fix log sources * Make worker mgr less error prone * Fix tests and minor issues * Fix observation hub * 
Improve shutdown and restart handling * Split up big connection.go source file * Move varint and dsd packages to structures repo * Improve expansion test * Fix linter warnings * Fix interception module on windows * Fix linter errors --------- Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
909 lines
24 KiB
Go
909 lines
24 KiB
Go
package terminal
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/tevino/abool"
|
|
|
|
"github.com/safing/jess"
|
|
"github.com/safing/portmaster/base/log"
|
|
"github.com/safing/portmaster/base/rng"
|
|
"github.com/safing/portmaster/service/mgr"
|
|
"github.com/safing/portmaster/spn/cabin"
|
|
"github.com/safing/portmaster/spn/conf"
|
|
"github.com/safing/structures/container"
|
|
)
|
|
|
|
// timeoutTicks is the number of slots the idle timeout is split into (see
// SetTimeout). A terminal that stays idle for more than this many ticks is
// abandoned.
const timeoutTicks = 5

// Maximum time the abandon procedure waits for active operations to end
// before continuing the shutdown. Public hubs use the longer server value.
const (
	clientTerminalAbandonTimeout = 15 * time.Second
	serverTerminalAbandonTimeout = 5 * time.Minute
)
|
|
|
|
// Terminal represents a terminal.
|
|
type Terminal interface { //nolint:golint // Being explicit is helpful here.
|
|
// ID returns the terminal ID.
|
|
ID() uint32
|
|
// Ctx returns the terminal context.
|
|
Ctx() context.Context
|
|
|
|
// Deliver delivers a message to the terminal.
|
|
// Should not be overridden by implementations.
|
|
Deliver(msg *Msg) *Error
|
|
// Send is used by others to send a message through the terminal.
|
|
// Should not be overridden by implementations.
|
|
Send(msg *Msg, timeout time.Duration) *Error
|
|
// Flush sends all messages waiting in the terminal.
|
|
// Should not be overridden by implementations.
|
|
Flush(timeout time.Duration)
|
|
|
|
// StartOperation starts the given operation by assigning it an ID and sending the given operation initialization data.
|
|
// Should not be overridden by implementations.
|
|
StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error
|
|
// StopOperation stops the given operation.
|
|
// Should not be overridden by implementations.
|
|
StopOperation(op Operation, err *Error)
|
|
|
|
// Abandon shuts down the terminal unregistering it from upstream and calling HandleAbandon().
|
|
// Should not be overridden by implementations.
|
|
Abandon(err *Error)
|
|
// HandleAbandon gives the terminal the ability to cleanly shut down.
|
|
// The terminal is still fully functional at this point.
|
|
// The returned error is the error to send to the other side.
|
|
// Should never be called directly. Call Abandon() instead.
|
|
// Meant to be overridden by implementations.
|
|
HandleAbandon(err *Error) (errorToSend *Error)
|
|
// HandleDestruction gives the terminal the ability to clean up.
|
|
// The terminal has already fully shut down at this point.
|
|
// Should never be called directly. Call Abandon() instead.
|
|
// Meant to be overridden by implementations.
|
|
HandleDestruction(err *Error)
|
|
|
|
// FmtID formats the terminal ID (including parent IDs).
|
|
// May be overridden by implementations.
|
|
FmtID() string
|
|
}
|
|
|
|
// TerminalBase contains the basic functions of a terminal.
|
|
type TerminalBase struct { //nolint:golint,maligned // Being explicit is helpful here.
|
|
// TODO: Fix maligned.
|
|
Terminal // Interface check.
|
|
|
|
lock sync.RWMutex
|
|
|
|
// id is the underlying id of the Terminal.
|
|
id uint32
|
|
// parentID is the id of the parent component.
|
|
parentID string
|
|
|
|
// ext holds the extended terminal so that the base terminal can access custom functions.
|
|
ext Terminal
|
|
// sendQueue holds message to be sent.
|
|
sendQueue chan *Msg
|
|
// flowControl holds the flow control system.
|
|
flowControl FlowControl
|
|
// upstream represents the upstream (parent) terminal.
|
|
upstream Upstream
|
|
|
|
// deliverProxy is populated with the configured deliver function
|
|
deliverProxy func(msg *Msg) *Error
|
|
// recvProxy is populated with the configured recv function
|
|
recvProxy func() <-chan *Msg
|
|
|
|
// ctx is the context of the Terminal.
|
|
ctx context.Context
|
|
// cancelCtx cancels ctx.
|
|
cancelCtx context.CancelFunc
|
|
|
|
// waitForFlush signifies if sending should be delayed until the next call
|
|
// to Flush()
|
|
waitForFlush *abool.AtomicBool
|
|
// flush is used to send a finish function to the handler, which will write
|
|
// all pending messages and then call the received function.
|
|
flush chan func()
|
|
// idleTicker ticks for increasing and checking the idle counter.
|
|
idleTicker *time.Ticker
|
|
// idleCounter counts the ticks the terminal has been idle.
|
|
idleCounter *uint32
|
|
|
|
// jession is the jess session used for encryption.
|
|
jession *jess.Session
|
|
// jessionLock locks jession.
|
|
jessionLock sync.Mutex
|
|
// encryptionReady is set when the encryption is ready for sending messages.
|
|
encryptionReady chan struct{}
|
|
// identity is the identity used by a remote Terminal.
|
|
identity *cabin.Identity
|
|
|
|
// operations holds references to all active operations that require persistence.
|
|
operations map[uint32]Operation
|
|
// nextOpID holds the next operation ID.
|
|
nextOpID *uint32
|
|
// permission holds the permissions of the terminal.
|
|
permission Permission
|
|
|
|
// opts holds the terminal options. It must not be modified after the terminal
|
|
// has started.
|
|
opts *TerminalOpts
|
|
|
|
// lastUnknownOpID holds the operation ID of the last data message received
|
|
// for an unknown operation ID.
|
|
lastUnknownOpID uint32
|
|
// lastUnknownOpMsgs holds the amount of continuous data messages received
|
|
// for the operation ID in lastUnknownOpID.
|
|
lastUnknownOpMsgs uint32
|
|
|
|
// Abandoning indicates if the Terminal is being abandoned. The main handlers
|
|
// will keep running until the context has been canceled by the abandon
|
|
// procedure.
|
|
// No new operations should be started.
|
|
// Whoever initiates the abandoning must also start the abandon procedure.
|
|
Abandoning *abool.AtomicBool
|
|
}
|
|
|
|
func createTerminalBase(
|
|
ctx context.Context,
|
|
id uint32,
|
|
parentID string,
|
|
remote bool,
|
|
initMsg *TerminalOpts,
|
|
upstream Upstream,
|
|
) (*TerminalBase, *Error) {
|
|
t := &TerminalBase{
|
|
id: id,
|
|
parentID: parentID,
|
|
sendQueue: make(chan *Msg),
|
|
upstream: upstream,
|
|
waitForFlush: abool.New(),
|
|
flush: make(chan func()),
|
|
idleTicker: time.NewTicker(time.Minute),
|
|
idleCounter: new(uint32),
|
|
encryptionReady: make(chan struct{}),
|
|
operations: make(map[uint32]Operation),
|
|
nextOpID: new(uint32),
|
|
opts: initMsg,
|
|
Abandoning: abool.New(),
|
|
}
|
|
// Stop ticking to disable timeout.
|
|
t.idleTicker.Stop()
|
|
// Shift next operation ID if remote.
|
|
if remote {
|
|
atomic.AddUint32(t.nextOpID, 4)
|
|
}
|
|
// Create context.
|
|
t.ctx, t.cancelCtx = context.WithCancel(ctx)
|
|
|
|
// Create flow control.
|
|
switch initMsg.FlowControl {
|
|
case FlowControlDFQ:
|
|
t.flowControl = NewDuplexFlowQueue(t.Ctx(), initMsg.FlowControlSize, t.submitToUpstream)
|
|
t.deliverProxy = t.flowControl.Deliver
|
|
t.recvProxy = t.flowControl.Receive
|
|
case FlowControlNone:
|
|
deliver := make(chan *Msg, initMsg.FlowControlSize)
|
|
t.deliverProxy = MakeDirectDeliveryDeliverFunc(ctx, deliver)
|
|
t.recvProxy = MakeDirectDeliveryRecvFunc(deliver)
|
|
case FlowControlDefault:
|
|
fallthrough
|
|
default:
|
|
return nil, ErrInternalError.With("unknown flow control type %d", initMsg.FlowControl)
|
|
}
|
|
|
|
return t, nil
|
|
}
|
|
|
|
// ID returns the Terminal's ID.
|
|
func (t *TerminalBase) ID() uint32 {
|
|
return t.id
|
|
}
|
|
|
|
// Ctx returns the Terminal's context.
|
|
func (t *TerminalBase) Ctx() context.Context {
|
|
return t.ctx
|
|
}
|
|
|
|
// SetTerminalExtension sets the Terminal's extension. This function is not
|
|
// guarded and may only be used during initialization.
|
|
func (t *TerminalBase) SetTerminalExtension(ext Terminal) {
|
|
t.ext = ext
|
|
}
|
|
|
|
// SetTimeout sets the Terminal's idle timeout duration.
|
|
// It is broken down into slots internally.
|
|
func (t *TerminalBase) SetTimeout(d time.Duration) {
|
|
t.idleTicker.Reset(d / timeoutTicks)
|
|
}
|
|
|
|
// Deliver on TerminalBase only exists to conform to the interface. It must be
|
|
// overridden by an actual implementation.
|
|
func (t *TerminalBase) Deliver(msg *Msg) *Error {
|
|
// Deliver via configured proxy.
|
|
err := t.deliverProxy(msg)
|
|
if err != nil {
|
|
msg.Finish()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// StartWorkers starts the necessary workers to operate the Terminal.
|
|
func (t *TerminalBase) StartWorkers(m *mgr.Manager, terminalName string) {
|
|
// Start terminal workers.
|
|
m.Go(terminalName+" handler", t.Handler)
|
|
m.Go(terminalName+" sender", t.Sender)
|
|
|
|
// Start any flow control workers.
|
|
if t.flowControl != nil {
|
|
t.flowControl.StartWorkers(m, terminalName)
|
|
}
|
|
}
|
|
|
|
// Batching parameters used by the Sender.
const (
	// sendThresholdLength is the buffered size in bytes at which the Sender
	// sends immediately instead of waiting for more data.
	sendThresholdLength = 100
	// sendMaxLength is the buffered size in bytes at which the Sender stops
	// taking messages from the send queue until the buffer has been sent.
	sendMaxLength = 4000
	// sendThresholdMaxWait is how long the Sender waits for more data while
	// the buffer is below sendThresholdLength.
	sendThresholdMaxWait = 20 * time.Millisecond
)
|
|
|
|
// Handler receives and handles messages and must be started as a worker in the
|
|
// module where the Terminal is used.
|
|
func (t *TerminalBase) Handler(_ *mgr.WorkerCtx) error {
|
|
defer t.Abandon(ErrInternalError.With("handler died"))
|
|
|
|
var msg *Msg
|
|
defer msg.Finish()
|
|
|
|
for {
|
|
select {
|
|
case <-t.ctx.Done():
|
|
// Call Abandon just in case.
|
|
// Normally, only the StopProcedure function should cancel the context.
|
|
t.Abandon(nil)
|
|
return nil // Controlled worker exit.
|
|
|
|
case <-t.idleTicker.C:
|
|
// If nothing happens for a while, end the session.
|
|
if atomic.AddUint32(t.idleCounter, 1) > timeoutTicks {
|
|
// Abandon the terminal and reset the counter.
|
|
t.Abandon(ErrNoActivity)
|
|
atomic.StoreUint32(t.idleCounter, 0)
|
|
}
|
|
|
|
case msg = <-t.recvProxy():
|
|
err := t.handleReceive(msg)
|
|
if err != nil {
|
|
t.Abandon(err.Wrap("failed to handle"))
|
|
return nil
|
|
}
|
|
|
|
// Register activity.
|
|
atomic.StoreUint32(t.idleCounter, 0)
|
|
}
|
|
}
|
|
}
|
|
|
|
// submit is used to send message from the terminal to upstream, including
|
|
// going through flow control, if configured.
|
|
// This function should be used to send message from the terminal to upstream.
|
|
func (t *TerminalBase) submit(msg *Msg, timeout time.Duration) {
|
|
// Submit directly if no flow control is configured.
|
|
if t.flowControl == nil {
|
|
t.submitToUpstream(msg, timeout)
|
|
return
|
|
}
|
|
|
|
// Hand over to flow control.
|
|
err := t.flowControl.Send(msg, timeout)
|
|
if err != nil {
|
|
msg.Finish()
|
|
t.Abandon(err.Wrap("failed to submit to flow control"))
|
|
}
|
|
}
|
|
|
|
// submitToUpstream is used to directly submit messages to upstream.
|
|
// This function should only be used by the flow control or submit function.
|
|
func (t *TerminalBase) submitToUpstream(msg *Msg, timeout time.Duration) {
|
|
// Add terminal ID as flow ID.
|
|
msg.FlowID = t.ID()
|
|
|
|
// Debug unit leaks.
|
|
msg.debugWithCaller(2)
|
|
|
|
// Submit to upstream.
|
|
err := t.upstream.Send(msg, timeout)
|
|
if err != nil {
|
|
msg.Finish()
|
|
t.Abandon(err.Wrap("failed to submit to upstream"))
|
|
}
|
|
}
|
|
|
|
// Sender handles sending messages and must be started as a worker in the
|
|
// module where the Terminal is used.
|
|
func (t *TerminalBase) Sender(_ *mgr.WorkerCtx) error {
|
|
// Don't send messages, if the encryption is net yet set up.
|
|
// The server encryption session is only initialized with the first
|
|
// operative message, not on Terminal creation.
|
|
if t.opts.Encrypt {
|
|
select {
|
|
case <-t.ctx.Done():
|
|
// Call Abandon just in case.
|
|
// Normally, the only the StopProcedure function should cancel the context.
|
|
t.Abandon(nil)
|
|
return nil // Controlled worker exit.
|
|
case <-t.encryptionReady:
|
|
}
|
|
}
|
|
|
|
// Be sure to call Stop even in case of sudden death.
|
|
defer t.Abandon(ErrInternalError.With("sender died"))
|
|
|
|
var msgBufferMsg *Msg
|
|
var msgBufferLen int
|
|
var msgBufferLimitReached bool
|
|
var sendMsgs bool
|
|
var sendMaxWait *time.Timer
|
|
var flushFinished func()
|
|
|
|
// Finish any current unit when returning.
|
|
defer msgBufferMsg.Finish()
|
|
|
|
// Only receive message when not sending the current msg buffer.
|
|
sendQueueOpMsgs := func() <-chan *Msg {
|
|
// Don't handle more messages, if the buffer is full.
|
|
if msgBufferLimitReached {
|
|
return nil
|
|
}
|
|
return t.sendQueue
|
|
}
|
|
|
|
// Only wait for sending slot when the current msg buffer is ready to be sent.
|
|
readyToSend := func() <-chan struct{} {
|
|
switch {
|
|
case !sendMsgs:
|
|
// Wait until there is something to send.
|
|
return nil
|
|
case t.flowControl != nil:
|
|
// Let flow control decide when we are ready.
|
|
return t.flowControl.ReadyToSend()
|
|
default:
|
|
// Always ready.
|
|
return ready
|
|
}
|
|
}
|
|
|
|
// Calculate current max wait time to send the msg buffer.
|
|
getSendMaxWait := func() <-chan time.Time {
|
|
if sendMaxWait != nil {
|
|
return sendMaxWait.C
|
|
}
|
|
return nil
|
|
}
|
|
|
|
handling:
|
|
for {
|
|
select {
|
|
case <-t.ctx.Done():
|
|
// Call Stop just in case.
|
|
// Normally, the only the StopProcedure function should cancel the context.
|
|
t.Abandon(nil)
|
|
return nil // Controlled worker exit.
|
|
|
|
case <-t.idleTicker.C:
|
|
// If nothing happens for a while, end the session.
|
|
if atomic.AddUint32(t.idleCounter, 1) > timeoutTicks {
|
|
// Abandon the terminal and reset the counter.
|
|
t.Abandon(ErrNoActivity)
|
|
atomic.StoreUint32(t.idleCounter, 0)
|
|
}
|
|
|
|
case msg := <-sendQueueOpMsgs():
|
|
if msg == nil {
|
|
continue handling
|
|
}
|
|
|
|
// Add unit to buffer unit, or use it as new buffer.
|
|
if msgBufferMsg != nil {
|
|
// Pack, append and finish additional message.
|
|
msgBufferMsg.Consume(msg)
|
|
} else {
|
|
// Pack operation message.
|
|
msg.Pack()
|
|
// Convert to message of terminal.
|
|
msgBufferMsg = msg
|
|
msgBufferMsg.FlowID = t.ID()
|
|
msgBufferMsg.Type = MsgTypeData
|
|
}
|
|
msgBufferLen += msg.Data.Length()
|
|
|
|
// Check if there is enough data to hit the sending threshold.
|
|
if msgBufferLen >= sendThresholdLength {
|
|
sendMsgs = true
|
|
} else if sendMaxWait == nil && t.waitForFlush.IsNotSet() {
|
|
sendMaxWait = time.NewTimer(sendThresholdMaxWait)
|
|
}
|
|
|
|
// Check if we have reached the maximum buffer size.
|
|
if msgBufferLen >= sendMaxLength {
|
|
msgBufferLimitReached = true
|
|
}
|
|
|
|
// Register activity.
|
|
atomic.StoreUint32(t.idleCounter, 0)
|
|
|
|
case <-getSendMaxWait():
|
|
// The timer for waiting for more data has ended.
|
|
// Send all available data if not forced to wait for a flush.
|
|
if t.waitForFlush.IsNotSet() {
|
|
sendMsgs = true
|
|
}
|
|
|
|
case newFlushFinishedFn := <-t.flush:
|
|
// We are flushing - stop waiting.
|
|
t.waitForFlush.UnSet()
|
|
|
|
// Signal immediately if msg buffer is empty.
|
|
if msgBufferLen == 0 {
|
|
newFlushFinishedFn()
|
|
} else {
|
|
// If there already is a flush finished function, stack them.
|
|
if flushFinished != nil {
|
|
stackedFlushFinishFn := flushFinished
|
|
flushFinished = func() {
|
|
stackedFlushFinishFn()
|
|
newFlushFinishedFn()
|
|
}
|
|
} else {
|
|
flushFinished = newFlushFinishedFn
|
|
}
|
|
}
|
|
|
|
// Force sending data now.
|
|
sendMsgs = true
|
|
|
|
case <-readyToSend():
|
|
// Reset sending flags.
|
|
sendMsgs = false
|
|
msgBufferLimitReached = false
|
|
|
|
// Send if there is anything to send.
|
|
var err *Error
|
|
if msgBufferLen > 0 {
|
|
// Update message type to include priority.
|
|
if msgBufferMsg.Type == MsgTypeData &&
|
|
msgBufferMsg.Unit.IsHighPriority() &&
|
|
t.opts.UsePriorityDataMsgs {
|
|
msgBufferMsg.Type = MsgTypePriorityData
|
|
}
|
|
|
|
// Wait for clearance on initial msg only.
|
|
msgBufferMsg.Unit.WaitForSlot()
|
|
|
|
err = t.sendOpMsgs(msgBufferMsg)
|
|
}
|
|
|
|
// Reset buffer.
|
|
msgBufferMsg = nil
|
|
msgBufferLen = 0
|
|
|
|
// Reset send wait timer.
|
|
if sendMaxWait != nil {
|
|
sendMaxWait.Stop()
|
|
sendMaxWait = nil
|
|
}
|
|
|
|
// Check if we are flushing and need to notify.
|
|
if flushFinished != nil {
|
|
flushFinished()
|
|
flushFinished = nil
|
|
}
|
|
|
|
// Handle error after state updates.
|
|
if err != nil {
|
|
t.Abandon(err.With("failed to send"))
|
|
continue handling
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// WaitForFlush makes the terminal pause all sending until the next call to
|
|
// Flush().
|
|
func (t *TerminalBase) WaitForFlush() {
|
|
t.waitForFlush.Set()
|
|
}
|
|
|
|
// Flush sends all data waiting to be sent.
|
|
func (t *TerminalBase) Flush(timeout time.Duration) {
|
|
// Create channel and function for notifying.
|
|
wait := make(chan struct{})
|
|
finished := func() {
|
|
close(wait)
|
|
}
|
|
// Request flush and return when stopping.
|
|
select {
|
|
case t.flush <- finished:
|
|
case <-t.Ctx().Done():
|
|
return
|
|
case <-TimedOut(timeout):
|
|
return
|
|
}
|
|
// Wait for flush to finish and return when stopping.
|
|
select {
|
|
case <-wait:
|
|
case <-t.Ctx().Done():
|
|
return
|
|
case <-TimedOut(timeout):
|
|
return
|
|
}
|
|
|
|
// Flush flow control, if configured.
|
|
if t.flowControl != nil {
|
|
t.flowControl.Flush(timeout)
|
|
}
|
|
}
|
|
|
|
func (t *TerminalBase) encrypt(c *container.Container) (*container.Container, *Error) {
|
|
if !t.opts.Encrypt {
|
|
return c, nil
|
|
}
|
|
|
|
t.jessionLock.Lock()
|
|
defer t.jessionLock.Unlock()
|
|
|
|
letter, err := t.jession.Close(c.CompileData())
|
|
if err != nil {
|
|
return nil, ErrIntegrity.With("failed to encrypt: %w", err)
|
|
}
|
|
|
|
encryptedData, err := letter.ToWire()
|
|
if err != nil {
|
|
return nil, ErrInternalError.With("failed to pack letter: %w", err)
|
|
}
|
|
|
|
return encryptedData, nil
|
|
}
|
|
|
|
func (t *TerminalBase) decrypt(c *container.Container) (*container.Container, *Error) {
|
|
if !t.opts.Encrypt {
|
|
return c, nil
|
|
}
|
|
|
|
t.jessionLock.Lock()
|
|
defer t.jessionLock.Unlock()
|
|
|
|
letter, err := jess.LetterFromWire(c)
|
|
if err != nil {
|
|
return nil, ErrMalformedData.With("failed to parse letter: %w", err)
|
|
}
|
|
|
|
// Setup encryption if not yet done.
|
|
if t.jession == nil {
|
|
if t.identity == nil {
|
|
return nil, ErrInternalError.With("missing identity for setting up incoming encryption")
|
|
}
|
|
|
|
// Create jess session.
|
|
t.jession, err = letter.WireCorrespondence(t.identity)
|
|
if err != nil {
|
|
return nil, ErrIntegrity.With("failed to initialize incoming encryption: %w", err)
|
|
}
|
|
|
|
// Don't need that anymore.
|
|
t.identity = nil
|
|
|
|
// Encryption is ready for sending.
|
|
close(t.encryptionReady)
|
|
}
|
|
|
|
decryptedData, err := t.jession.Open(letter)
|
|
if err != nil {
|
|
return nil, ErrIntegrity.With("failed to decrypt: %w", err)
|
|
}
|
|
|
|
return container.New(decryptedData), nil
|
|
}
|
|
|
|
func (t *TerminalBase) handleReceive(msg *Msg) *Error {
|
|
msg.Unit.WaitForSlot()
|
|
defer msg.Finish()
|
|
|
|
// Debugging:
|
|
// log.Errorf("spn/terminal %s handling tmsg: %s", t.FmtID(), spew.Sdump(c.CompileData()))
|
|
|
|
// Check if message is empty. This will be the case if a message was only
|
|
// for updated the available space of the flow queue.
|
|
if !msg.Data.HoldsData() {
|
|
return nil
|
|
}
|
|
|
|
// Decrypt if enabled.
|
|
var tErr *Error
|
|
msg.Data, tErr = t.decrypt(msg.Data)
|
|
if tErr != nil {
|
|
return tErr
|
|
}
|
|
|
|
// Handle operation messages.
|
|
for msg.Data.HoldsData() {
|
|
// Get next message length.
|
|
msgLength, err := msg.Data.GetNextN32()
|
|
if err != nil {
|
|
return ErrMalformedData.With("failed to get operation msg length: %w", err)
|
|
}
|
|
if msgLength == 0 {
|
|
// Remainder is padding.
|
|
// Padding can only be at the end of the segment.
|
|
t.handlePaddingMsg(msg.Data)
|
|
return nil
|
|
}
|
|
|
|
// Get op msg data.
|
|
msgData, err := msg.Data.GetAsContainer(int(msgLength))
|
|
if err != nil {
|
|
return ErrMalformedData.With("failed to get operation msg data (%d/%d bytes): %w", msg.Data.Length(), msgLength, err)
|
|
}
|
|
|
|
// Handle op msg.
|
|
if handleErr := t.handleOpMsg(msgData); handleErr != nil {
|
|
return handleErr
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TerminalBase) handleOpMsg(data *container.Container) *Error {
|
|
// Debugging:
|
|
// log.Errorf("spn/terminal %s handling opmsg: %s", t.FmtID(), spew.Sdump(data.CompileData()))
|
|
|
|
// Parse message operation id, type.
|
|
opID, msgType, err := ParseIDType(data)
|
|
if err != nil {
|
|
return ErrMalformedData.With("failed to parse operation msg id/type: %w", err)
|
|
}
|
|
|
|
switch msgType {
|
|
case MsgTypeInit:
|
|
t.handleOperationStart(opID, data)
|
|
|
|
case MsgTypeData, MsgTypePriorityData:
|
|
op, ok := t.GetActiveOp(opID)
|
|
if ok && !op.Stopped() {
|
|
// Create message from data.
|
|
msg := NewEmptyMsg()
|
|
msg.FlowID = opID
|
|
msg.Type = msgType
|
|
msg.Data = data
|
|
if msg.Type == MsgTypePriorityData {
|
|
msg.Unit.MakeHighPriority()
|
|
}
|
|
|
|
// Deliver message to operation.
|
|
tErr := op.Deliver(msg)
|
|
if tErr != nil {
|
|
// Also stop on "success" errors!
|
|
msg.Finish()
|
|
t.StopOperation(op, tErr)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// If an active op is not found, this is likely just left-overs from a
|
|
// stopped or failed operation.
|
|
// log.Tracef("spn/terminal: %s received data msg for unknown op %d", fmtTerminalID(t.parentID, t.id), opID)
|
|
|
|
// Send a stop error if this happens too often.
|
|
if opID == t.lastUnknownOpID {
|
|
// OpID is the same as last time.
|
|
t.lastUnknownOpMsgs++
|
|
|
|
// Log an warning (via StopOperation) and send a stop message every thousand.
|
|
if t.lastUnknownOpMsgs%1000 == 0 {
|
|
t.StopOperation(newUnknownOp(opID, ""), ErrUnknownOperationID.With("received %d unsolicited data msgs", t.lastUnknownOpMsgs))
|
|
}
|
|
|
|
// TODO: Abandon terminal at over 10000?
|
|
} else {
|
|
// OpID changed, set new ID and reset counter.
|
|
t.lastUnknownOpID = opID
|
|
t.lastUnknownOpMsgs = 1
|
|
}
|
|
|
|
case MsgTypeStop:
|
|
// Parse received error.
|
|
opErr, parseErr := ParseExternalError(data.CompileData())
|
|
if parseErr != nil {
|
|
log.Warningf("spn/terminal: %s failed to parse stop error: %s", fmtTerminalID(t.parentID, t.id), parseErr)
|
|
opErr = ErrUnknownError.AsExternal()
|
|
}
|
|
|
|
// End operation.
|
|
op, ok := t.GetActiveOp(opID)
|
|
if ok {
|
|
t.StopOperation(op, opErr)
|
|
} else {
|
|
log.Tracef("spn/terminal: %s received stop msg for unknown op %d", fmtTerminalID(t.parentID, t.id), opID)
|
|
}
|
|
|
|
default:
|
|
log.Warningf("spn/terminal: %s received unexpected message type: %d", t.FmtID(), msgType)
|
|
return ErrUnexpectedMsgType
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TerminalBase) handlePaddingMsg(c *container.Container) {
|
|
padding := c.GetAll()
|
|
if len(padding) > 0 {
|
|
rngFeeder.SupplyEntropyIfNeeded(padding, len(padding))
|
|
}
|
|
}
|
|
|
|
func (t *TerminalBase) sendOpMsgs(msg *Msg) *Error {
|
|
msg.Unit.WaitForSlot()
|
|
|
|
// Add Padding if needed.
|
|
if t.opts.Padding > 0 {
|
|
paddingNeeded := (int(t.opts.Padding) - msg.Data.Length()) % int(t.opts.Padding)
|
|
if paddingNeeded > 0 {
|
|
// Add padding message header.
|
|
msg.Data.Append([]byte{0})
|
|
paddingNeeded--
|
|
|
|
// Add needed padding data.
|
|
if paddingNeeded > 0 {
|
|
padding, err := rng.Bytes(paddingNeeded)
|
|
if err != nil {
|
|
log.Debugf("spn/terminal: %s failed to get random data, using zeros instead", t.FmtID())
|
|
padding = make([]byte, paddingNeeded)
|
|
}
|
|
msg.Data.Append(padding)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Encrypt operative data.
|
|
var tErr *Error
|
|
msg.Data, tErr = t.encrypt(msg.Data)
|
|
if tErr != nil {
|
|
return tErr
|
|
}
|
|
|
|
// Send data.
|
|
t.submit(msg, 0)
|
|
return nil
|
|
}
|
|
|
|
// Abandon shuts down the terminal unregistering it from upstream and calling HandleAbandon().
|
|
// Should not be overridden by implementations.
|
|
func (t *TerminalBase) Abandon(err *Error) {
|
|
if t.Abandoning.SetToIf(false, true) {
|
|
module.mgr.Go("terminal abandon procedure", func(_ *mgr.WorkerCtx) error {
|
|
t.handleAbandonProcedure(err)
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
|
|
// HandleAbandon gives the terminal the ability to cleanly shut down.
|
|
// The returned error is the error to send to the other side.
|
|
// Should never be called directly. Call Abandon() instead.
|
|
// Meant to be overridden by implementations.
|
|
func (t *TerminalBase) HandleAbandon(err *Error) (errorToSend *Error) {
|
|
return err
|
|
}
|
|
|
|
// HandleDestruction gives the terminal the ability to clean up.
|
|
// The terminal has already fully shut down at this point.
|
|
// Should never be called directly. Call Abandon() instead.
|
|
// Meant to be overridden by implementations.
|
|
func (t *TerminalBase) HandleDestruction(err *Error) {}
|
|
|
|
func (t *TerminalBase) handleAbandonProcedure(err *Error) {
|
|
// End all operations.
|
|
for _, op := range t.allOps() {
|
|
t.StopOperation(op, nil)
|
|
}
|
|
|
|
// Prepare timeouts for waiting for ops.
|
|
timeout := clientTerminalAbandonTimeout
|
|
if conf.PublicHub() {
|
|
timeout = serverTerminalAbandonTimeout
|
|
}
|
|
checkTicker := time.NewTicker(50 * time.Millisecond)
|
|
defer checkTicker.Stop()
|
|
abortWaiting := time.After(timeout)
|
|
|
|
// Wait for all operations to end.
|
|
waitForOps:
|
|
for {
|
|
select {
|
|
case <-checkTicker.C:
|
|
if t.GetActiveOpCount() <= 0 {
|
|
break waitForOps
|
|
}
|
|
case <-abortWaiting:
|
|
log.Warningf(
|
|
"spn/terminal: terminal %s is continuing shutdown with %d active operations",
|
|
t.FmtID(),
|
|
t.GetActiveOpCount(),
|
|
)
|
|
break waitForOps
|
|
}
|
|
}
|
|
|
|
// Call operation stop handle function for proper shutdown cleaning up.
|
|
if t.ext != nil {
|
|
err = t.ext.HandleAbandon(err)
|
|
}
|
|
|
|
// Send error to the connected Operation, if the error is internal.
|
|
if !err.IsExternal() {
|
|
if err == nil {
|
|
err = ErrStopping
|
|
}
|
|
|
|
msg := NewMsg(err.Pack())
|
|
msg.FlowID = t.ID()
|
|
msg.Type = MsgTypeStop
|
|
t.submit(msg, 1*time.Second)
|
|
}
|
|
|
|
// If terminal was ended locally, send all data before abandoning.
|
|
// If terminal was ended remotely, don't bother sending remaining data.
|
|
if !err.IsExternal() {
|
|
// Flushing could mean sending a full buffer of 50000 packets.
|
|
t.Flush(5 * time.Minute)
|
|
}
|
|
|
|
// Stop all other connected workers.
|
|
t.cancelCtx()
|
|
t.idleTicker.Stop()
|
|
|
|
// Call operation destruction handle function for proper shutdown cleaning up.
|
|
if t.ext != nil {
|
|
t.ext.HandleDestruction(err)
|
|
}
|
|
}
|
|
|
|
func (t *TerminalBase) allOps() []Operation {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
ops := make([]Operation, 0, len(t.operations))
|
|
for _, op := range t.operations {
|
|
ops = append(ops, op)
|
|
}
|
|
|
|
return ops
|
|
}
|
|
|
|
// MakeDirectDeliveryDeliverFunc creates a submit upstream function with the
|
|
// given delivery channel.
|
|
func MakeDirectDeliveryDeliverFunc(
|
|
ctx context.Context,
|
|
deliver chan *Msg,
|
|
) func(c *Msg) *Error {
|
|
return func(c *Msg) *Error {
|
|
select {
|
|
case deliver <- c:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ErrStopping
|
|
}
|
|
}
|
|
}
|
|
|
|
// MakeDirectDeliveryRecvFunc makes a delivery receive function with the given
|
|
// delivery channel.
|
|
func MakeDirectDeliveryRecvFunc(
|
|
deliver chan *Msg,
|
|
) func() <-chan *Msg {
|
|
return func() <-chan *Msg {
|
|
return deliver
|
|
}
|
|
}
|