Revert: roll tunnel and worker back to e688fb1

Today's rewrite (e729560) introduced two independent regressions that
compounded into a total outage:

1. worker.js added `await socket.opened` before sending CONNECT_OK. On
   Cloudflare Workers, `socket.opened` runs an HTTP-based-service heuristic
   that pre-emptively rejects with "proxy request failed ... consider using
   fetch instead" for destinations that look HTTPS-ish (port 443 with
   TLS-like framing) — even when a plain read/write on the same socket
   would have worked. Every Telegram DC hit this path and the tunnel
   returned 100% CONNECT_FAIL (the two connect paths are contrasted in the
   sketch after this list).

2. The N-parallel-session + voluntary TTL-rotation architecture in the
   rewritten tunnel.go killed in-flight TCP streams every rotation cycle,
   so Telegram MTProto couldn't finish a single handshake before its
   session got rotated out. Staggering + longer TTLs didn't help enough.
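
For reference, a minimal sketch of the two connect paths, condensed from the
worker.js diff below. The sendOk/sendFail/sendClose callbacks are hypothetical
stand-ins for the worker's sendFrame(streamId, MUX_*, null) calls; stream
bookkeeping, the Telegram-IP allowlist check, and logging are omitted.

    import { connect } from "cloudflare:sockets";

    // Rewrite (e729560): block on socket.opened before acking. Per this
    // commit, the Workers-side heuristic can reject 443/TLS-looking targets
    // at that await, so every Telegram DC came back as CONNECT_FAIL.
    async function connectBlocking(target, sendOk, sendFail) {
      const socket = connect({ hostname: target.address, port: target.port });
      try {
        await socket.opened;               // heuristic rejection happens here
      } catch (e) {
        sendFail();                        // -> MUX_CONNECT_FAIL
        try { socket.close(); } catch (_) {}
        return null;
      }
      sendOk();                            // -> MUX_CONNECT_OK
      return socket;
    }

    // Reverted (e688fb1): ack immediately. socket.writable buffers client
    // data until the TCP connection is up, and a failed connect surfaces
    // asynchronously through socket.opened.catch().
    function connectOptimistic(target, sendOk, sendClose) {
      const socket = connect({ hostname: target.address, port: target.port });
      sendOk();                                 // -> MUX_CONNECT_OK right away
      socket.opened.catch(() => sendClose());   // -> MUX_CLOSE only on failure
      return socket;
    }

The reverted path also avoids waiting on the TCP handshake before acking; the
worker.js comment puts that saving at roughly 200ms per stream.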

Reverted tunnel.go, main.go, listener.go, worker.js, wrangler.toml, and all
9 prebuilt binaries to e688fb1 (the last commit known to work for three
days straight). lib/install.sh keeps today's iptables -I PREROUTING 1 fix
(that one addresses a real bug unrelated to the tunnel rewrite) but drops
the --parallel/--session-ttl flags, since the reverted binary doesn't
recognize them.

The Roblox work from a6c607d stays: files/lua/z2k-modern-core.lua still
contains z2k_game_udp, and lib/config_official.sh still wires it into the
game strategies. That part works and is orthogonal to the tunnel.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Necronicle 2026-04-13 00:58:03 +03:00
parent a6c607d75f
commit 2184b9b413
16 changed files with 538 additions and 1087 deletions


@@ -7,7 +7,6 @@
import { connect } from "cloudflare:sockets";
// Mux message types
const MUX_AUTH = 0x00;
const MUX_CONNECT = 0x01;
const MUX_DATA = 0x02;
const MUX_CLOSE = 0x03;
@@ -18,106 +17,103 @@ const MUX_CONNECT_FAIL = 0x05;
const ADDR_IPV4 = 1;
const ADDR_IPV6 = 4;
// Telegram DC IP allowlist (from AS62041, AS59930)
const TELEGRAM_CIDRS = [
"149.154.160.0/20",
"91.108.4.0/22",
"91.108.8.0/22",
"91.108.12.0/22",
"91.108.16.0/22",
"91.108.20.0/22",
"91.108.56.0/22",
"91.105.192.0/23",
"95.161.64.0/20",
"185.76.151.0/24",
].map(cidr => {
const [addr, bits] = cidr.split("/");
const parts = addr.split(".").map(Number);
const ip = (parts[0] << 24) | (parts[1] << 16) | (parts[2] << 8) | parts[3];
const mask = ~0 << (32 - parseInt(bits));
return { network: ip & mask, mask };
});
const TELEGRAM_V6_PREFIXES = [
"2001:b28:f23d:",
"2001:b28:f23f:",
"2001:67c:4e8:",
];
function isTelegramIP(address) {
const v4parts = address.split(".");
if (v4parts.length === 4) {
const nums = v4parts.map(Number);
if (nums.some(n => isNaN(n) || n < 0 || n > 255)) return false;
const ip = (nums[0] << 24) | (nums[1] << 16) | (nums[2] << 8) | nums[3];
return TELEGRAM_CIDRS.some(c => (ip & c.mask) === c.network);
}
const lower = address.toLowerCase();
return TELEGRAM_V6_PREFIXES.some(p => lower.startsWith(p));
}
/**
* Encode a mux frame: [2 bytes stream_id BE][1 byte msg_type][payload]
*/
function encodeMuxFrame(streamId, msgType, payload) {
const plen = payload ? payload.byteLength : 0;
const buf = new Uint8Array(3 + plen);
buf[0] = (streamId >> 8) & 0xff;
buf[1] = streamId & 0xff;
buf[2] = msgType;
if (plen > 0) {
buf.set(new Uint8Array(payload), 3);
const header = new Uint8Array(3);
header[0] = (streamId >> 8) & 0xff;
header[1] = streamId & 0xff;
header[2] = msgType;
if (!payload || payload.byteLength === 0) {
return header.buffer;
}
return buf.buffer;
const frame = new Uint8Array(3 + payload.byteLength);
frame.set(header);
frame.set(new Uint8Array(payload), 3);
return frame.buffer;
}
/**
* Decode a mux frame from an ArrayBuffer.
*/
function decodeMuxFrame(buffer) {
const view = new DataView(buffer);
if (buffer.byteLength < 3) {
throw new Error(`mux frame too short: ${buffer.byteLength}`);
}
const view = new DataView(buffer);
return {
streamId: view.getUint16(0, false),
streamId: view.getUint16(0, false), // big-endian
msgType: view.getUint8(2),
payload: buffer.slice(3),
};
}
/**
* Parse CONNECT payload: [addr_type][addr][port BE]
*/
function parseConnectPayload(buffer) {
const view = new DataView(buffer);
const addrType = view.getUint8(0);
if (addrType === ADDR_IPV4) {
if (buffer.byteLength < 7) throw new Error("IPv4 CONNECT too short");
const a = view.getUint8(1), b = view.getUint8(2), c = view.getUint8(3), d = view.getUint8(4);
return { address: `${a}.${b}.${c}.${d}`, port: view.getUint16(5, false) };
const a = view.getUint8(1);
const b = view.getUint8(2);
const c = view.getUint8(3);
const d = view.getUint8(4);
const port = view.getUint16(5, false);
return { address: `${a}.${b}.${c}.${d}`, port };
}
if (addrType === ADDR_IPV6) {
if (buffer.byteLength < 19) throw new Error("IPv6 CONNECT too short");
const parts = [];
for (let i = 0; i < 8; i++) parts.push(view.getUint16(1 + i * 2, false).toString(16));
return { address: parts.join(":"), port: view.getUint16(17, false) };
for (let i = 0; i < 8; i++) {
parts.push(view.getUint16(1 + i * 2, false).toString(16));
}
const port = view.getUint16(17, false);
return { address: parts.join(":"), port };
}
throw new Error(`unknown addr type: ${addrType}`);
}
/**
* Compute HMAC-SHA256 of secret keyed by itself.
*/
async function computeAuthHMAC(secret) {
const enc = new TextEncoder();
const keyData = enc.encode(secret);
const key = await crypto.subtle.importKey("raw", keyData, { name: "HMAC", hash: "SHA-256" }, false, ["sign"]);
const encoder = new TextEncoder();
const keyData = encoder.encode(secret);
const key = await crypto.subtle.importKey(
"raw", keyData, { name: "HMAC", hash: "SHA-256" }, false, ["sign"]
);
const sig = await crypto.subtle.sign("HMAC", key, keyData);
return new Uint8Array(sig);
}
/**
* Compare two Uint8Arrays in constant time.
*/
function timingSafeEqual(a, b) {
if (a.length !== b.length) return false;
let r = 0;
for (let i = 0; i < a.length; i++) r |= a[i] ^ b[i];
return r === 0;
let result = 0;
for (let i = 0; i < a.length; i++) {
result |= a[i] ^ b[i];
}
return result === 0;
}
export default {
async fetch(request, env) {
const url = new URL(request.url);
// Only handle /ws path
if (url.pathname !== "/ws") {
return new Response("z2k-tunnel relay", { status: 200 });
}
// Must be WebSocket upgrade
if (request.headers.get("Upgrade") !== "websocket") {
return new Response("Expected WebSocket", { status: 426 });
}
@@ -130,132 +126,113 @@ export default {
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);
server.accept();
const sessionId = Math.random().toString(36).slice(2, 8);
const clientIP = request.headers.get("CF-Connecting-IP") || "unknown";
const sessionStart = Date.now();
console.log(`[${sessionId}] WS accepted from ${clientIP}`);
// Per-stream state: { socket, writer, closed }
// Track TCP streams: streamId → { socket, writer }
const streams = new Map();
const MAX_STREAMS = 100; // prevent memory exhaustion
let authenticated = false;
let totalConnects = 0;
let totalMessages = 0;
// Direct send — no Promise chaining (avoids microtask overhead per frame).
// server.send() is synchronous in CF Workers; on WS teardown it throws
// and we swallow since the close handler will clean up streams.
function sendFrame(streamId, msgType, payload) {
try {
server.send(encodeMuxFrame(streamId, msgType, payload));
} catch (_) {}
}
// Pre-compute expected auth HMAC
const expectedHMAC = await computeAuthHMAC(secret);
function closeStream(streamId) {
const s = streams.get(streamId);
if (!s || s.closed) return;
s.closed = true;
streams.delete(streamId);
try { s.writer.close(); } catch (_) {}
try { s.socket.close(); } catch (_) {}
/**
* Send a mux frame to the client WS.
*/
function sendFrame(streamId, msgType, payload) {
try {
const frame = encodeMuxFrame(streamId, msgType, payload);
server.send(frame);
} catch (e) {
console.error(`sendFrame error stream=${streamId}: ${e.message}`);
}
}
/**
* Handle a CONNECT request: open TCP to target and start reading.
*/
async function handleConnect(streamId, payload) {
let target;
try {
target = parseConnectPayload(payload);
} catch (e) {
console.error(`stream ${streamId} bad CONNECT: ${e.message}`);
sendFrame(streamId, MUX_CONNECT_FAIL, null);
return;
}
if (!isTelegramIP(target.address)) {
console.warn(`[${sessionId}] stream ${streamId} blocked non-Telegram target ${target.address}`);
sendFrame(streamId, MUX_CONNECT_FAIL, null);
return;
}
totalConnects++;
const t0 = Date.now();
console.log(`stream ${streamId} CONNECT ${target.address}:${target.port}`);
let socket;
try {
socket = connect(
{ hostname: target.address, port: target.port },
{ allowHalfOpen: false }
);
socket = connect({ hostname: target.address, port: target.port });
} catch (e) {
console.error(`[${sessionId}] stream ${streamId} connect() threw: ${e.message}`);
console.error(`stream ${streamId} connect failed: ${e.message}`);
sendFrame(streamId, MUX_CONNECT_FAIL, null);
return;
}
// Wait for TCP handshake; confirms reachability before we tell the client OK.
try {
await socket.opened;
} catch (e) {
console.error(`[${sessionId}] stream ${streamId} opened rejected: ${e.message || e}`);
sendFrame(streamId, MUX_CONNECT_FAIL, null);
try { socket.close(); } catch (_) {}
return;
}
// Register stream and send CONNECT_OK immediately — don't wait
// for TCP handshake. Client data is buffered by socket.writable
// and flushed when connection establishes. This saves ~200ms per stream.
const writer = socket.writable.getWriter();
const state = { socket, writer, closed: false };
streams.set(streamId, state);
streams.set(streamId, { socket, writer });
sendFrame(streamId, MUX_CONNECT_OK, null);
// TCP → WS pump. Direct synchronous server.send() per chunk — minimum
// CPU overhead to keep us under the Worker CPU-time limit.
let bytesIn = 0;
let reads = 0;
let closeReason = "eof";
// Monitor connection failure asynchronously
socket.opened.catch((e) => {
console.error(`stream ${streamId} connect failed: ${e.message}`);
streams.delete(streamId);
try { writer.close(); } catch (_) {}
sendFrame(streamId, MUX_CLOSE, null);
});
// Read from TCP socket, send DATA frames back to client
try {
const reader = socket.readable.getReader();
while (!state.closed) {
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value && value.byteLength > 0) {
bytesIn += value.byteLength;
reads++;
try {
server.send(encodeMuxFrame(streamId, MUX_DATA, value.buffer.slice(value.byteOffset, value.byteOffset + value.byteLength)));
} catch (_) {
closeReason = "ws_send_err";
break;
}
// value may be Uint8Array — ensure we pass ArrayBuffer
const buf = value.buffer ? value.buffer.slice(value.byteOffset, value.byteOffset + value.byteLength) : value;
sendFrame(streamId, MUX_DATA, buf);
}
}
} catch (e) {
closeReason = `read_err: ${e.message || e}`;
// Socket closed or errored
if (streams.has(streamId)) {
console.log(`stream ${streamId} TCP read ended: ${e.message}`);
}
}
const dur = Date.now() - t0;
// Only log long-lived or erroring streams to save CPU on log serialization.
if (dur > 5000 || closeReason !== "eof") {
console.log(`[${sessionId}] stream ${streamId} CLOSE reason=${closeReason} bytesIn=${bytesIn} reads=${reads} dur=${dur}ms`);
}
// TCP disconnected — send CLOSE
streams.delete(streamId);
sendFrame(streamId, MUX_CLOSE, null);
try { writer.close(); } catch (_) {}
}
if (!state.closed) {
state.closed = true;
/**
* Close a stream and its TCP socket.
*/
function closeStream(streamId) {
const entry = streams.get(streamId);
if (entry) {
streams.delete(streamId);
sendFrame(streamId, MUX_CLOSE, null);
try { writer.close(); } catch (_) {}
try { socket.close(); } catch (_) {}
try { entry.writer.close(); } catch (_) {}
try { entry.socket.close(); } catch (_) {}
}
}
server.addEventListener("message", async (event) => {
totalMessages++;
let data;
if (event.data instanceof ArrayBuffer) {
data = event.data;
} else if (event.data instanceof Blob) {
data = await event.data.arrayBuffer();
} else {
// string message — ignore
return;
}
@@ -263,74 +240,79 @@ export default {
try {
frame = decodeMuxFrame(data);
} catch (e) {
console.error(`[${sessionId}] bad mux frame #${totalMessages}: ${e.message}`);
console.error(`bad mux frame: ${e.message}`);
return;
}
// First message must be auth
if (!authenticated) {
if (frame.streamId !== 0 || frame.msgType !== MUX_AUTH) {
console.error(`[${sessionId}] first message not auth`);
try { server.close(4001, "auth required"); } catch (_) {}
if (frame.streamId !== 0 || frame.msgType !== 0x00) {
console.error("first message not auth frame");
server.close(4001, "auth required");
return;
}
const got = new Uint8Array(frame.payload);
if (got.length !== 32 || !timingSafeEqual(got, expectedHMAC)) {
console.error(`[${sessionId}] auth failed`);
try { server.close(4002, "auth failed"); } catch (_) {}
const receivedHMAC = new Uint8Array(frame.payload);
if (receivedHMAC.length !== 32 || !timingSafeEqual(receivedHMAC, expectedHMAC)) {
console.error("auth failed: bad HMAC");
server.close(4002, "auth failed");
return;
}
authenticated = true;
console.log(`[${sessionId}] authenticated`);
console.log("client authenticated");
return;
}
// Dispatch by message type
switch (frame.msgType) {
case MUX_CONNECT:
// Fire-and-forget: handleConnect awaits internally.
handleConnect(frame.streamId, frame.payload).catch(e => {
console.error(`[${sessionId}] stream ${frame.streamId} handleConnect unhandled: ${e.message || e}`);
closeStream(frame.streamId);
});
if (streams.size >= MAX_STREAMS) {
console.warn(`stream ${frame.streamId} rejected: ${streams.size}/${MAX_STREAMS} streams active`);
sendFrame(frame.streamId, MUX_CONNECT_FAIL, null);
} else {
// No await — handleConnect runs its own read loop async
handleConnect(frame.streamId, frame.payload).catch(e => {
console.error(`stream ${frame.streamId} unhandled error: ${e.message}`);
closeStream(frame.streamId);
});
}
break;
case MUX_DATA: {
const s = streams.get(frame.streamId);
if (!s || s.closed) {
// Silent drop — client closed or stream unknown (in-flight race is ok).
return;
const entry = streams.get(frame.streamId);
if (entry) {
try {
await entry.writer.write(new Uint8Array(frame.payload));
} catch (e) {
console.error(`stream ${frame.streamId} TCP write error: ${e.message}`);
closeStream(frame.streamId);
sendFrame(frame.streamId, MUX_CLOSE, null);
}
}
// Fire-and-forget TCP write; on error, close the stream asynchronously.
// Awaiting here would suspend/resume the message handler per packet,
// which costs too much CPU under heavy telegram load.
s.writer.write(new Uint8Array(frame.payload)).catch((e) => {
if (s.closed) return;
s.closed = true;
streams.delete(frame.streamId);
sendFrame(frame.streamId, MUX_CLOSE, null);
try { s.writer.close(); } catch (_) {}
try { s.socket.close(); } catch (_) {}
});
break;
}
case MUX_CLOSE:
console.log(`stream ${frame.streamId} closed by client`);
closeStream(frame.streamId);
break;
default:
console.warn(`[${sessionId}] unknown msg type 0x${frame.msgType.toString(16)} stream=${frame.streamId}`);
console.warn(`unknown msg type 0x${frame.msgType.toString(16)} stream=${frame.streamId}`);
}
});
server.addEventListener("close", (event) => {
const dur = Date.now() - sessionStart;
console.log(`[${sessionId}] WS closed code=${event.code} reason="${event.reason}" dur=${dur}ms active=${streams.size} totalConnects=${totalConnects} totalMessages=${totalMessages}`);
for (const [id] of streams) closeStream(id);
server.addEventListener("close", () => {
console.log("WS closed, cleaning up all streams");
for (const [id] of streams) {
closeStream(id);
}
});
server.addEventListener("error", (e) => {
console.error(`[${sessionId}] WS error: ${e.message || e} active=${streams.size}`);
for (const [id] of streams) closeStream(id);
console.error(`WS error: ${e.message || e}`);
for (const [id] of streams) {
closeStream(id);
}
});
return new Response(null, { status: 101, webSocket: client });


@@ -2,11 +2,5 @@ name = "z2k-tunnel"
main = "worker.js"
compatibility_date = "2024-01-01"
# On the Paid plan, uncomment the block below to raise the per-invocation CPU
# budget from the 30s default to the 5-minute maximum. On the Free plan this
# setting is rejected by the API, so it stays commented out.
# [limits]
# cpu_ms = 300000
[vars]
TUNNEL_SECRET = "d01f72f9543b29da4e3724b1530c0d11cb30a6f8db15bc0adfe8f2d37b5844b2"


@@ -1700,12 +1700,7 @@ step_finalize() {
sleep 1
# Start tunnel mode.
# --parallel=6: open 6 parallel WS sessions = 6*6 = 36 concurrent CF
# TCP slots (CF's 6-slot limit is per fetch invocation, not per client).
# --session-ttl=8s: voluntarily rotate each session before Cloudflare's
# per-invocation CPU budget is exhausted. On the Free plan this is
# required for a streaming workload; on Paid it can be relaxed to 0.
/opt/sbin/tg-mtproxy-client --listen=:1443 --parallel=6 --session-ttl=8s >> /tmp/tg-tunnel.log 2>&1 &
/opt/sbin/tg-mtproxy-client --listen=:1443 >> /tmp/tg-tunnel.log 2>&1 &
sleep 2
if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then
@@ -1754,8 +1749,7 @@ start() {
return 0
fi
echo "Starting tg-tunnel..."
# See installer comments for flag rationale.
$BIN --listen=:1443 --parallel=6 --session-ttl=8s >> "$LOG" 2>&1 &
$BIN --listen=:1443 >> "$LOG" 2>&1 &
echo $! > "$PIDFILE"
sleep 2
# Insert REDIRECT rules at TOP of PREROUTING (-I 1) so they precede


@@ -79,7 +79,7 @@ func getOriginalDst(conn *net.TCPConn) (net.IP, int, error) {
var ipv6Addr [16]byte
copy(ipv6Addr[0:8], raw6[8:16])
ifaceBytes := make([]byte, 4)
binary.NativeEndian.PutUint32(ifaceBytes, addr6.Interface)
binary.BigEndian.PutUint32(ifaceBytes, addr6.Interface)
copy(ipv6Addr[8:12], ifaceBytes)
// ipv6Addr[12:16] = 0 (truncated, acceptable for /48 and larger prefixes)
origIP = net.IP(ipv6Addr[:])


@@ -3,7 +3,10 @@ package main
import (
"flag"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
)
var (
@@ -12,11 +15,32 @@ var (
tunnelSecret = flag.String("tunnel-secret", "d01f72f9543b29da4e3724b1530c0d11cb30a6f8db15bc0adfe8f2d37b5844b2", "Shared secret for tunnel auth")
verbose = flag.Bool("v", false, "Verbose logging")
connTimeout = flag.Duration("timeout", 5*time.Minute, "Idle connection timeout")
maxConns = flag.Int("max-conns", 1024, "Maximum concurrent accept-side connections")
parallelWS = flag.Int("parallel", 6, "Number of parallel WebSocket sessions to the relay (each = 6 concurrent TCP slots on CF)")
sessionTTL = flag.Duration("session-ttl", 0, "Voluntarily rotate each WS session after this duration (0 = disabled). Workaround for CF CPU limit.")
maxConns = flag.Int("max-conns", 1024, "Maximum concurrent connections")
)
// connSemaphore limits concurrent connections
var connSemaphore chan struct{}
// wsWriter serializes all writes to a WebSocket connection.
// gorilla/websocket supports only one concurrent writer.
type wsWriter struct {
ws *websocket.Conn
mu sync.Mutex
}
func (w *wsWriter) WriteMessage(messageType int, data []byte) error {
w.mu.Lock()
defer w.mu.Unlock()
w.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
return w.ws.WriteMessage(messageType, data)
}
func (w *wsWriter) WriteControl(messageType int, data []byte, deadline time.Time) error {
w.mu.Lock()
defer w.mu.Unlock()
return w.ws.WriteControl(messageType, data, deadline)
}
func main() {
flag.Parse()

File diff suppressed because it is too large


@@ -1,336 +0,0 @@
package main
import (
"bytes"
"context"
"net"
"sync/atomic"
"testing"
"time"
)
func TestEncodeDecodeFrame(t *testing.T) {
cases := []struct {
id uint16
msgType byte
payload []byte
}{
{0, muxAUTH, bytes.Repeat([]byte{0xAA}, 32)},
{1, muxCONNECT, []byte{1, 10, 0, 0, 1, 0x01, 0xBB}},
{65535, muxDATA, []byte("hello")},
{7, muxCLOSE, nil},
{42, muxCONNECT_OK, nil},
{42, muxCONNECT_FAIL, nil},
}
for _, c := range cases {
frame := encodeMuxFrame(c.id, c.msgType, c.payload)
decoded, err := decodeMuxFrame(frame)
if err != nil {
t.Fatalf("decode failed for %+v: %v", c, err)
}
if decoded.StreamID != c.id || decoded.MsgType != c.msgType {
t.Errorf("header mismatch: got id=%d type=0x%02x want id=%d type=0x%02x",
decoded.StreamID, decoded.MsgType, c.id, c.msgType)
}
if !bytes.Equal(decoded.Payload, c.payload) && !(len(c.payload) == 0 && len(decoded.Payload) == 0) {
t.Errorf("payload mismatch: got %v want %v", decoded.Payload, c.payload)
}
}
}
func TestDecodeFrameTooShort(t *testing.T) {
if _, err := decodeMuxFrame([]byte{0, 1}); err == nil {
t.Error("expected error for short frame")
}
}
// newTestSession builds a session with a nil websocket — safe as long as the
// test doesn't invoke writePump/readPump/dial. Suitable for state-machine tests.
func newTestSession(tc *tunnelClient) *session {
return &session{
tc: tc,
idx: 0,
writeCh: make(chan []byte, 256),
pingReq: make(chan struct{}, 1),
done: make(chan struct{}),
streams: make(map[uint16]*stream),
cfGate: make(chan struct{}, cfLimit),
}
}
// newDummyTCP returns a *net.TCPConn that isn't actually connected to anything
// useful — it's a pipe side that can be closed. Only the Close path is exercised.
func newDummyTCP(t *testing.T) *net.TCPConn {
t.Helper()
// Create a loopback listener + dial to get a real *net.TCPConn.
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
defer ln.Close()
type result struct {
c *net.TCPConn
e error
}
ch := make(chan result, 1)
go func() {
c, err := ln.Accept()
if err != nil {
ch <- result{nil, err}
return
}
ch <- result{c.(*net.TCPConn), nil}
}()
d := net.Dialer{Timeout: 2 * time.Second}
conn, err := d.DialContext(context.Background(), "tcp", ln.Addr().String())
if err != nil {
t.Fatalf("dial: %v", err)
}
_ = conn.Close() // we only need the server-side conn alive for Close()
r := <-ch
if r.e != nil {
t.Fatalf("accept: %v", r.e)
}
return r.c
}
func TestStreamHalfCloseStateMachine(t *testing.T) {
tc := &tunnelClient{}
s := newTestSession(tc)
// Acquire one gate slot to simulate a live stream.
s.cfGate <- struct{}{}
conn := newDummyTCP(t)
st := &stream{id: 42, conn: conn, state: stOpen}
s.mu.Lock()
s.streams[42] = st
s.mu.Unlock()
// local close first
s.localClose(st)
s.mu.Lock()
if !st.localClosed {
t.Error("localClosed should be true after localClose")
}
if st.remoteClosed {
t.Error("remoteClosed should still be false")
}
if _, exists := s.streams[42]; !exists {
t.Error("stream should still be in map during half-close grace")
}
if st.gateReleased {
t.Error("gate should not be released while remoteClosed is false")
}
s.mu.Unlock()
// now remote close arrives → both closed → should be purged and gate released
s.remoteClose(st)
s.mu.Lock()
if _, exists := s.streams[42]; exists {
t.Error("stream should be removed when both sides closed")
}
if !st.gateReleased {
t.Error("gate should be released when both sides closed")
}
s.mu.Unlock()
// gate slot must be free again
select {
case s.cfGate <- struct{}{}:
default:
t.Error("gate slot not released — all 6 occupied")
}
// Verify localClose is idempotent (must not panic, must not double-release).
s.localClose(st)
s.remoteClose(st)
}
func TestLocalCloseSendsCloseFrame(t *testing.T) {
tc := &tunnelClient{}
s := newTestSession(tc)
s.cfGate <- struct{}{}
conn := newDummyTCP(t)
st := &stream{id: 7, conn: conn, state: stOpen}
s.mu.Lock()
s.streams[7] = st
s.mu.Unlock()
go s.localClose(st)
// Drain the frame from writeCh within a timeout.
select {
case frame := <-s.writeCh:
decoded, err := decodeMuxFrame(frame)
if err != nil {
t.Fatalf("decode: %v", err)
}
if decoded.StreamID != 7 || decoded.MsgType != muxCLOSE {
t.Errorf("want CLOSE on stream 7, got id=%d type=0x%02x", decoded.StreamID, decoded.MsgType)
}
case <-time.After(2 * time.Second):
t.Fatal("no CLOSE frame enqueued within 2s")
}
}
func TestRemoteCloseDoesNotSendCloseFrame(t *testing.T) {
tc := &tunnelClient{}
s := newTestSession(tc)
s.cfGate <- struct{}{}
conn := newDummyTCP(t)
st := &stream{id: 9, conn: conn, state: stOpen}
s.mu.Lock()
s.streams[9] = st
s.mu.Unlock()
s.remoteClose(st)
select {
case f := <-s.writeCh:
t.Errorf("remoteClose should not enqueue anything, got %v", f)
case <-time.After(200 * time.Millisecond):
// ok
}
}
func TestGrimReaperGrace(t *testing.T) {
tc := &tunnelClient{}
s := newTestSession(tc)
s.cfGate <- struct{}{}
conn := newDummyTCP(t)
st := &stream{
id: 11,
conn: conn,
state: stOpen,
localClosed: true,
closeDeadline: time.Now().Add(-1 * time.Second), // already expired
}
s.mu.Lock()
s.streams[11] = st
s.mu.Unlock()
// Run one reaper sweep manually (grimReaper uses a 1s ticker; call its body).
now := time.Now()
s.mu.Lock()
for id, cur := range s.streams {
fullyClosed := cur.localClosed && cur.remoteClosed
graceExpired := !cur.closeDeadline.IsZero() && now.After(cur.closeDeadline)
if fullyClosed || graceExpired {
if !cur.gateReleased {
cur.gateReleased = true
select {
case <-s.cfGate:
default:
}
}
delete(s.streams, id)
}
}
s.mu.Unlock()
s.mu.Lock()
_, exists := s.streams[11]
s.mu.Unlock()
if exists {
t.Error("stream should have been reaped after grace expiry")
}
if !st.gateReleased {
t.Error("gate should be released by reaper")
}
}
func TestCfGateExactlyOnceRelease(t *testing.T) {
tc := &tunnelClient{}
s := newTestSession(tc)
// Fill all 6 slots with fake streams; close them all via different paths
// and confirm we still have exactly 6 free slots at the end.
streams := make([]*stream, cfLimit)
for i := 0; i < cfLimit; i++ {
s.cfGate <- struct{}{}
conn := newDummyTCP(t)
st := &stream{id: uint16(i + 1), conn: conn, state: stOpen}
s.mu.Lock()
s.streams[st.id] = st
s.mu.Unlock()
streams[i] = st
}
// Mix of close paths: both sides closed through different orderings.
s.localClose(streams[0])
s.remoteClose(streams[0])
s.remoteClose(streams[1])
s.localClose(streams[1])
// Idempotent double-call followed by the other side.
s.localClose(streams[2])
s.localClose(streams[2])
s.remoteClose(streams[2])
s.remoteClose(streams[3])
s.remoteClose(streams[3])
s.localClose(streams[3])
// Last two: simulate reaper path (half-closed, grace expired).
for _, st := range streams[4:] {
s.mu.Lock()
st.localClosed = true
st.closeDeadline = time.Now().Add(-1 * time.Second)
s.mu.Unlock()
}
now := time.Now()
s.mu.Lock()
for id, cur := range s.streams {
if !cur.closeDeadline.IsZero() && now.After(cur.closeDeadline) {
if !cur.gateReleased {
cur.gateReleased = true
select {
case <-s.cfGate:
default:
}
}
delete(s.streams, id)
}
}
s.mu.Unlock()
// Drain all 6 slots to confirm exactly 6 were released.
freed := 0
for i := 0; i < cfLimit+2; i++ {
select {
case s.cfGate <- struct{}{}:
freed++
default:
break
}
}
if freed != cfLimit {
t.Errorf("expected exactly %d free slots after cleanup, got %d", cfLimit, freed)
}
}
// Sanity check that atomic pointer swap works as tunnelClient expects.
func TestSessionPointerSwap(t *testing.T) {
var p atomic.Pointer[session]
if p.Load() != nil {
t.Error("initial should be nil")
}
s1 := &session{}
p.Store(s1)
if p.Load() != s1 {
t.Error("load mismatch")
}
p.Store(nil)
if p.Load() != nil {
t.Error("nil store failed")
}
}