safing-portmaster/spn/terminal/operation_base.go

185 lines
4.9 KiB
Go

package terminal
import (
"time"
"github.com/tevino/abool"
)
// OperationBase provides the basic operation functionality.
type OperationBase struct {
terminal Terminal
id uint32
stopped abool.AtomicBool
}
// InitOperationBase initialize the operation with the ID and attached terminal.
// Should not be overridden by implementations.
func (op *OperationBase) InitOperationBase(t Terminal, opID uint32) {
op.id = opID
op.terminal = t
}
// ID returns the ID of the operation.
// Should not be overridden by implementations.
func (op *OperationBase) ID() uint32 {
return op.id
}
// Type returns the operation's type ID.
// Should be overridden by implementations to return correct type ID.
func (op *OperationBase) Type() string {
return "unknown"
}
// Deliver delivers a message to the operation.
// Meant to be overridden by implementations.
func (op *OperationBase) Deliver(_ *Msg) *Error {
return ErrIncorrectUsage.With("Deliver not implemented for this operation")
}
// NewMsg creates a new message from this operation.
// Should not be overridden by implementations.
func (op *OperationBase) NewMsg(data []byte) *Msg {
msg := NewMsg(data)
msg.FlowID = op.id
msg.Type = MsgTypeData
// Debug unit leaks.
msg.debugWithCaller(2)
return msg
}
// NewEmptyMsg creates a new empty message from this operation.
// Should not be overridden by implementations.
func (op *OperationBase) NewEmptyMsg() *Msg {
msg := NewEmptyMsg()
msg.FlowID = op.id
msg.Type = MsgTypeData
// Debug unit leaks.
msg.debugWithCaller(2)
return msg
}
// Send sends a message to the other side.
// Should not be overridden by implementations.
func (op *OperationBase) Send(msg *Msg, timeout time.Duration) *Error {
// Add and update metadata.
msg.FlowID = op.id
if msg.Type == MsgTypeData && msg.Unit.IsHighPriority() && UsePriorityDataMsgs {
msg.Type = MsgTypePriorityData
}
// Wait for processing slot.
msg.Unit.WaitForSlot()
// Send message.
tErr := op.terminal.Send(msg, timeout)
if tErr != nil {
// Finish message unit on failure.
msg.Finish()
}
return tErr
}
// Flush sends all messages waiting in the terminal.
// Meant to be overridden by implementations.
func (op *OperationBase) Flush(timeout time.Duration) {
op.terminal.Flush(timeout)
}
// Stopped returns whether the operation has stopped.
// Should not be overridden by implementations.
func (op *OperationBase) Stopped() bool {
return op.stopped.IsSet()
}
// markStopped marks the operation as stopped.
// It returns whether the stop flag was set.
func (op *OperationBase) markStopped() bool {
return op.stopped.SetToIf(false, true)
}
// Stop stops the operation by unregistering it from the terminal and calling HandleStop().
// Should not be overridden by implementations.
func (op *OperationBase) Stop(self Operation, err *Error) {
// Stop operation from terminal.
op.terminal.StopOperation(self, err)
}
// HandleStop gives the operation the ability to cleanly shut down.
// The returned error is the error to send to the other side.
// Should never be called directly. Call Stop() instead.
// Meant to be overridden by implementations.
func (op *OperationBase) HandleStop(err *Error) (errorToSend *Error) {
return err
}
// Terminal returns the terminal the operation is linked to.
// Should not be overridden by implementations.
func (op *OperationBase) Terminal() Terminal {
return op.terminal
}
// OneOffOperationBase is an operation base for operations that just have one
// message and a error return.
type OneOffOperationBase struct {
OperationBase
Result chan *Error
}
// Init initializes the single operation base.
func (op *OneOffOperationBase) Init() {
op.Result = make(chan *Error, 1)
}
// HandleStop gives the operation the ability to cleanly shut down.
// The returned error is the error to send to the other side.
// Should never be called directly. Call Stop() instead.
func (op *OneOffOperationBase) HandleStop(err *Error) (errorToSend *Error) {
select {
case op.Result <- err:
default:
}
return err
}
// MessageStreamOperationBase is an operation base for receiving a message stream.
// Every received message must be finished by the implementing operation.
type MessageStreamOperationBase struct {
OperationBase
Delivered chan *Msg
Ended chan *Error
}
// Init initializes the operation base.
func (op *MessageStreamOperationBase) Init(deliverQueueSize int) {
op.Delivered = make(chan *Msg, deliverQueueSize)
op.Ended = make(chan *Error, 1)
}
// Deliver delivers data to the operation.
func (op *MessageStreamOperationBase) Deliver(msg *Msg) *Error {
select {
case op.Delivered <- msg:
return nil
default:
return ErrIncorrectUsage.With("request was not waiting for data")
}
}
// HandleStop gives the operation the ability to cleanly shut down.
// The returned error is the error to send to the other side.
// Should never be called directly. Call Stop() instead.
func (op *MessageStreamOperationBase) HandleStop(err *Error) (errorToSend *Error) {
select {
case op.Ended <- err:
default:
}
return err
}