mirror of
https://github.com/safing/portmaster
synced 2025-04-23 04:19:10 +00:00
* Move portbase into monorepo * Add new simple module mgr * [WIP] Switch to new simple module mgr * Add StateMgr and more worker variants * [WIP] Switch more modules * [WIP] Switch more modules * [WIP] swtich more modules * [WIP] switch all SPN modules * [WIP] switch all service modules * [WIP] Convert all workers to the new module system * [WIP] add new task system to module manager * [WIP] Add second take for scheduling workers * [WIP] Add FIXME for bugs in new scheduler * [WIP] Add minor improvements to scheduler * [WIP] Add new worker scheduler * [WIP] Fix more bug related to new module system * [WIP] Fix start handing of the new module system * [WIP] Improve startup process * [WIP] Fix minor issues * [WIP] Fix missing subsystem in settings * [WIP] Initialize managers in constructor * [WIP] Move module event initialization to constrictors * [WIP] Fix setting for enabling and disabling the SPN module * [WIP] Move API registeration into module construction * [WIP] Update states mgr for all modules * [WIP] Add CmdLine operation support * Add state helper methods to module group and instance * Add notification and module status handling to status package * Fix starting issues * Remove pilot widget and update security lock to new status data * Remove debug logs * Improve http server shutdown * Add workaround for cleanly shutting down firewall+netquery * Improve logging * Add syncing states with notifications for new module system * Improve starting, stopping, shutdown; resolve FIXMEs/TODOs * [WIP] Fix most unit tests * Review new module system and fix minor issues * Push shutdown and restart events again via API * Set sleep mode via interface * Update example/template module * [WIP] Fix spn/cabin unit test * Remove deprecated UI elements * Make log output more similar for the logging transition phase * Switch spn hub and observer cmds to new module system * Fix log sources * Make worker mgr less error prone * Fix tests and minor issues * Fix observation hub * 
Improve shutdown and restart handling * Split up big connection.go source file * Move varint and dsd packages to structures repo * Improve expansion test * Fix linter warnings * Fix interception module on windows * Fix linter errors --------- Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
586 lines
16 KiB
Go
586 lines
16 KiB
Go
package crew
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strconv"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/safing/portmaster/base/log"
|
|
"github.com/safing/portmaster/service/mgr"
|
|
"github.com/safing/portmaster/service/network/netutils"
|
|
"github.com/safing/portmaster/service/network/packet"
|
|
"github.com/safing/portmaster/spn/conf"
|
|
"github.com/safing/portmaster/spn/terminal"
|
|
"github.com/safing/structures/container"
|
|
"github.com/safing/structures/dsd"
|
|
)
|
|
|
|
// ConnectOpType is the type ID under which the connect operation is
// registered with the terminal package (see init).
const ConnectOpType string = "connect"

