tg tunnel: backport keepalive and relay fixes
Some checks are pending
CI / ShellCheck (push) Waiting to run
CI / Go Build & Test (push) Waiting to run
CI / Luacheck (push) Waiting to run

This commit is contained in:
Necronicle 2026-04-22 00:17:23 +03:00
parent 1133af483e
commit e3b794dd74
16 changed files with 133 additions and 42 deletions

View file

@ -34,7 +34,9 @@ jobs:
uses: actions/setup-go@v5 uses: actions/setup-go@v5
with: with:
go-version: "stable" go-version: "stable"
cache-dependency-path: mtproxy-client/go.sum cache-dependency-path: |
mtproxy-client/go.sum
vps-relay/go.sum
- name: Go vet - name: Go vet
working-directory: mtproxy-client working-directory: mtproxy-client
@ -72,6 +74,18 @@ jobs:
go build -ldflags="-s -w" -o /dev/null . go build -ldflags="-s -w" -o /dev/null .
done done
- name: Go vet (vps-relay)
working-directory: vps-relay
run: go vet ./...
- name: Go test (vps-relay)
working-directory: vps-relay
run: go test ./...
- name: Go build (vps-relay)
working-directory: vps-relay
run: go build ./...
lua-check: lua-check:
name: Luacheck name: Luacheck
runs-on: ubuntu-latest runs-on: ubuntu-latest

View file

