safing-portmaster/spn/terminal/terminal_test.go
Daniel Hååvi 80664d1a27
Restructure modules ()
* Move portbase into monorepo

* Add new simple module mgr

* [WIP] Switch to new simple module mgr

* Add StateMgr and more worker variants

* [WIP] Switch more modules

* [WIP] Switch more modules

* [WIP] switch more modules

* [WIP] switch all SPN modules

* [WIP] switch all service modules

* [WIP] Convert all workers to the new module system

* [WIP] add new task system to module manager

* [WIP] Add second take for scheduling workers

* [WIP] Add FIXME for bugs in new scheduler

* [WIP] Add minor improvements to scheduler

* [WIP] Add new worker scheduler

* [WIP] Fix more bugs related to new module system

* [WIP] Fix start handling of the new module system

* [WIP] Improve startup process

* [WIP] Fix minor issues

* [WIP] Fix missing subsystem in settings

* [WIP] Initialize managers in constructor

* [WIP] Move module event initialization to constructors

* [WIP] Fix setting for enabling and disabling the SPN module

* [WIP] Move API registration into module construction

* [WIP] Update states mgr for all modules

* [WIP] Add CmdLine operation support

* Add state helper methods to module group and instance

* Add notification and module status handling to status package

* Fix starting issues

* Remove pilot widget and update security lock to new status data

* Remove debug logs

* Improve http server shutdown

* Add workaround for cleanly shutting down firewall+netquery

* Improve logging

* Add syncing states with notifications for new module system

* Improve starting, stopping, shutdown; resolve FIXMEs/TODOs

* [WIP] Fix most unit tests

* Review new module system and fix minor issues

* Push shutdown and restart events again via API

* Set sleep mode via interface

* Update example/template module

* [WIP] Fix spn/cabin unit test

* Remove deprecated UI elements

* Make log output more similar for the logging transition phase

* Switch spn hub and observer cmds to new module system

* Fix log sources

* Make worker mgr less error prone

* Fix tests and minor issues

* Fix observation hub

* Improve shutdown and restart handling

* Split up big connection.go source file

* Move varint and dsd packages to structures repo

* Improve expansion test

* Fix linter warnings

* Fix interception module on windows

* Fix linter errors

---------

Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
2024-08-09 18:15:48 +03:00

311 lines
8.4 KiB
Go

package terminal
import (
"fmt"
"os"
"runtime/pprof"
"sync/atomic"
"testing"
"time"
"github.com/safing/portmaster/spn/cabin"
"github.com/safing/portmaster/spn/hub"
"github.com/safing/structures/container"
)
// TestTerminals runs the terminal counter tests for every combination of
// encryption on/off and the available flow control variants.
func TestTerminals(t *testing.T) {
	t.Parallel()

	identity, err := cabin.CreateIdentity(module.mgr.Ctx(), "test")
	if err != nil {
		t.Fatalf("failed to create identity: %s", err)
	}

	// Flow control variants to combine with each encryption setting.
	flowControlVariants := []struct {
		flowControl     FlowControlType
		flowControlSize uint32
	}{
		{flowControl: FlowControlNone, flowControlSize: 5},
		{flowControl: FlowControlDFQ, flowControlSize: defaultTestQueueSize},
	}

	// Test without and with encryption, each with every flow control variant.
	for _, encrypt := range []bool{false, true} {
		for _, variant := range flowControlVariants {
			testTerminals(t, identity, &TerminalOpts{
				Encrypt:         encrypt,
				Padding:         defaultTestPadding,
				FlowControl:     variant.flowControl,
				FlowControlSize: variant.flowControlSize,
			})
		}
	}
}
// testTerminals creates a pair of test terminals that forward messages to
// each other and runs the full set of counter test variants against them
// with the given terminal options.
func testTerminals(t *testing.T, identity *cabin.Identity, terminalOpts *TerminalOpts) {
	t.Helper()

	// Prepare encryption: the local side needs the destination hub, while an
	// unencrypted run must not hand the identity to the remote terminal.
	var dstHub *hub.Hub
	if terminalOpts.Encrypt {
		dstHub = identity.Hub
	} else {
		identity = nil
	}

	// Create test terminals; each delivers its upstream messages to the other.
	var term1 *TestTerminal
	var term2 *TestTerminal
	var initData *container.Container
	var err *Error
	term1, initData, err = NewLocalTestTerminal(
		module.mgr.Ctx(), 127, "c1", dstHub, terminalOpts, createForwardingUpstream(
			t, "c1", "c2", func(msg *Msg) *Error {
				return term2.Deliver(msg)
			},
		),
	)
	if err != nil {
		t.Fatalf("failed to create local terminal: %s", err)
	}
	term2, _, err = NewRemoteTestTerminal(
		module.mgr.Ctx(), 127, "c2", identity, initData, createForwardingUpstream(
			t, "c2", "c1", func(msg *Msg) *Error {
				return term1.Deliver(msg)
			},
		),
	)
	if err != nil {
		t.Fatalf("failed to create remote terminal: %s", err)
	}

	// Start testing with counters.
	countToQueueSize := uint64(terminalOpts.FlowControlSize)
	optionsSuffix := fmt.Sprintf(
		"encrypt=%v,flowType=%d",
		terminalOpts.Encrypt,
		terminalOpts.FlowControl,
	)

	// Run all counter test variants against the terminal pair, covering
	// one-way (up/down), two-way and stress-test traffic, with and without
	// flushing and per-message waits.
	for _, opts := range []*testWithCounterOpts{
		{
			testName:        "onlyup-flushing-waiting:" + optionsSuffix,
			flush:           true,
			serverCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: sendThresholdMaxWait * 2,
		},
		{
			testName:        "onlyup-waiting:" + optionsSuffix,
			serverCountTo:   10,
			waitBetweenMsgs: sendThresholdMaxWait * 2,
		},
		{
			testName:        "onlyup-flushing:" + optionsSuffix,
			flush:           true,
			serverCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: time.Millisecond,
		},
		{
			testName:        "onlyup:" + optionsSuffix,
			serverCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: time.Millisecond,
		},
		{
			testName:        "onlydown-flushing-waiting:" + optionsSuffix,
			flush:           true,
			clientCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: sendThresholdMaxWait * 2,
		},
		{
			testName:        "onlydown-waiting:" + optionsSuffix,
			clientCountTo:   10,
			waitBetweenMsgs: sendThresholdMaxWait * 2,
		},
		{
			testName:        "onlydown-flushing:" + optionsSuffix,
			flush:           true,
			clientCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: time.Millisecond,
		},
		{
			testName:        "onlydown:" + optionsSuffix,
			clientCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: time.Millisecond,
		},
		{
			testName:        "twoway-flushing-waiting:" + optionsSuffix,
			flush:           true,
			clientCountTo:   countToQueueSize * 2,
			serverCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: sendThresholdMaxWait * 2,
		},
		{
			// NOTE(review): flush is set here although the name says only
			// "waiting" — the other "-waiting" variants do not flush.
			// Preserved as-is; confirm whether this was intended.
			testName:        "twoway-waiting:" + optionsSuffix,
			flush:           true,
			clientCountTo:   10,
			serverCountTo:   10,
			waitBetweenMsgs: sendThresholdMaxWait * 2,
		},
		{
			testName:        "twoway-flushing:" + optionsSuffix,
			flush:           true,
			clientCountTo:   countToQueueSize * 2,
			serverCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: time.Millisecond,
		},
		{
			testName:        "twoway:" + optionsSuffix,
			clientCountTo:   countToQueueSize * 2,
			serverCountTo:   countToQueueSize * 2,
			waitBetweenMsgs: time.Millisecond,
		},
		{
			testName:      "stresstest-down:" + optionsSuffix,
			clientCountTo: countToQueueSize * 1000,
		},
		{
			testName:      "stresstest-up:" + optionsSuffix,
			serverCountTo: countToQueueSize * 1000,
		},
		{
			testName:      "stresstest-duplex:" + optionsSuffix,
			clientCountTo: countToQueueSize * 1000,
			serverCountTo: countToQueueSize * 1000,
		},
	} {
		testTerminalWithCounters(t, term1, term2, opts)
	}

	// Clean up.
	term1.Abandon(nil)
	term2.Abandon(nil)

	// Give some time for the last log messages and clean up.
	time.Sleep(100 * time.Millisecond)
}
// createForwardingUpstream returns an Upstream that hands every message to
// deliverFunc, reporting delivery failures as test errors tagged with the
// src>dst direction.
func createForwardingUpstream(t *testing.T, srcName, dstName string, deliverFunc func(*Msg) *Error) Upstream {
	t.Helper()

	return UpstreamSendFunc(func(msg *Msg, _ time.Duration) *Error {
		// Nil messages take a fast path: no logging, dedicated error text.
		if msg == nil {
			if dErr := deliverFunc(msg); dErr != nil {
				t.Errorf("%s>%s: failed to deliver nil msg to terminal: %s", srcName, dstName, dErr)
				return dErr.With("failed to deliver nil msg to terminal")
			}
			return nil
		}

		// Optionally log the message payload.
		if logTestCraneMsgs {
			t.Logf("%s>%s: %v\n", srcName, dstName, msg.Data.CompileData())
		}

		// Hand the message over to the other terminal.
		if dErr := deliverFunc(msg); dErr != nil {
			t.Errorf("%s>%s: failed to deliver to terminal: %s", srcName, dstName, dErr)
			return dErr.With("failed to deliver to terminal")
		}
		return nil
	})
}
// testWithCounterOpts configures a single run of testTerminalWithCounters.
// The count and flush/wait fields are passed through to CounterOpts.
type testWithCounterOpts struct {
	testName string // used in log output and failure messages
	flush bool // forwarded as CounterOpts.Flush
	clientCountTo uint64 // forwarded as CounterOpts.ClientCountTo
	serverCountTo uint64 // forwarded as CounterOpts.ServerCountTo
	waitBetweenMsgs time.Duration // forwarded as CounterOpts.Wait
}
// testTerminalWithCounters runs one counter operation between the two
// terminals according to opts, guarded by a watchdog that dumps all goroutine
// stacks and aborts the process if the test exceeds 60 seconds.
func testTerminalWithCounters(t *testing.T, term1, term2 *TestTerminal, opts *testWithCounterOpts) {
	t.Helper()

	// Wait async for test to complete, print stack after timeout.
	finished := make(chan struct{})
	// Close on every exit path: the original closed finished only after a
	// successful counter.Wait(), so a t.Fatalf before that point (e.g. when
	// NewCounterOp fails) left the watchdog alive to os.Exit(1) sixty seconds
	// later, killing unrelated parallel tests. t.Fatalf runs deferred calls
	// via runtime.Goexit, so a defer covers all paths.
	defer close(finished)
	maxTestDuration := 60 * time.Second
	go func() {
		select {
		case <-finished:
		case <-time.After(maxTestDuration):
			fmt.Printf("terminal test %s is taking more than %s, printing stack:\n", opts.testName, maxTestDuration)
			_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
			os.Exit(1)
		}
	}()

	t.Logf("starting terminal counter test %s", opts.testName)
	defer t.Logf("stopping terminal counter test %s", opts.testName)

	// Start counters.
	counter, tErr := NewCounterOp(term1, CounterOpts{
		ClientCountTo: opts.clientCountTo,
		ServerCountTo: opts.serverCountTo,
		Flush:         opts.flush,
		Wait:          opts.waitBetweenMsgs,
	})
	if tErr != nil {
		t.Fatalf("terminal test %s failed to start counter: %s", opts.testName, tErr)
	}

	// Wait until counters are done.
	counter.Wait()

	// Check for error.
	if counter.Error != nil {
		t.Fatalf("terminal test %s failed to count: %s", opts.testName, counter.Error)
	}

	// Log stats.
	printCTStats(t, opts.testName, "term1", term1)
	printCTStats(t, opts.testName, "term2", term2)

	// Check if stats match, if DFQ is used on both sides: each side's send
	// space must equal the space the other side has reported back.
	dfq1, ok1 := term1.flowControl.(*DuplexFlowQueue)
	dfq2, ok2 := term2.flowControl.(*DuplexFlowQueue)
	if ok1 && ok2 &&
		(atomic.LoadInt32(dfq1.sendSpace) != atomic.LoadInt32(dfq2.reportedSpace) ||
			atomic.LoadInt32(dfq2.sendSpace) != atomic.LoadInt32(dfq1.reportedSpace)) {
		t.Fatalf("terminal test %s has non-matching space counters", opts.testName)
	}
}
// printCTStats logs the flow control queue and space counters of the given
// terminal. It is a no-op for terminals without a DuplexFlowQueue.
func printCTStats(t *testing.T, testName, name string, term *TestTerminal) {
	t.Helper()

	// Stats are only available with duplex flow queue flow control.
	dfq, isDFQ := term.flowControl.(*DuplexFlowQueue)
	if !isDFQ {
		return
	}

	sendQueued := len(dfq.sendQueue)
	recvQueued := len(dfq.recvQueue)
	t.Logf(
		"%s: %s: sq=%d rq=%d sends=%d reps=%d",
		testName,
		name,
		sendQueued,
		recvQueued,
		atomic.LoadInt32(dfq.sendSpace),
		atomic.LoadInt32(dfq.reportedSpace),
	)
}