mirror of
https://github.com/necronicle/z2k.git
synced 2026-04-28 03:20:25 +00:00
tg tunnel: backport keepalive and relay fixes
This commit is contained in:
parent
1133af483e
commit
e3b794dd74
16 changed files with 133 additions and 42 deletions
16
.github/workflows/ci.yml
vendored
16
.github/workflows/ci.yml
vendored
|
|
@ -34,7 +34,9 @@ jobs:
|
|||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: "stable"
|
||||
cache-dependency-path: mtproxy-client/go.sum
|
||||
cache-dependency-path: |
|
||||
mtproxy-client/go.sum
|
||||
vps-relay/go.sum
|
||||
|
||||
- name: Go vet
|
||||
working-directory: mtproxy-client
|
||||
|
|
@ -72,6 +74,18 @@ jobs:
|
|||
go build -ldflags="-s -w" -o /dev/null .
|
||||
done
|
||||
|
||||
- name: Go vet (vps-relay)
|
||||
working-directory: vps-relay
|
||||
run: go vet ./...
|
||||
|
||||
- name: Go test (vps-relay)
|
||||
working-directory: vps-relay
|
||||
run: go test ./...
|
||||
|
||||
- name: Go build (vps-relay)
|
||||
working-directory: vps-relay
|
||||
run: go build ./...
|
||||
|
||||
lua-check:
|
||||
name: Luacheck
|
||||
runs-on: ubuntu-latest
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ max_line_length = false
|
|||
globals = {
|
||||
-- Desync action entry points (registered by z2k)
|
||||
"z2k_tls_alert_fatal",
|
||||
"z2k_tls_stalled",
|
||||
"z2k_success_no_reset",
|
||||
"z2k_tls_extshuffle",
|
||||
"z2k_tls_fp_pack_v2",
|
||||
|
|
|
|||
|
|
@ -1704,7 +1704,7 @@ step_finalize() {
|
|||
|
||||
# Start tunnel mode. -v enables stream-level logs needed by the
|
||||
# watchdog's stale-detection mode.
|
||||
/opt/sbin/tg-mtproxy-client --listen=:1443 -v >> /tmp/tg-tunnel.log 2>&1 &
|
||||
/opt/sbin/tg-mtproxy-client --listen=:1443 --timeout=15m -v >> /tmp/tg-tunnel.log 2>&1 &
|
||||
sleep 2
|
||||
|
||||
if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then
|
||||
|
|
@ -1837,7 +1837,7 @@ start() {
|
|||
return 0
|
||||
fi
|
||||
echo "Starting tg-tunnel..."
|
||||
$BIN --listen=:1443 -v >> "$LOG" 2>&1 &
|
||||
$BIN --listen=:1443 --timeout=15m -v >> "$LOG" 2>&1 &
|
||||
echo $! > "$PIDFILE"
|
||||
sleep 2
|
||||
# Insert REDIRECT rules at TOP of both PREROUTING and OUTPUT (-I 1) so
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -14,7 +14,7 @@ var (
|
|||
tunnelURL = flag.String("tunnel-url", "wss://213.176.74.63.nip.io/ws", "Tunnel relay WebSocket URL")
|
||||
tunnelSecret = flag.String("tunnel-secret", "d01f72f9543b29da4e3724b1530c0d11cb30a6f8db15bc0adfe8f2d37b5844b2", "Shared secret for tunnel auth")
|
||||
verbose = flag.Bool("v", false, "Verbose logging")
|
||||
connTimeout = flag.Duration("timeout", 5*time.Minute, "Idle connection timeout")
|
||||
connTimeout = flag.Duration("timeout", 15*time.Minute, "Idle connection timeout")
|
||||
maxConns = flag.Int("max-conns", 1024, "Maximum concurrent connections")
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
|
|
@ -30,6 +29,11 @@ const (
|
|||
muxCONNECT_FAIL = 0x05
|
||||
)
|
||||
|
||||
const (
|
||||
wsPingInterval = 30 * time.Second
|
||||
wsReadTimeout = 90 * time.Second
|
||||
)
|
||||
|
||||
// Address types for CONNECT payload
|
||||
const (
|
||||
addrIPv4 = 1
|
||||
|
|
@ -90,6 +94,19 @@ func computeAuthHMAC(secret string) []byte {
|
|||
return mac.Sum(nil)
|
||||
}
|
||||
|
||||
func configureWSKeepalive(ws *websocket.Conn) {
|
||||
ws.SetReadLimit(2 * 1024 * 1024)
|
||||
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
|
||||
ws.SetPongHandler(func(string) error {
|
||||
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
|
||||
return nil
|
||||
})
|
||||
ws.SetPingHandler(func(data string) error {
|
||||
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
|
||||
return ws.WriteControl(websocket.PongMessage, []byte(data), time.Now().Add(5*time.Second))
|
||||
})
|
||||
}
|
||||
|
||||
// tunnelClient manages the multiplexed WS tunnel.
|
||||
type tunnelClient struct {
|
||||
tunnelURL string
|
||||
|
|
@ -97,10 +114,10 @@ type tunnelClient struct {
|
|||
|
||||
ws *websocket.Conn
|
||||
writer *wsWriter
|
||||
streams sync.Map // uint16 → *tunnelStream
|
||||
streams sync.Map // uint16 → *tunnelStream
|
||||
nextID atomic.Uint32
|
||||
mu sync.Mutex // protects ws/writer replacement during reconnect
|
||||
connectSem chan struct{} // limits concurrent CONNECT to CF Workers limit
|
||||
mu sync.Mutex // protects ws/writer replacement during reconnect
|
||||
connectSem chan struct{} // limits concurrent in-flight CONNECTs — 6 keeps SYN rate under TG DC burst threshold
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
|
@ -139,10 +156,9 @@ func (tc *tunnelClient) connectTunnelWS() (*websocket.Conn, error) {
|
|||
InsecureSkipVerify: false,
|
||||
},
|
||||
HandshakeTimeout: 10 * time.Second,
|
||||
Subprotocols: []string{"binary"},
|
||||
ReadBufferSize: 128 * 1024,
|
||||
WriteBufferSize: 128 * 1024,
|
||||
EnableCompression: true,
|
||||
ReadBufferSize: 256 * 1024,
|
||||
WriteBufferSize: 256 * 1024,
|
||||
EnableCompression: false,
|
||||
NetDial: func(network, addr string) (net.Conn, error) {
|
||||
// Force IPv4 — IPv6 to Cloudflare is unstable on some ISPs
|
||||
conn, err := net.DialTimeout("tcp4", addr, 10*time.Second)
|
||||
|
|
@ -161,7 +177,7 @@ func (tc *tunnelClient) connectTunnelWS() (*websocket.Conn, error) {
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("WS dial %s: %w", tc.tunnelURL, err)
|
||||
}
|
||||
ws.SetReadLimit(2 * 1024 * 1024)
|
||||
configureWSKeepalive(ws)
|
||||
|
||||
// Send auth message: [0x00 0x00][0x00][hmac_32_bytes]
|
||||
authMAC := computeAuthHMAC(tc.tunnelSecret)
|
||||
|
|
@ -187,6 +203,7 @@ func (tc *tunnelClient) closeAllStreams() {
|
|||
|
||||
// readLoop reads mux frames from the WS and dispatches to streams.
|
||||
func (tc *tunnelClient) readLoop(ws *websocket.Conn) {
|
||||
configureWSKeepalive(ws)
|
||||
for {
|
||||
_, msg, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
|
|
@ -261,11 +278,10 @@ func (tc *tunnelClient) readLoop(ws *websocket.Conn) {
|
|||
func (tc *tunnelClient) streamReadLoop(stream *tunnelStream) {
|
||||
defer stream.close()
|
||||
|
||||
reader := bufio.NewReaderSize(stream.conn, 64*1024)
|
||||
buf := make([]byte, 64*1024)
|
||||
|
||||
for {
|
||||
n, err := reader.Read(buf)
|
||||
n, err := stream.conn.Read(buf)
|
||||
if n > 0 {
|
||||
stream.conn.SetDeadline(time.Now().Add(*connTimeout))
|
||||
frame := encodeMuxFrame(stream.id, muxDATA, buf[:n])
|
||||
|
|
@ -326,12 +342,12 @@ func (tc *tunnelClient) run() {
|
|||
|
||||
connectedAt := time.Now()
|
||||
|
||||
// Keepalive: ping every 50s (CF kills idle WS after 100s)
|
||||
// Keepalive: ping every 30s (symmetric with server)
|
||||
wsDone := make(chan struct{})
|
||||
pingDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(pingDone)
|
||||
ticker := time.NewTicker(50 * time.Second)
|
||||
ticker := time.NewTicker(wsPingInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
|
|
@ -340,7 +356,13 @@ func (tc *tunnelClient) run() {
|
|||
w := tc.writer
|
||||
tc.mu.Unlock()
|
||||
if w != nil {
|
||||
w.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second))
|
||||
if err := w.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)); err != nil {
|
||||
if *verbose {
|
||||
log.Printf("[tunnel] ping failed: %v", err)
|
||||
}
|
||||
_ = ws.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-wsDone:
|
||||
return
|
||||
|
|
@ -465,7 +487,7 @@ func (tc *tunnelClient) handleTunnelConn(clientConn *net.TCPConn) {
|
|||
log.Printf("[tunnel] stream %d: %s -> %s:%d", streamID, clientConn.RemoteAddr(), origIP, origPort)
|
||||
}
|
||||
|
||||
// Rate-limit concurrent CONNECTs to stay within CF Workers 6-connection limit
|
||||
// Rate-limit concurrent in-flight CONNECTs — TG DC throttles SYN bursts from single IP
|
||||
select {
|
||||
case tc.connectSem <- struct{}{}:
|
||||
case <-time.After(10 * time.Second):
|
||||
|
|
|
|||
|
|
@ -1,12 +1,13 @@
|
|||
// vps-relay: TCP-over-WebSocket relay for the z2k tunnel client.
|
||||
//
|
||||
// Wire protocol (identical to cf-worker/worker.js):
|
||||
// [streamId u16 BE][msgType u8][payload]
|
||||
// Types: AUTH=0x00, CONNECT=0x01, DATA=0x02, CLOSE=0x03,
|
||||
// CONNECT_OK=0x04, CONNECT_FAIL=0x05
|
||||
// Auth: streamId=0, type=0x00, payload = HMAC-SHA256(secret, secret) (32 bytes)
|
||||
// CONNECT payload: [addr_type u8][addr][port u16 BE]
|
||||
// addr_type 1 = IPv4 (4 bytes), 4 = IPv6 (16 bytes)
|
||||
//
|
||||
// [streamId u16 BE][msgType u8][payload]
|
||||
// Types: AUTH=0x00, CONNECT=0x01, DATA=0x02, CLOSE=0x03,
|
||||
// CONNECT_OK=0x04, CONNECT_FAIL=0x05
|
||||
// Auth: streamId=0, type=0x00, payload = HMAC-SHA256(secret, secret) (32 bytes)
|
||||
// CONNECT payload: [addr_type u8][addr][port u16 BE]
|
||||
// addr_type 1 = IPv4 (4 bytes), 4 = IPv6 (16 bytes)
|
||||
//
|
||||
// No CF-style constraints: no 6-socket cap, no 10ms CPU limit, sessions
|
||||
// live as long as the WebSocket stays up.
|
||||
|
|
@ -23,8 +24,11 @@ import (
|
|||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
|
|
@ -42,6 +46,11 @@ const (
|
|||
addrIPv6 = 4
|
||||
)
|
||||
|
||||
const (
|
||||
sessionWriteQueueDepth = 256
|
||||
sessionWriteQueueBytes = 8 * 1024 * 1024
|
||||
)
|
||||
|
||||
var (
|
||||
listenAddr = flag.String("listen", ":8080", "HTTP listen address (TLS terminated upstream by Caddy)")
|
||||
secret = flag.String("secret", "", "shared HMAC secret (must match tunnel client)")
|
||||
|
|
@ -168,8 +177,10 @@ type session struct {
|
|||
done chan struct{}
|
||||
once sync.Once
|
||||
|
||||
mu sync.Mutex
|
||||
streams map[uint16]*stream
|
||||
mu sync.Mutex
|
||||
queueMu sync.Mutex
|
||||
queuedBytes int
|
||||
streams map[uint16]*stream
|
||||
}
|
||||
|
||||
type stream struct {
|
||||
|
|
@ -182,7 +193,7 @@ func newSession(ws *websocket.Conn, id string) *session {
|
|||
return &session{
|
||||
id: id,
|
||||
ws: ws,
|
||||
writeCh: make(chan []byte, 512),
|
||||
writeCh: make(chan []byte, sessionWriteQueueDepth),
|
||||
done: make(chan struct{}),
|
||||
streams: make(map[uint16]*stream),
|
||||
}
|
||||
|
|
@ -201,14 +212,43 @@ func (s *session) kill() {
|
|||
}
|
||||
s.streams = nil
|
||||
s.mu.Unlock()
|
||||
s.queueMu.Lock()
|
||||
s.queuedBytes = 0
|
||||
s.queueMu.Unlock()
|
||||
})
|
||||
}
|
||||
|
||||
func (s *session) reserveQueue(frameLen int) bool {
|
||||
s.queueMu.Lock()
|
||||
defer s.queueMu.Unlock()
|
||||
if s.queuedBytes+frameLen > sessionWriteQueueBytes {
|
||||
return false
|
||||
}
|
||||
s.queuedBytes += frameLen
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *session) releaseQueue(frameLen int) {
|
||||
s.queueMu.Lock()
|
||||
defer s.queueMu.Unlock()
|
||||
s.queuedBytes -= frameLen
|
||||
if s.queuedBytes < 0 {
|
||||
s.queuedBytes = 0
|
||||
}
|
||||
}
|
||||
|
||||
func (s *session) send(frame []byte) {
|
||||
if !s.reserveQueue(len(frame)) {
|
||||
log.Printf("[%s] write queue exceeded %d bytes, killing session", s.id, sessionWriteQueueBytes)
|
||||
go s.kill()
|
||||
return
|
||||
}
|
||||
select {
|
||||
case s.writeCh <- frame:
|
||||
case <-s.done:
|
||||
s.releaseQueue(len(frame))
|
||||
default:
|
||||
s.releaseQueue(len(frame))
|
||||
// Writer backpressure: drop the session if we can't keep up.
|
||||
log.Printf("[%s] writeCh full, killing session", s.id)
|
||||
go s.kill()
|
||||
|
|
@ -216,11 +256,12 @@ func (s *session) send(frame []byte) {
|
|||
}
|
||||
|
||||
func (s *session) writePump() {
|
||||
ticker := time.NewTicker(25 * time.Second)
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case frame := <-s.writeCh:
|
||||
s.releaseQueue(len(frame))
|
||||
_ = s.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := s.ws.WriteMessage(websocket.BinaryMessage, frame); err != nil {
|
||||
if *verbose {
|
||||
|
|
@ -303,7 +344,7 @@ func (s *session) handleConnect(id uint16, payload []byte) {
|
|||
|
||||
// Pump TCP → WS
|
||||
go func() {
|
||||
buf := make([]byte, 32*1024)
|
||||
buf := make([]byte, 64*1024)
|
||||
for {
|
||||
n, err := conn.Read(buf)
|
||||
if n > 0 {
|
||||
|
|
@ -333,7 +374,7 @@ func (s *session) handleConnect(id uint16, payload []byte) {
|
|||
func (s *session) readPump() {
|
||||
defer s.kill()
|
||||
|
||||
s.ws.SetReadLimit(4 * 1024 * 1024)
|
||||
s.ws.SetReadLimit(2 * 1024 * 1024)
|
||||
_ = s.ws.SetReadDeadline(time.Now().Add(90 * time.Second))
|
||||
s.ws.SetPongHandler(func(string) error {
|
||||
_ = s.ws.SetReadDeadline(time.Now().Add(90 * time.Second))
|
||||
|
|
@ -409,8 +450,8 @@ func (s *session) readPump() {
|
|||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 128 * 1024,
|
||||
WriteBufferSize: 128 * 1024,
|
||||
ReadBufferSize: 256 * 1024,
|
||||
WriteBufferSize: 256 * 1024,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
EnableCompression: false,
|
||||
}
|
||||
|
|
@ -462,17 +503,30 @@ func main() {
|
|||
}
|
||||
|
||||
// Graceful shutdown
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
serveErr := make(chan error, 1)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
sdCtx, sdCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer sdCancel()
|
||||
_ = srv.Shutdown(sdCtx)
|
||||
serveErr <- srv.ListenAndServe()
|
||||
}()
|
||||
|
||||
log.Printf("z2k vps-relay listening on %s", *listenAddr)
|
||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
log.Fatal(err)
|
||||
select {
|
||||
case err := <-serveErr:
|
||||
if err != nil && err != http.ErrServerClosed {
|
||||
log.Fatal(err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
log.Printf("shutdown requested, draining active connections")
|
||||
sdCtx, sdCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer sdCancel()
|
||||
if err := srv.Shutdown(sdCtx); err != nil {
|
||||
log.Printf("graceful shutdown failed: %v", err)
|
||||
_ = srv.Close()
|
||||
}
|
||||
if err := <-serveErr; err != nil && err != http.ErrServerClosed {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Printf("server stopped")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
2
z2k.sh
2
z2k.sh
|
|
@ -674,7 +674,7 @@ update_z2k() {
|
|||
sleep 1
|
||||
cp "$tg_tmp" /opt/sbin/tg-mtproxy-client
|
||||
chmod +x /opt/sbin/tg-mtproxy-client
|
||||
/opt/sbin/tg-mtproxy-client --listen=:1443 >> /tmp/tg-tunnel.log 2>&1 &
|
||||
/opt/sbin/tg-mtproxy-client --listen=:1443 --timeout=15m >> /tmp/tg-tunnel.log 2>&1 &
|
||||
sleep 2
|
||||
if pgrep -f "tg-mtproxy-client" >/dev/null 2>&1; then
|
||||
print_success "Telegram tunnel обновлён и перезапущен"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue