diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6aa39a5..2f54bde 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,7 +34,9 @@ jobs: uses: actions/setup-go@v5 with: go-version: "stable" - cache-dependency-path: mtproxy-client/go.sum + cache-dependency-path: | + mtproxy-client/go.sum + vps-relay/go.sum - name: Go vet working-directory: mtproxy-client @@ -72,6 +74,18 @@ jobs: go build -ldflags="-s -w" -o /dev/null . done + - name: Go vet (vps-relay) + working-directory: vps-relay + run: go vet ./... + + - name: Go test (vps-relay) + working-directory: vps-relay + run: go test ./... + + - name: Go build (vps-relay) + working-directory: vps-relay + run: go build ./... + lua-check: name: Luacheck runs-on: ubuntu-latest diff --git a/.luacheckrc b/.luacheckrc index 052811f..810e730 100644 --- a/.luacheckrc +++ b/.luacheckrc @@ -8,6 +8,7 @@ max_line_length = false globals = { -- Desync action entry points (registered by z2k) "z2k_tls_alert_fatal", + "z2k_tls_stalled", "z2k_success_no_reset", "z2k_tls_extshuffle", "z2k_tls_fp_pack_v2", diff --git a/lib/install.sh b/lib/install.sh index a70f0d1..be67938 100644 --- a/lib/install.sh +++ b/lib/install.sh @@ -1704,7 +1704,7 @@ step_finalize() { # Start tunnel mode. -v enables stream-level logs needed by the # watchdog's stale-detection mode. - /opt/sbin/tg-mtproxy-client --listen=:1443 -v >> /tmp/tg-tunnel.log 2>&1 & + /opt/sbin/tg-mtproxy-client --listen=:1443 --timeout=15m -v >> /tmp/tg-tunnel.log 2>&1 & sleep 2 if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then @@ -1837,7 +1837,7 @@ start() { return 0 fi echo "Starting tg-tunnel..." - $BIN --listen=:1443 -v >> "$LOG" 2>&1 & + $BIN --listen=:1443 --timeout=15m -v >> "$LOG" 2>&1 & echo $! > "$PIDFILE" sleep 2 # Insert REDIRECT rules at TOP of both PREROUTING and OUTPUT (-I 1) so diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-amd64 b/mtproxy-client/builds/tg-mtproxy-client-linux-amd64 index eab6e45..455fb55 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-amd64 and b/mtproxy-client/builds/tg-mtproxy-client-linux-amd64 differ diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-arm b/mtproxy-client/builds/tg-mtproxy-client-linux-arm index 8b1f884..14026df 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-arm and b/mtproxy-client/builds/tg-mtproxy-client-linux-arm differ diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-arm64 b/mtproxy-client/builds/tg-mtproxy-client-linux-arm64 index 22b30d0..f2ecd95 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-arm64 and b/mtproxy-client/builds/tg-mtproxy-client-linux-arm64 differ diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-mips b/mtproxy-client/builds/tg-mtproxy-client-linux-mips index 23f93bd..b5931c8 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-mips and b/mtproxy-client/builds/tg-mtproxy-client-linux-mips differ diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-mips64el b/mtproxy-client/builds/tg-mtproxy-client-linux-mips64el index 52fe1ed..ec2c293 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-mips64el and b/mtproxy-client/builds/tg-mtproxy-client-linux-mips64el differ diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-mipsel b/mtproxy-client/builds/tg-mtproxy-client-linux-mipsel index 419c04e..9e7ea94 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-mipsel and b/mtproxy-client/builds/tg-mtproxy-client-linux-mipsel differ diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-ppc64 b/mtproxy-client/builds/tg-mtproxy-client-linux-ppc64 index 00d4805..9a96878 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-ppc64 and b/mtproxy-client/builds/tg-mtproxy-client-linux-ppc64 differ diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-riscv64 b/mtproxy-client/builds/tg-mtproxy-client-linux-riscv64 index 9f8fca1..9908259 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-riscv64 and b/mtproxy-client/builds/tg-mtproxy-client-linux-riscv64 differ diff --git a/mtproxy-client/builds/tg-mtproxy-client-linux-x86 b/mtproxy-client/builds/tg-mtproxy-client-linux-x86 index 9a7cc22..7423114 100755 Binary files a/mtproxy-client/builds/tg-mtproxy-client-linux-x86 and b/mtproxy-client/builds/tg-mtproxy-client-linux-x86 differ diff --git a/mtproxy-client/main.go b/mtproxy-client/main.go index 037d3d6..15af86c 100644 --- a/mtproxy-client/main.go +++ b/mtproxy-client/main.go @@ -14,7 +14,7 @@ var ( tunnelURL = flag.String("tunnel-url", "wss://213.176.74.63.nip.io/ws", "Tunnel relay WebSocket URL") tunnelSecret = flag.String("tunnel-secret", "d01f72f9543b29da4e3724b1530c0d11cb30a6f8db15bc0adfe8f2d37b5844b2", "Shared secret for tunnel auth") verbose = flag.Bool("v", false, "Verbose logging") - connTimeout = flag.Duration("timeout", 5*time.Minute, "Idle connection timeout") + connTimeout = flag.Duration("timeout", 15*time.Minute, "Idle connection timeout") maxConns = flag.Int("max-conns", 1024, "Maximum concurrent connections") ) diff --git a/mtproxy-client/tunnel.go b/mtproxy-client/tunnel.go index f175ec9..229e6b4 100644 --- a/mtproxy-client/tunnel.go +++ b/mtproxy-client/tunnel.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "context" "crypto/hmac" "crypto/sha256" @@ -30,6 +29,11 @@ const ( muxCONNECT_FAIL = 0x05 ) +const ( + wsPingInterval = 30 * time.Second + wsReadTimeout = 90 * time.Second +) + // Address types for CONNECT payload const ( addrIPv4 = 1 @@ -90,6 +94,19 @@ func computeAuthHMAC(secret string) []byte { return mac.Sum(nil) } +func configureWSKeepalive(ws *websocket.Conn) { + ws.SetReadLimit(2 * 1024 * 1024) + _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + ws.SetPongHandler(func(string) error { + _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + return nil + }) + ws.SetPingHandler(func(data string) error { + _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + return ws.WriteControl(websocket.PongMessage, []byte(data), time.Now().Add(5*time.Second)) + }) +} + // tunnelClient manages the multiplexed WS tunnel. type tunnelClient struct { tunnelURL string @@ -97,10 +114,10 @@ type tunnelClient struct { ws *websocket.Conn writer *wsWriter - streams sync.Map // uint16 → *tunnelStream + streams sync.Map // uint16 → *tunnelStream nextID atomic.Uint32 - mu sync.Mutex // protects ws/writer replacement during reconnect - connectSem chan struct{} // limits concurrent CONNECT to CF Workers limit + mu sync.Mutex // protects ws/writer replacement during reconnect + connectSem chan struct{} // limits concurrent in-flight CONNECTs — 6 keeps SYN rate under TG DC burst threshold ctx context.Context cancel context.CancelFunc } @@ -139,10 +156,9 @@ func (tc *tunnelClient) connectTunnelWS() (*websocket.Conn, error) { InsecureSkipVerify: false, }, HandshakeTimeout: 10 * time.Second, - Subprotocols: []string{"binary"}, - ReadBufferSize: 128 * 1024, - WriteBufferSize: 128 * 1024, - EnableCompression: true, + ReadBufferSize: 256 * 1024, + WriteBufferSize: 256 * 1024, + EnableCompression: false, NetDial: func(network, addr string) (net.Conn, error) { // Force IPv4 — IPv6 to Cloudflare is unstable on some ISPs conn, err := net.DialTimeout("tcp4", addr, 10*time.Second) @@ -161,7 +177,7 @@ func (tc *tunnelClient) connectTunnelWS() (*websocket.Conn, error) { if err != nil { return nil, fmt.Errorf("WS dial %s: %w", tc.tunnelURL, err) } - ws.SetReadLimit(2 * 1024 * 1024) + configureWSKeepalive(ws) // Send auth message: [0x00 0x00][0x00][hmac_32_bytes] authMAC := computeAuthHMAC(tc.tunnelSecret) @@ -187,6 +203,7 @@ func (tc *tunnelClient) closeAllStreams() { // readLoop reads mux frames from the WS and dispatches to streams. func (tc *tunnelClient) readLoop(ws *websocket.Conn) { + configureWSKeepalive(ws) for { _, msg, err := ws.ReadMessage() if err != nil { @@ -261,11 +278,10 @@ func (tc *tunnelClient) readLoop(ws *websocket.Conn) { func (tc *tunnelClient) streamReadLoop(stream *tunnelStream) { defer stream.close() - reader := bufio.NewReaderSize(stream.conn, 64*1024) buf := make([]byte, 64*1024) for { - n, err := reader.Read(buf) + n, err := stream.conn.Read(buf) if n > 0 { stream.conn.SetDeadline(time.Now().Add(*connTimeout)) frame := encodeMuxFrame(stream.id, muxDATA, buf[:n]) @@ -326,12 +342,12 @@ func (tc *tunnelClient) run() { connectedAt := time.Now() - // Keepalive: ping every 50s (CF kills idle WS after 100s) + // Keepalive: ping every 30s (symmetric with server) wsDone := make(chan struct{}) pingDone := make(chan struct{}) go func() { defer close(pingDone) - ticker := time.NewTicker(50 * time.Second) + ticker := time.NewTicker(wsPingInterval) defer ticker.Stop() for { select { @@ -340,7 +356,13 @@ func (tc *tunnelClient) run() { w := tc.writer tc.mu.Unlock() if w != nil { - w.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)) + if err := w.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)); err != nil { + if *verbose { + log.Printf("[tunnel] ping failed: %v", err) + } + _ = ws.Close() + return + } } case <-wsDone: return @@ -465,7 +487,7 @@ func (tc *tunnelClient) handleTunnelConn(clientConn *net.TCPConn) { log.Printf("[tunnel] stream %d: %s -> %s:%d", streamID, clientConn.RemoteAddr(), origIP, origPort) } - // Rate-limit concurrent CONNECTs to stay within CF Workers 6-connection limit + // Rate-limit concurrent in-flight CONNECTs — TG DC throttles SYN bursts from single IP select { case tc.connectSem <- struct{}{}: case <-time.After(10 * time.Second): diff --git a/vps-relay/main.go b/vps-relay/main.go index 14b7184..1270fde 100644 --- a/vps-relay/main.go +++ b/vps-relay/main.go @@ -1,12 +1,13 @@ // vps-relay: TCP-over-WebSocket relay for the z2k tunnel client. // // Wire protocol (identical to cf-worker/worker.js): -// [streamId u16 BE][msgType u8][payload] -// Types: AUTH=0x00, CONNECT=0x01, DATA=0x02, CLOSE=0x03, -// CONNECT_OK=0x04, CONNECT_FAIL=0x05 -// Auth: streamId=0, type=0x00, payload = HMAC-SHA256(secret, secret) (32 bytes) -// CONNECT payload: [addr_type u8][addr][port u16 BE] -// addr_type 1 = IPv4 (4 bytes), 4 = IPv6 (16 bytes) +// +// [streamId u16 BE][msgType u8][payload] +// Types: AUTH=0x00, CONNECT=0x01, DATA=0x02, CLOSE=0x03, +// CONNECT_OK=0x04, CONNECT_FAIL=0x05 +// Auth: streamId=0, type=0x00, payload = HMAC-SHA256(secret, secret) (32 bytes) +// CONNECT payload: [addr_type u8][addr][port u16 BE] +// addr_type 1 = IPv4 (4 bytes), 4 = IPv6 (16 bytes) // // No CF-style constraints: no 6-socket cap, no 10ms CPU limit, sessions // live as long as the WebSocket stays up. @@ -23,8 +24,11 @@ import ( "log" "net" "net/http" + "os" + "os/signal" "strconv" "sync" + "syscall" "time" "github.com/gorilla/websocket" @@ -42,6 +46,11 @@ const ( addrIPv6 = 4 ) +const ( + sessionWriteQueueDepth = 256 + sessionWriteQueueBytes = 8 * 1024 * 1024 +) + var ( listenAddr = flag.String("listen", ":8080", "HTTP listen address (TLS terminated upstream by Caddy)") secret = flag.String("secret", "", "shared HMAC secret (must match tunnel client)") @@ -168,8 +177,10 @@ type session struct { done chan struct{} once sync.Once - mu sync.Mutex - streams map[uint16]*stream + mu sync.Mutex + queueMu sync.Mutex + queuedBytes int + streams map[uint16]*stream } type stream struct { @@ -182,7 +193,7 @@ func newSession(ws *websocket.Conn, id string) *session { return &session{ id: id, ws: ws, - writeCh: make(chan []byte, 512), + writeCh: make(chan []byte, sessionWriteQueueDepth), done: make(chan struct{}), streams: make(map[uint16]*stream), } @@ -201,14 +212,43 @@ func (s *session) kill() { } s.streams = nil s.mu.Unlock() + s.queueMu.Lock() + s.queuedBytes = 0 + s.queueMu.Unlock() }) } +func (s *session) reserveQueue(frameLen int) bool { + s.queueMu.Lock() + defer s.queueMu.Unlock() + if s.queuedBytes+frameLen > sessionWriteQueueBytes { + return false + } + s.queuedBytes += frameLen + return true +} + +func (s *session) releaseQueue(frameLen int) { + s.queueMu.Lock() + defer s.queueMu.Unlock() + s.queuedBytes -= frameLen + if s.queuedBytes < 0 { + s.queuedBytes = 0 + } +} + func (s *session) send(frame []byte) { + if !s.reserveQueue(len(frame)) { + log.Printf("[%s] write queue exceeded %d bytes, killing session", s.id, sessionWriteQueueBytes) + go s.kill() + return + } select { case s.writeCh <- frame: case <-s.done: + s.releaseQueue(len(frame)) default: + s.releaseQueue(len(frame)) // Writer backpressure: drop the session if we can't keep up. log.Printf("[%s] writeCh full, killing session", s.id) go s.kill() @@ -216,11 +256,12 @@ func (s *session) send(frame []byte) { } func (s *session) writePump() { - ticker := time.NewTicker(25 * time.Second) + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case frame := <-s.writeCh: + s.releaseQueue(len(frame)) _ = s.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := s.ws.WriteMessage(websocket.BinaryMessage, frame); err != nil { if *verbose { @@ -303,7 +344,7 @@ func (s *session) handleConnect(id uint16, payload []byte) { // Pump TCP → WS go func() { - buf := make([]byte, 32*1024) + buf := make([]byte, 64*1024) for { n, err := conn.Read(buf) if n > 0 { @@ -333,7 +374,7 @@ func (s *session) handleConnect(id uint16, payload []byte) { func (s *session) readPump() { defer s.kill() - s.ws.SetReadLimit(4 * 1024 * 1024) + s.ws.SetReadLimit(2 * 1024 * 1024) _ = s.ws.SetReadDeadline(time.Now().Add(90 * time.Second)) s.ws.SetPongHandler(func(string) error { _ = s.ws.SetReadDeadline(time.Now().Add(90 * time.Second)) @@ -409,8 +450,8 @@ func (s *session) readPump() { } var upgrader = websocket.Upgrader{ - ReadBufferSize: 128 * 1024, - WriteBufferSize: 128 * 1024, + ReadBufferSize: 256 * 1024, + WriteBufferSize: 256 * 1024, CheckOrigin: func(r *http.Request) bool { return true }, EnableCompression: false, } @@ -462,17 +503,30 @@ func main() { } // Graceful shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + serveErr := make(chan error, 1) go func() { - <-ctx.Done() - sdCtx, sdCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer sdCancel() - _ = srv.Shutdown(sdCtx) + serveErr <- srv.ListenAndServe() }() log.Printf("z2k vps-relay listening on %s", *listenAddr) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Fatal(err) + select { + case err := <-serveErr: + if err != nil && err != http.ErrServerClosed { + log.Fatal(err) + } + case <-ctx.Done(): + log.Printf("shutdown requested, draining active connections") + sdCtx, sdCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer sdCancel() + if err := srv.Shutdown(sdCtx); err != nil { + log.Printf("graceful shutdown failed: %v", err) + _ = srv.Close() + } + if err := <-serveErr; err != nil && err != http.ErrServerClosed { + log.Fatal(err) + } + log.Printf("server stopped") } } diff --git a/z2k.sh b/z2k.sh index 896d909..0bf8f00 100644 --- a/z2k.sh +++ b/z2k.sh @@ -674,7 +674,7 @@ update_z2k() { sleep 1 cp "$tg_tmp" /opt/sbin/tg-mtproxy-client chmod +x /opt/sbin/tg-mtproxy-client - /opt/sbin/tg-mtproxy-client --listen=:1443 >> /tmp/tg-tunnel.log 2>&1 & + /opt/sbin/tg-mtproxy-client --listen=:1443 --timeout=15m >> /tmp/tg-tunnel.log 2>&1 & sleep 2 if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then print_success "Telegram tunnel обновлён и перезапущен"