@ -8,6 +8,7 @@ max_line_length = false
globals = { globals = {
-- Desync action entry points (registered by z2k) -- Desync action entry points (registered by z2k)
"z2k_tls_alert_fatal", "z2k_tls_alert_fatal",
"z2k_tls_stalled",
"z2k_success_no_reset", "z2k_success_no_reset",
"z2k_tls_extshuffle", "z2k_tls_extshuffle",
"z2k_tls_fp_pack_v2", "z2k_tls_fp_pack_v2",

View file

@ -1704,7 +1704,7 @@ step_finalize() {
# Start tunnel mode. -v enables stream-level logs needed by the # Start tunnel mode. -v enables stream-level logs needed by the
# watchdog's stale-detection mode. # watchdog's stale-detection mode.
/opt/sbin/tg-mtproxy-client --listen=:1443 -v >> /tmp/tg-tunnel.log 2>&1 & /opt/sbin/tg-mtproxy-client --listen=:1443 --timeout=15m -v >> /tmp/tg-tunnel.log 2>&1 &
sleep 2 sleep 2
if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then
@ -1837,7 +1837,7 @@ start() {
return 0 return 0
fi fi
echo "Starting tg-tunnel..." echo "Starting tg-tunnel..."
$BIN --listen=:1443 -v >> "$LOG" 2>&1 & $BIN --listen=:1443 --timeout=15m -v >> "$LOG" 2>&1 &
echo $! > "$PIDFILE" echo $! > "$PIDFILE"
sleep 2 sleep 2
# Insert REDIRECT rules at TOP of both PREROUTING and OUTPUT (-I 1) so # Insert REDIRECT rules at TOP of both PREROUTING and OUTPUT (-I 1) so

View file

@ -14,7 +14,7 @@ var (
tunnelURL = flag.String("tunnel-url", "wss://213.176.74.63.nip.io/ws", "Tunnel relay WebSocket URL") tunnelURL = flag.String("tunnel-url", "wss://213.176.74.63.nip.io/ws", "Tunnel relay WebSocket URL")
tunnelSecret = flag.String("tunnel-secret", "d01f72f9543b29da4e3724b1530c0d11cb30a6f8db15bc0adfe8f2d37b5844b2", "Shared secret for tunnel auth") tunnelSecret = flag.String("tunnel-secret", "d01f72f9543b29da4e3724b1530c0d11cb30a6f8db15bc0adfe8f2d37b5844b2", "Shared secret for tunnel auth")
verbose = flag.Bool("v", false, "Verbose logging") verbose = flag.Bool("v", false, "Verbose logging")
connTimeout = flag.Duration("timeout", 5*time.Minute, "Idle connection timeout") connTimeout = flag.Duration("timeout", 15*time.Minute, "Idle connection timeout")
maxConns = flag.Int("max-conns", 1024, "Maximum concurrent connections") maxConns = flag.Int("max-conns", 1024, "Maximum concurrent connections")
) )

View file

@ -1,7 +1,6 @@
package main package main
import ( import (
"bufio"
"context" "context"
"crypto/hmac" "crypto/hmac"
"crypto/sha256" "crypto/sha256"
@ -30,6 +29,11 @@ const (
muxCONNECT_FAIL = 0x05 muxCONNECT_FAIL = 0x05
) )
const (
wsPingInterval = 30 * time.Second
wsReadTimeout = 90 * time.Second
)
// Address types for CONNECT payload // Address types for CONNECT payload
const ( const (
addrIPv4 = 1 addrIPv4 = 1
@ -90,6 +94,19 @@ func computeAuthHMAC(secret string) []byte {
return mac.Sum(nil) return mac.Sum(nil)
} }
func configureWSKeepalive(ws *websocket.Conn) {
ws.SetReadLimit(2 * 1024 * 1024)
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
ws.SetPongHandler(func(string) error {
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
return nil
})
ws.SetPingHandler(func(data string) error {
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
return ws.WriteControl(websocket.PongMessage, []byte(data), time.Now().Add(5*time.Second))
})
}
// tunnelClient manages the multiplexed WS tunnel. // tunnelClient manages the multiplexed WS tunnel.
type tunnelClient struct { type tunnelClient struct {
tunnelURL string tunnelURL string
@ -97,10 +114,10 @@ type tunnelClient struct {
ws *websocket.Conn ws *websocket.Conn
writer *wsWriter writer *wsWriter
streams sync.Map // uint16 → *tunnelStream streams sync.Map // uint16 → *tunnelStream
nextID atomic.Uint32 nextID atomic.Uint32
mu sync.Mutex // protects ws/writer replacement during reconnect mu sync.Mutex // protects ws/writer replacement during reconnect
connectSem chan struct{} // limits concurrent CONNECT to CF Workers limit connectSem chan struct{} // limits concurrent in-flight CONNECTs — 6 keeps SYN rate under TG DC burst threshold
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
} }
@ -139,10 +156,9 @@ func (tc *tunnelClient) connectTunnelWS() (*websocket.Conn, error) {
InsecureSkipVerify: false, InsecureSkipVerify: false,
}, },
HandshakeTimeout: 10 * time.Second, HandshakeTimeout: 10 * time.Second,
Subprotocols: []string{"binary"}, ReadBufferSize: 256 * 1024,
ReadBufferSize: 128 * 1024, WriteBufferSize: 256 * 1024,
WriteBufferSize: 128 * 1024, EnableCompression: false,
EnableCompression: true,
NetDial: func(network, addr string) (net.Conn, error) { NetDial: func(network, addr string) (net.Conn, error) {
// Force IPv4 — IPv6 to Cloudflare is unstable on some ISPs // Force IPv4 — IPv6 to Cloudflare is unstable on some ISPs
conn, err := net.DialTimeout("tcp4", addr, 10*time.Second) conn, err := net.DialTimeout("tcp4", addr, 10*time.Second)
@ -161,7 +177,7 @@ func (tc *tunnelClient) connectTunnelWS() (*websocket.Conn, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("WS dial %s: %w", tc.tunnelURL, err) return nil, fmt.Errorf("WS dial %s: %w", tc.tunnelURL, err)
} }
ws.SetReadLimit(2 * 1024 * 1024) configureWSKeepalive(ws)
// Send auth message: [0x00 0x00][0x00][hmac_32_bytes] // Send auth message: [0x00 0x00][0x00][hmac_32_bytes]
authMAC := computeAuthHMAC(tc.tunnelSecret) authMAC := computeAuthHMAC(tc.tunnelSecret)
@ -187,6 +203,7 @@ func (tc *tunnelClient) closeAllStreams() {
// readLoop reads mux frames from the WS and dispatches to streams. // readLoop reads mux frames from the WS and dispatches to streams.
func (tc *tunnelClient) readLoop(ws *websocket.Conn) { func (tc *tunnelClient) readLoop(ws *websocket.Conn) {
configureWSKeepalive(ws)
for { for {
_, msg, err := ws.ReadMessage() _, msg, err := ws.ReadMessage()
if err != nil { if err != nil {
@ -261,11 +278,10 @@ func (tc *tunnelClient) readLoop(ws *websocket.Conn) {
func (tc *tunnelClient) streamReadLoop(stream *tunnelStream) { func (tc *tunnelClient) streamReadLoop(stream *tunnelStream) {
defer stream.close() defer stream.close()
reader := bufio.NewReaderSize(stream.conn, 64*1024)
buf := make([]byte, 64*1024) buf := make([]byte, 64*1024)
for { for {
n, err := reader.Read(buf) n, err := stream.conn.Read(buf)
if n > 0 { if n > 0 {
stream.conn.SetDeadline(time.Now().Add(*connTimeout)) stream.conn.SetDeadline(time.Now().Add(*connTimeout))
frame := encodeMuxFrame(stream.id, muxDATA, buf[:n]) frame := encodeMuxFrame(stream.id, muxDATA, buf[:n])
@ -326,12 +342,12 @@ func (tc *tunnelClient) run() {
connectedAt := time.Now() connectedAt := time.Now()
// Keepalive: ping every 50s (CF kills idle WS after 100s) // Keepalive: ping every 30s (symmetric with server)
wsDone := make(chan struct{}) wsDone := make(chan struct{})
pingDone := make(chan struct{}) pingDone := make(chan struct{})
go func() { go func() {
defer close(pingDone) defer close(pingDone)
ticker := time.NewTicker(50 * time.Second) ticker := time.NewTicker(wsPingInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
@ -340,7 +356,13 @@ func (tc *tunnelClient) run() {
w := tc.writer w := tc.writer
tc.mu.Unlock() tc.mu.Unlock()
if w != nil { if w != nil {
w.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)) if err := w.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)); err != nil {
if *verbose {
log.Printf("[tunnel] ping failed: %v", err)
}
_ = ws.Close()
return
}
} }
case <-wsDone: case <-wsDone:
return return
@ -465,7 +487,7 @@ func (tc *tunnelClient) handleTunnelConn(clientConn *net.TCPConn) {
log.Printf("[tunnel] stream %d: %s -> %s:%d", streamID, clientConn.RemoteAddr(), origIP, origPort) log.Printf("[tunnel] stream %d: %s -> %s:%d", streamID, clientConn.RemoteAddr(), origIP, origPort)
} }
// Rate-limit concurrent CONNECTs to stay within CF Workers 6-connection limit // Rate-limit concurrent in-flight CONNECTs — TG DC throttles SYN bursts from single IP
select { select {
case tc.connectSem <- struct{}{}: case tc.connectSem <- struct{}{}:
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):

View file

@ -1,12 +1,13 @@
// vps-relay: TCP-over-WebSocket relay for the z2k tunnel client. // vps-relay: TCP-over-WebSocket relay for the z2k tunnel client.
// //
// Wire protocol (identical to cf-worker/worker.js): // Wire protocol (identical to cf-worker/worker.js):
// [streamId u16 BE][msgType u8][payload] //
// Types: AUTH=0x00, CONNECT=0x01, DATA=0x02, CLOSE=0x03, // [streamId u16 BE][msgType u8][payload]
// CONNECT_OK=0x04, CONNECT_FAIL=0x05 // Types: AUTH=0x00, CONNECT=0x01, DATA=0x02, CLOSE=0x03,
// Auth: streamId=0, type=0x00, payload = HMAC-SHA256(secret, secret) (32 bytes) // CONNECT_OK=0x04, CONNECT_FAIL=0x05
// CONNECT payload: [addr_type u8][addr][port u16 BE] // Auth: streamId=0, type=0x00, payload = HMAC-SHA256(secret, secret) (32 bytes)
// addr_type 1 = IPv4 (4 bytes), 4 = IPv6 (16 bytes) // CONNECT payload: [addr_type u8][addr][port u16 BE]
// addr_type 1 = IPv4 (4 bytes), 4 = IPv6 (16 bytes)
// //
// No CF-style constraints: no 6-socket cap, no 10ms CPU limit, sessions // No CF-style constraints: no 6-socket cap, no 10ms CPU limit, sessions
// live as long as the WebSocket stays up. // live as long as the WebSocket stays up.
@ -23,8 +24,11 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"os"
"os/signal"
"strconv" "strconv"
"sync" "sync"
"syscall"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -42,6 +46,11 @@ const (
addrIPv6 = 4 addrIPv6 = 4
) )
const (
sessionWriteQueueDepth = 256
sessionWriteQueueBytes = 8 * 1024 * 1024
)
var ( var (
listenAddr = flag.String("listen", ":8080", "HTTP listen address (TLS terminated upstream by Caddy)") listenAddr = flag.String("listen", ":8080", "HTTP listen address (TLS terminated upstream by Caddy)")
secret = flag.String("secret", "", "shared HMAC secret (must match tunnel client)") secret = flag.String("secret", "", "shared HMAC secret (must match tunnel client)")
@ -168,8 +177,10 @@ type session struct {
done chan struct{} done chan struct{}
once sync.Once once sync.Once
mu sync.Mutex mu sync.Mutex
streams map[uint16]*stream queueMu sync.Mutex
queuedBytes int
streams map[uint16]*stream
} }
type stream struct { type stream struct {
@ -182,7 +193,7 @@ func newSession(ws *websocket.Conn, id string) *session {
return &session{ return &session{
id: id, id: id,
ws: ws, ws: ws,
writeCh: make(chan []byte, 512), writeCh: make(chan []byte, sessionWriteQueueDepth),
done: make(chan struct{}), done: make(chan struct{}),
streams: make(map[uint16]*stream), streams: make(map[uint16]*stream),
} }
@ -201,14 +212,43 @@ func (s *session) kill() {
} }
s.streams = nil s.streams = nil
s.mu.Unlock() s.mu.Unlock()
s.queueMu.Lock()
s.queuedBytes = 0
s.queueMu.Unlock()
}) })
} }
func (s *session) reserveQueue(frameLen int) bool {
s.queueMu.Lock()
defer s.queueMu.Unlock()
if s.queuedBytes+frameLen > sessionWriteQueueBytes {
return false
}
s.queuedBytes += frameLen
return true
}
func (s *session) releaseQueue(frameLen int) {
s.queueMu.Lock()
defer s.queueMu.Unlock()
s.queuedBytes -= frameLen
if s.queuedBytes < 0 {
s.queuedBytes = 0
}
}
func (s *session) send(frame []byte) { func (s *session) send(frame []byte) {
if !s.reserveQueue(len(frame)) {
log.Printf("[%s] write queue exceeded %d bytes, killing session", s.id, sessionWriteQueueBytes)
go s.kill()
return
}
select { select {
case s.writeCh <- frame: case s.writeCh <- frame:
case <-s.done: case <-s.done:
s.releaseQueue(len(frame))
default: default:
s.releaseQueue(len(frame))
// Writer backpressure: drop the session if we can't keep up. // Writer backpressure: drop the session if we can't keep up.
log.Printf("[%s] writeCh full, killing session", s.id) log.Printf("[%s] writeCh full, killing session", s.id)
go s.kill() go s.kill()
@ -216,11 +256,12 @@ func (s *session) send(frame []byte) {
} }
func (s *session) writePump() { func (s *session) writePump() {
ticker := time.NewTicker(25 * time.Second) ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case frame := <-s.writeCh: case frame := <-s.writeCh:
s.releaseQueue(len(frame))
_ = s.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) _ = s.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := s.ws.WriteMessage(websocket.BinaryMessage, frame); err != nil { if err := s.ws.WriteMessage(websocket.BinaryMessage, frame); err != nil {
if *verbose { if *verbose {
@ -303,7 +344,7 @@ func (s *session) handleConnect(id uint16, payload []byte) {
// Pump TCP → WS // Pump TCP → WS
go func() { go func() {
buf := make([]byte, 32*1024) buf := make([]byte, 64*1024)
for { for {
n, err := conn.Read(buf) n, err := conn.Read(buf)
if n > 0 { if n > 0 {
@ -333,7 +374,7 @@ func (s *session) handleConnect(id uint16, payload []byte) {
func (s *session) readPump() { func (s *session) readPump() {
defer s.kill() defer s.kill()
s.ws.SetReadLimit(4 * 1024 * 1024) s.ws.SetReadLimit(2 * 1024 * 1024)
_ = s.ws.SetReadDeadline(time.Now().Add(90 * time.Second)) _ = s.ws.SetReadDeadline(time.Now().Add(90 * time.Second))
s.ws.SetPongHandler(func(string) error { s.ws.SetPongHandler(func(string) error {
_ = s.ws.SetReadDeadline(time.Now().Add(90 * time.Second)) _ = s.ws.SetReadDeadline(time.Now().Add(90 * time.Second))
@ -409,8 +450,8 @@ func (s *session) readPump() {
} }
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
ReadBufferSize: 128 * 1024, ReadBufferSize: 256 * 1024,
WriteBufferSize: 128 * 1024, WriteBufferSize: 256 * 1024,
CheckOrigin: func(r *http.Request) bool { return true }, CheckOrigin: func(r *http.Request) bool { return true },
EnableCompression: false, EnableCompression: false,
} }
@ -462,17 +503,30 @@ func main() {
} }
// Graceful shutdown // Graceful shutdown
ctx, cancel := context.WithCancel(context.Background()) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel() defer stop()
serveErr := make(chan error, 1)
go func() { go func() {
<-ctx.Done() serveErr <- srv.ListenAndServe()
sdCtx, sdCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer sdCancel()
_ = srv.Shutdown(sdCtx)
}() }()
log.Printf("z2k vps-relay listening on %s", *listenAddr) log.Printf("z2k vps-relay listening on %s", *listenAddr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { select {
log.Fatal(err) case err := <-serveErr:
if err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
case <-ctx.Done():
log.Printf("shutdown requested, draining active connections")
sdCtx, sdCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer sdCancel()
if err := srv.Shutdown(sdCtx); err != nil {
log.Printf("graceful shutdown failed: %v", err)
_ = srv.Close()
}
if err := <-serveErr; err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
log.Printf("server stopped")
} }
} }

2
z2k.sh
View file

@ -674,7 +674,7 @@ update_z2k() {
sleep 1 sleep 1
cp "$tg_tmp" /opt/sbin/tg-mtproxy-client cp "$tg_tmp" /opt/sbin/tg-mtproxy-client
chmod +x /opt/sbin/tg-mtproxy-client chmod +x /opt/sbin/tg-mtproxy-client
/opt/sbin/tg-mtproxy-client --listen=:1443 >> /tmp/tg-tunnel.log 2>&1 & /opt/sbin/tg-mtproxy-client --listen=:1443 --timeout=15m >> /tmp/tg-tunnel.log 2>&1 &
sleep 2 sleep 2
if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then
print_success "Telegram tunnel обновлён и перезапущен" print_success "Telegram tunnel обновлён и перезапущен"