// activeConnectOps counts the connect operations that currently have a
// running connection reader. It is only modified atomically by connReader.
var activeConnectOps = new(int64)
|
|
|
|
// ConnectOp is used to connect data tunnels to servers on the Internet.
|
|
type ConnectOp struct {
|
|
terminal.OperationBase
|
|
|
|
// Flow Control
|
|
dfq *terminal.DuplexFlowQueue
|
|
|
|
// Context and shutdown handling
|
|
// ctx is the context of the Terminal.
|
|
ctx context.Context
|
|
// cancelCtx cancels ctx.
|
|
cancelCtx context.CancelFunc
|
|
// doneWriting signals that the writer has finished writing.
|
|
doneWriting chan struct{}
|
|
|
|
// Metrics
|
|
incomingTraffic atomic.Uint64
|
|
outgoingTraffic atomic.Uint64
|
|
started time.Time
|
|
|
|
// Connection
|
|
t terminal.Terminal
|
|
conn net.Conn
|
|
request *ConnectRequest
|
|
entry bool
|
|
tunnel *Tunnel
|
|
}
|
|
|
|
// Type returns the type ID.
|
|
func (op *ConnectOp) Type() string {
|
|
return ConnectOpType
|
|
}
|
|
|
|
// Ctx returns the operation context.
|
|
func (op *ConnectOp) Ctx() context.Context {
|
|
return op.ctx
|
|
}
|
|
|
|
// ConnectRequest holds all the information necessary for a connect operation.
|
|
type ConnectRequest struct {
|
|
Domain string `json:"d,omitempty"`
|
|
IP net.IP `json:"ip,omitempty"`
|
|
UsePriorityDataMsgs bool `json:"pr,omitempty"`
|
|
Protocol packet.IPProtocol `json:"p,omitempty"`
|
|
Port uint16 `json:"po,omitempty"`
|
|
QueueSize uint32 `json:"qs,omitempty"`
|
|
}
|
|
|
|
// DialNetwork returns the address of the connect request.
|
|
func (r *ConnectRequest) DialNetwork() string {
|
|
if ip4 := r.IP.To4(); ip4 != nil {
|
|
switch r.Protocol { //nolint:exhaustive // Only looking for supported protocols.
|
|
case packet.TCP:
|
|
return "tcp4"
|
|
case packet.UDP:
|
|
return "udp4"
|
|
}
|
|
} else {
|
|
switch r.Protocol { //nolint:exhaustive // Only looking for supported protocols.
|
|
case packet.TCP:
|
|
return "tcp6"
|
|
case packet.UDP:
|
|
return "udp6"
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
// Address returns the address of the connext request.
|
|
func (r *ConnectRequest) Address() string {
|
|
return net.JoinHostPort(r.IP.String(), strconv.Itoa(int(r.Port)))
|
|
}
|
|
|
|
func (r *ConnectRequest) String() string {
|
|
if r.Domain != "" {
|
|
return fmt.Sprintf("%s (%s %s)", r.Domain, r.Protocol, r.Address())
|
|
}
|
|
return fmt.Sprintf("%s %s", r.Protocol, r.Address())
|
|
}
|
|
|
|
func init() {
|
|
terminal.RegisterOpType(terminal.OperationFactory{
|
|
Type: ConnectOpType,
|
|
Requires: terminal.MayConnect,
|
|
Start: startConnectOp,
|
|
})
|
|
}
|
|
|
|
// NewConnectOp starts a new connect operation.
|
|
func NewConnectOp(tunnel *Tunnel) (*ConnectOp, *terminal.Error) {
|
|
// Submit metrics.
|
|
connectOpCnt.Inc()
|
|
|
|
// Create request.
|
|
request := &ConnectRequest{
|
|
Domain: tunnel.connInfo.Entity.Domain,
|
|
IP: tunnel.connInfo.Entity.IP,
|
|
Protocol: packet.IPProtocol(tunnel.connInfo.Entity.Protocol),
|
|
Port: tunnel.connInfo.Entity.Port,
|
|
UsePriorityDataMsgs: terminal.UsePriorityDataMsgs,
|
|
}
|
|
|
|
// Set defaults.
|
|
if request.QueueSize == 0 {
|
|
request.QueueSize = terminal.DefaultQueueSize
|
|
}
|
|
|
|
// Create new op.
|
|
op := &ConnectOp{
|
|
doneWriting: make(chan struct{}),
|
|
t: tunnel.dstTerminal,
|
|
conn: tunnel.conn,
|
|
request: request,
|
|
entry: true,
|
|
tunnel: tunnel,
|
|
}
|
|
op.ctx, op.cancelCtx = context.WithCancel(module.mgr.Ctx())
|
|
op.dfq = terminal.NewDuplexFlowQueue(op.Ctx(), request.QueueSize, op.submitUpstream)
|
|
|
|
// Prepare init msg.
|
|
data, err := dsd.Dump(request, dsd.CBOR)
|
|
if err != nil {
|
|
return nil, terminal.ErrInternalError.With("failed to pack connect request: %w", err)
|
|
}
|
|
|
|
// Initialize.
|
|
tErr := op.t.StartOperation(op, container.New(data), 5*time.Second)
|
|
if tErr != nil {
|
|
return nil, tErr
|
|
}
|
|
|
|
// Setup metrics.
|
|
op.started = time.Now()
|
|
|
|
module.mgr.Go("connect op conn reader", op.connReader)
|
|
module.mgr.Go("connect op conn writer", op.connWriter)
|
|
module.mgr.Go("connect op flow handler", op.dfq.FlowHandler)
|
|
|
|
log.Infof("spn/crew: connected to %s via %s", request, tunnel.dstPin.Hub)
|
|
return op, nil
|
|
}
|
|
|
|
func startConnectOp(t terminal.Terminal, opID uint32, data *container.Container) (terminal.Operation, *terminal.Error) {
|
|
// Check if we are running a public hub.
|
|
if !conf.PublicHub() {
|
|
return nil, terminal.ErrPermissionDenied.With("connecting is only allowed on public hubs")
|
|
}
|
|
|
|
// Parse connect request.
|
|
request := &ConnectRequest{}
|
|
_, err := dsd.Load(data.CompileData(), request)
|
|
if err != nil {
|
|
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
|
|
return nil, terminal.ErrMalformedData.With("failed to parse connect request: %w", err)
|
|
}
|
|
if request.QueueSize == 0 || request.QueueSize > terminal.MaxQueueSize {
|
|
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
|
|
return nil, terminal.ErrInvalidOptions.With("invalid queue size of %d", request.QueueSize)
|
|
}
|
|
|
|
// Check if IP seems valid.
|
|
if len(request.IP) != net.IPv4len && len(request.IP) != net.IPv6len {
|
|
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
|
|
return nil, terminal.ErrInvalidOptions.With("ip address is not valid")
|
|
}
|
|
|
|
// Create and initialize operation.
|
|
op := &ConnectOp{
|
|
doneWriting: make(chan struct{}),
|
|
t: t,
|
|
request: request,
|
|
}
|
|
op.InitOperationBase(t, opID)
|
|
op.ctx, op.cancelCtx = context.WithCancel(t.Ctx())
|
|
op.dfq = terminal.NewDuplexFlowQueue(op.Ctx(), request.QueueSize, op.submitUpstream)
|
|
|
|
// Start worker to complete setting up the connection.
|
|
module.mgr.Go("connect op setup", op.handleSetup)
|
|
|
|
return op, nil
|
|
}
|
|
|
|
func (op *ConnectOp) handleSetup(_ *mgr.WorkerCtx) error {
|
|
// Get terminal session for rate limiting.
|
|
var session *terminal.Session
|
|
if sessionTerm, ok := op.t.(terminal.SessionTerminal); ok {
|
|
session = sessionTerm.GetSession()
|
|
} else {
|
|
connectOpCntError.Inc()
|
|
log.Errorf("spn/crew: %T is not a session terminal, aborting op %s#%d", op.t, op.t.FmtID(), op.ID())
|
|
op.Stop(op, terminal.ErrInternalError.With("no session available"))
|
|
return nil
|
|
}
|
|
|
|
// Limit concurrency of connecting.
|
|
cancelErr := session.LimitConcurrency(op.Ctx(), func() {
|
|
op.setup(session)
|
|
})
|
|
|
|
// If context was canceled, stop operation.
|
|
if cancelErr != nil {
|
|
connectOpCntCanceled.Inc()
|
|
op.Stop(op, terminal.ErrCanceled.With(cancelErr.Error()))
|
|
}
|
|
|
|
// Do not return a worker error.
|
|
return nil
|
|
}
|
|
|
|
func (op *ConnectOp) setup(session *terminal.Session) {
|
|
// Rate limit before connecting.
|
|
if tErr := session.RateLimit(); tErr != nil {
|
|
// Add rate limit info to error.
|
|
if tErr.Is(terminal.ErrRateLimited) {
|
|
connectOpCntRateLimited.Inc()
|
|
op.Stop(op, tErr.With(session.RateLimitInfo()))
|
|
return
|
|
}
|
|
|
|
connectOpCntError.Inc()
|
|
op.Stop(op, tErr)
|
|
return
|
|
}
|
|
|
|
// Check if connection target is in global scope.
|
|
ipScope := netutils.GetIPScope(op.request.IP)
|
|
if ipScope != netutils.Global {
|
|
session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual)
|
|
connectOpCntBadRequest.Inc()
|
|
op.Stop(op, terminal.ErrPermissionDenied.With("denied request to connect to non-global IP %s", op.request.IP))
|
|
return
|
|
}
|
|
|
|
// Check exit policy.
|
|
if tErr := checkExitPolicy(op.request); tErr != nil {
|
|
session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual)
|
|
connectOpCntBadRequest.Inc()
|
|
op.Stop(op, tErr)
|
|
return
|
|
}
|
|
|
|
// Check one last time before connecting if operation was not canceled.
|
|
if op.Ctx().Err() != nil {
|
|
op.Stop(op, terminal.ErrCanceled.With(op.Ctx().Err().Error()))
|
|
connectOpCntCanceled.Inc()
|
|
return
|
|
}
|
|
|
|
// Connect to destination.
|
|
dialNet := op.request.DialNetwork()
|
|
if dialNet == "" {
|
|
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
|
|
connectOpCntBadRequest.Inc()
|
|
op.Stop(op, terminal.ErrIncorrectUsage.With("protocol %s is not supported", op.request.Protocol))
|
|
return
|
|
}
|
|
dialer := &net.Dialer{
|
|
Timeout: 10 * time.Second,
|
|
LocalAddr: conf.GetBindAddr(dialNet),
|
|
FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4.
|
|
KeepAlive: -1, // Disable keep-alive.
|
|
}
|
|
conn, err := dialer.DialContext(op.Ctx(), dialNet, op.request.Address())
|
|
if err != nil {
|
|
// Connection errors are common, but still a bit suspicious.
|
|
var netError net.Error
|
|
switch {
|
|
case errors.As(err, &netError) && netError.Timeout():
|
|
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
|
|
connectOpCntFailed.Inc()
|
|
case errors.Is(err, context.Canceled):
|
|
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
|
|
connectOpCntCanceled.Inc()
|
|
default:
|
|
session.ReportSuspiciousActivity(terminal.SusFactorWeirdButOK)
|
|
connectOpCntFailed.Inc()
|
|
}
|
|
|
|
op.Stop(op, terminal.ErrConnectionError.With("failed to connect to %s: %w", op.request, err))
|
|
return
|
|
}
|
|
op.conn = conn
|
|
|
|
// Start worker.
|
|
module.mgr.Go("connect op conn reader", op.connReader)
|
|
module.mgr.Go("connect op conn writer", op.connWriter)
|
|
module.mgr.Go("connect op flow handler", op.dfq.FlowHandler)
|
|
|
|
connectOpCntConnected.Inc()
|
|
log.Infof("spn/crew: connected op %s#%d to %s", op.t.FmtID(), op.ID(), op.request)
|
|
}
|
|
|
|
func (op *ConnectOp) submitUpstream(msg *terminal.Msg, timeout time.Duration) {
|
|
err := op.Send(msg, timeout)
|
|
if err != nil {
|
|
msg.Finish()
|
|
op.Stop(op, err.Wrap("failed to send data (op) read from %s", op.connectedType()))
|
|
}
|
|
}
|
|
|
|
const (
	// readBufSize is the size in bytes of the buffer that connReader
	// allocates for every read from the connection.
	readBufSize = 1500

	// High priority up to first 10MB.
	highPrioThreshold = 10 * 1000 * 1000

	// Rate limit to 128 Mbit/s after 1GB traffic.
	// Do NOT use time.Sleep per packet, as it is very inaccurate and will sleep a lot longer than desired.
	rateLimitThreshold = 1000 * 1000 * 1000
	rateLimitMaxMbit   = 128
)
|
|
|
|
func (op *ConnectOp) connReader(_ *mgr.WorkerCtx) error {
|
|
// Metrics setup and submitting.
|
|
atomic.AddInt64(activeConnectOps, 1)
|
|
defer func() {
|
|
atomic.AddInt64(activeConnectOps, -1)
|
|
connectOpDurationHistogram.UpdateDuration(op.started)
|
|
connectOpIncomingDataHistogram.Update(float64(op.incomingTraffic.Load()))
|
|
}()
|
|
|
|
rateLimiter := terminal.NewRateLimiter(rateLimitMaxMbit)
|
|
|
|
for {
|
|
// Read from connection.
|
|
buf := make([]byte, readBufSize)
|
|
n, err := op.conn.Read(buf)
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
op.Stop(op, terminal.ErrStopping.With("connection to %s was closed on read", op.connectedType()))
|
|
} else {
|
|
op.Stop(op, terminal.ErrConnectionError.With("failed to read from %s: %w", op.connectedType(), err))
|
|
}
|
|
return nil
|
|
}
|
|
if n == 0 {
|
|
log.Tracef("spn/crew: connect op %s>%d read 0 bytes from %s", op.t.FmtID(), op.ID(), op.connectedType())
|
|
continue
|
|
}
|
|
|
|
// Submit metrics.
|
|
connectOpIncomingBytes.Add(n)
|
|
inBytes := op.incomingTraffic.Add(uint64(n))
|
|
|
|
// Rate limit if over threshold.
|
|
if inBytes > rateLimitThreshold {
|
|
rateLimiter.Limit(uint64(n))
|
|
}
|
|
|
|
// Create message from data.
|
|
msg := op.NewMsg(buf[:n])
|
|
|
|
// Define priority and possibly wait for slot.
|
|
switch {
|
|
case inBytes > highPrioThreshold:
|
|
msg.Unit.WaitForSlot()
|
|
case op.request.UsePriorityDataMsgs:
|
|
msg.Unit.MakeHighPriority()
|
|
}
|
|
|
|
// Send packet.
|
|
tErr := op.dfq.Send(
|
|
msg,
|
|
30*time.Second,
|
|
)
|
|
if tErr != nil {
|
|
msg.Finish()
|
|
op.Stop(op, tErr.Wrap("failed to send data (dfq) from %s", op.connectedType()))
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Deliver delivers a messages to the operation.
|
|
func (op *ConnectOp) Deliver(msg *terminal.Msg) *terminal.Error {
|
|
return op.dfq.Deliver(msg)
|
|
}
|
|
|
|
func (op *ConnectOp) connWriter(_ *mgr.WorkerCtx) error {
|
|
// Metrics submitting.
|
|
defer func() {
|
|
connectOpOutgoingDataHistogram.Update(float64(op.outgoingTraffic.Load()))
|
|
}()
|
|
|
|
defer func() {
|
|
// Signal that we are done with writing.
|
|
close(op.doneWriting)
|
|
// Close connection.
|
|
_ = op.conn.Close()
|
|
}()
|
|
|
|
var msg *terminal.Msg
|
|
defer msg.Finish()
|
|
|
|
rateLimiter := terminal.NewRateLimiter(rateLimitMaxMbit)
|
|
|
|
writing:
|
|
for {
|
|
msg.Finish()
|
|
|
|
select {
|
|
case msg = <-op.dfq.Receive():
|
|
case <-op.ctx.Done():
|
|
op.Stop(op, terminal.ErrCanceled)
|
|
return nil
|
|
default:
|
|
// Handle all data before also listening for the context cancel.
|
|
// This ensures all data is written properly before stopping.
|
|
select {
|
|
case msg = <-op.dfq.Receive():
|
|
case op.doneWriting <- struct{}{}:
|
|
op.Stop(op, terminal.ErrStopping)
|
|
return nil
|
|
case <-op.ctx.Done():
|
|
op.Stop(op, terminal.ErrCanceled)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// TODO: Instead of compiling data here again, can we send it as in the container?
|
|
data := msg.Data.CompileData()
|
|
if len(data) == 0 {
|
|
continue writing
|
|
}
|
|
|
|
// Submit metrics.
|
|
connectOpOutgoingBytes.Add(len(data))
|
|
out := op.outgoingTraffic.Add(uint64(len(data)))
|
|
|
|
// Rate limit if over threshold.
|
|
if out > rateLimitThreshold {
|
|
rateLimiter.Limit(uint64(len(data)))
|
|
}
|
|
|
|
// Special handling after first data was received on client.
|
|
if op.entry &&
|
|
out == uint64(len(data)) {
|
|
// Report time taken to receive first byte.
|
|
connectOpTTFBDurationHistogram.UpdateDuration(op.started)
|
|
|
|
// If not stickied yet, stick destination to Hub.
|
|
if !op.tunnel.stickied {
|
|
op.tunnel.stickDestinationToHub()
|
|
}
|
|
}
|
|
|
|
// Send all given data.
|
|
for {
|
|
n, err := op.conn.Write(data)
|
|
switch {
|
|
case err != nil:
|
|
if errors.Is(err, io.EOF) {
|
|
op.Stop(op, terminal.ErrStopping.With("connection to %s was closed on write", op.connectedType()))
|
|
} else {
|
|
op.Stop(op, terminal.ErrConnectionError.With("failed to send to %s: %w", op.connectedType(), err))
|
|
}
|
|
return nil
|
|
case n == 0:
|
|
op.Stop(op, terminal.ErrConnectionError.With("sent 0 bytes to %s", op.connectedType()))
|
|
return nil
|
|
case n < len(data):
|
|
// If not all data was sent, try again.
|
|
log.Debugf("spn/crew: %s#%d only sent %d/%d bytes to %s", op.t.FmtID(), op.ID(), n, len(data), op.connectedType())
|
|
data = data[n:]
|
|
default:
|
|
continue writing
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (op *ConnectOp) connectedType() string {
|
|
if op.entry {
|
|
return "origin"
|
|
}
|
|
return "destination"
|
|
}
|
|
|
|
// HandleStop gives the operation the ability to cleanly shut down.
|
|
// The returned error is the error to send to the other side.
|
|
// Should never be called directly. Call Stop() instead.
|
|
func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Error) {
|
|
if err.IsError() {
|
|
reportConnectError(err)
|
|
}
|
|
|
|
// If the connection has sent or received any data so far, finish the data
|
|
// flows as it makes sense.
|
|
if op.incomingTraffic.Load() > 0 || op.outgoingTraffic.Load() > 0 {
|
|
// If the op was ended locally, send all data before closing.
|
|
// If the op was ended remotely, don't bother sending remaining data.
|
|
if !err.IsExternal() {
|
|
// Flushing could mean sending a full buffer of 50000 packets.
|
|
op.dfq.Flush(5 * time.Minute)
|
|
}
|
|
|
|
// If the op was ended remotely, write all remaining received data.
|
|
// If the op was ended locally, don't bother writing remaining data.
|
|
if err.IsExternal() {
|
|
select {
|
|
case <-op.doneWriting:
|
|
default:
|
|
select {
|
|
case <-op.doneWriting:
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Cancel workers.
|
|
op.cancelCtx()
|
|
|
|
// Special client-side handling.
|
|
if op.entry {
|
|
// Mark the connection as failed if there was an error and no data was sent to the app yet.
|
|
if err.IsError() && op.outgoingTraffic.Load() == 0 {
|
|
// Set connection to failed and save it to propagate the update.
|
|
c := op.tunnel.connInfo
|
|
func() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
if err.IsExternal() {
|
|
c.Failed(fmt.Sprintf(
|
|
"the exit node reported an error: %s", err,
|
|
), "")
|
|
} else {
|
|
c.Failed(fmt.Sprintf(
|
|
"connection failed locally: %s", err,
|
|
), "")
|
|
}
|
|
|
|
c.Save()
|
|
}()
|
|
}
|
|
|
|
// Avoid connecting to the destination via this Hub if:
|
|
// - The error is external - ie. from the server.
|
|
// - The error is a connection error.
|
|
// - No data was received.
|
|
// This indicates that there is some network level issue that we can
|
|
// possibly work around by using another exit node.
|
|
if err.IsError() && err.IsExternal() &&
|
|
err.Is(terminal.ErrConnectionError) &&
|
|
op.outgoingTraffic.Load() == 0 {
|
|
op.tunnel.avoidDestinationHub()
|
|
}
|
|
|
|
// Don't leak local errors to the server.
|
|
if !err.IsExternal() {
|
|
// Change error that is reported.
|
|
return terminal.ErrStopping
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|