max-telegram-bridge-bot/bridge.go
Andrey Lugovskoy 1290a23ad7 Add /thread_bridge: link a single TG forum thread to a separate MAX chat
Issue #38 part 2. Allows using multiple MAX group chats as per-thread
mirrors of a TG forum group, since MAX has no native thread concept.

- New table thread_pairs (tg_chat_id, tg_thread_id, max_chat_id) with
  unique(max_chat_id) — one MAX chat = at most one TG thread.
- pending gains thread_id column for thread-bridge key exchange.
- Repo methods: StartThreadBridge (TG issues key), CompleteThreadBridge
  (MAX consumes key), GetThreadMaxChat, GetThreadTgPair, UnpairThread,
  UnpairThreadByMax.
- Commands:
  * /thread_bridge in a TG forum thread (admin, non-General) -> key
  * /thread_bridge <key> in a MAX chat -> binds
  * /thread_unbridge on either side
  TG's BOT_COMMAND_INVALID rule forbids hyphens, so the commands use
  underscore.
- Routing priority: thread-bridge > regular pair.
  TG->MAX: if msg.MessageThreadID has a thread_pair, route there.
  MAX->TG: if MAX chat is in thread_pairs, route to (tg_chat, thread).
- For thread-paired MAX chats, the Part 1 reply-to-source-thread
  override is disabled: thread-paired chats have a fixed target thread,
  replies stay there.
- Safeguard: a MAX chat cannot be in both pairs and thread_pairs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 17:39:00 +04:00

284 lines
9.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"crypto/sha256"
"encoding/hex"
"log/slog"
"net/http"
"sync"
"time"
maxbot "github.com/max-messenger/max-bot-api-client-go"
)
// Config — настройки bridge, читаемые из env.
type Config struct {
MaxToken string // токен MAX API (нужен для direct-send/upload)
TgBotURL string // ссылка на TG-бота для /help
MaxBotURL string // ссылка на MAX-бота для /help
WebhookURL string // базовый URL для webhook (если пусто — long polling)
WebhookPort string // порт для webhook сервера
TgAPIURL string // custom TG Bot API URL (если пусто — api.telegram.org)
AllowedUsers []int64 // whitelist TG user IDs (empty = allow all)
TgMaxFileSizeMB int // max file size TG->MAX in MB (0 = unlimited)
MaxMaxFileSizeMB int // max file size MAX->TG in MB (0 = unlimited)
// MaxAllowedExts — whitelist расширений для TG→MAX (nil = не проверять локально).
// Если задан, файлы с не-вхождением блокируются до отправки на CDN.
MaxAllowedExts map[string]struct{}
// MessageNewline — если true, текст идёт с новой строки после имени отправителя:
// "Имя:\nтекст" вместо "Имя: текст". Задаётся через env MESSAGE_FORMAT=newline.
MessageNewline bool
}
// chatBreaker хранит состояние circuit breaker для одного чата.
type chatBreaker struct {
fails int
blockedAt time.Time
}
const (
cbMaxFails = 3 // после N фейлов — блокируем
cbCooldown = 5 * time.Minute // на сколько блокируем
)
// Bridge — основная структура, объединяющая зависимости.
type Bridge struct {
cfg Config
repo Repository
tg TGSender
maxApi *maxbot.Api
maxBotUID int64 // MAX bot user ID (для фильтрации своих сообщений)
httpClient *http.Client // для скачивания/загрузки файлов (большой таймаут)
apiClient *http.Client // для коротких API-запросов (малый таймаут)
whSecret string // random path segment for webhook URLs
cpWaitMu sync.Mutex
cpWait map[int64]int64 // MAX userId → TG channel ID (ожидание пересылки)
cpTgOwnerMu sync.Mutex
cpTgOwner map[int64]int64 // TG channel ID → TG user ID (кто переслал пост)
cbMu sync.Mutex
breakers map[int64]*chatBreaker // destination chatID → breaker
// Буферизация TG media groups (альбомы)
mgMu sync.Mutex
mgBuffers map[string]*mediaGroupBuffer // MediaGroupID → buffer
}
// NewBridge создаёт экземпляр Bridge.
func NewBridge(cfg Config, repo Repository, tg TGSender, maxApi *maxbot.Api, maxBotUID int64) *Bridge {
// Derive webhook secret from tokens (stable across restarts)
h := sha256.Sum256([]byte(cfg.MaxToken + tg.BotToken()))
secret := hex.EncodeToString(h[:8])
return &Bridge{
cfg: cfg,
repo: repo,
tg: tg,
maxApi: maxApi,
maxBotUID: maxBotUID,
httpClient: &http.Client{
Timeout: 5 * time.Minute, // для download/upload больших файлов
},
apiClient: &http.Client{
Timeout: 15 * time.Second, // для коротких API-запросов
},
whSecret: secret,
cpWait: make(map[int64]int64),
cpTgOwner: make(map[int64]int64),
breakers: make(map[int64]*chatBreaker),
mgBuffers: make(map[string]*mediaGroupBuffer),
}
}
// cbBlocked проверяет, заблокирован ли чат.
func (b *Bridge) cbBlocked(chatID int64) bool {
b.cbMu.Lock()
defer b.cbMu.Unlock()
cb, ok := b.breakers[chatID]
if !ok {
return false
}
if cb.fails >= cbMaxFails && time.Since(cb.blockedAt) < cbCooldown {
return true
}
if cb.fails >= cbMaxFails {
// Кулдаун прошёл — сбрасываем, пробуем снова
delete(b.breakers, chatID)
}
return false
}
// cbFail регистрирует ошибку. Возвращает true если чат только что заблокировался.
func (b *Bridge) cbFail(chatID int64) bool {
b.cbMu.Lock()
defer b.cbMu.Unlock()
cb, ok := b.breakers[chatID]
if !ok {
cb = &chatBreaker{}
b.breakers[chatID] = cb
}
cb.fails++
if cb.fails == cbMaxFails {
cb.blockedAt = time.Now()
slog.Warn("circuit breaker: chat blocked", "chatID", chatID, "cooldown", cbCooldown)
return true
}
return false
}
// cbSuccess сбрасывает счётчик ошибок для чата.
func (b *Bridge) cbSuccess(chatID int64) {
b.cbMu.Lock()
defer b.cbMu.Unlock()
delete(b.breakers, chatID)
}
// maxMaxFileBytes returns the MAX-to-TG file size limit in bytes (0 = unlimited).
func (c *Config) maxMaxFileBytes() int64 {
if c.MaxMaxFileSizeMB <= 0 {
return 0
}
return int64(c.MaxMaxFileSizeMB) * 1024 * 1024
}
// isUserAllowed проверяет, есть ли tgUserID в белом списке.
// Если AllowedUsers пуст — доступ разрешён всем.
func (b *Bridge) isUserAllowed(tgUserID int64) bool {
if len(b.cfg.AllowedUsers) == 0 {
return true
}
for _, id := range b.cfg.AllowedUsers {
if id == tgUserID {
return true
}
}
return false
}
// checkUserAllowed проверяет доступ пользователя и отправляет сообщение об отказе если нужно.
// Возвращает true если доступ разрешён, false — если запрещён (и уже отправил ответ).
// userID == 0 трактуется как «нет отправителя» — доступ запрещается.
func (b *Bridge) checkUserAllowed(ctx context.Context, chatID, userID int64, threadID int) bool {
if userID != 0 && b.isUserAllowed(userID) {
return true
}
slog.Debug("TG user not allowed", "uid", userID)
b.tg.SendMessage(ctx, chatID, "У вас нет прав доступа к боту.", &SendOpts{ThreadID: threadID})
return false
}
// isCrosspostOwner проверяет, является ли userID владельцем связки.
// owner_id=0 и tg_owner_id=0 — старая связка, доступна всем.
func (b *Bridge) isCrosspostOwner(maxChatID, userID int64) bool {
maxOwner, tgOwner := b.repo.GetCrosspostOwner(maxChatID)
if maxOwner == 0 && tgOwner == 0 {
return true // legacy, no owner
}
return userID == maxOwner || userID == tgOwner
}
// tgFileURL возвращает прямой URL файла из TG — через custom API если настроен.
func (b *Bridge) tgFileURL(ctx context.Context, fileID string) (string, error) {
filePath, err := b.tg.GetFile(ctx, fileID)
if err != nil {
return "", err
}
return b.tg.GetFileDirectURL(filePath), nil
}
// tgChatTitle возвращает title TG-чата/канала по ID. Пустая строка если не удалось.
func (b *Bridge) tgChatTitle(ctx context.Context, chatID int64) string {
title, err := b.tg.GetChat(ctx, chatID)
if err != nil {
return ""
}
return title
}
// isSelfTgBot проверяет, является ли отправитель нашим ботом (а не чужим).
func (b *Bridge) isSelfTgBot(from *UserInfo) bool {
return from != nil && from.IsBot && from.UserName == b.tg.BotUsername()
}
func (b *Bridge) tgWebhookPath() string {
return "/tg-webhook-" + b.whSecret
}
func (b *Bridge) maxWebhookPath() string {
return "/max-webhook-" + b.whSecret
}
// registerCommands регистрирует команды бота в Telegram.
func (b *Bridge) registerCommands(ctx context.Context) {
cmds := []BotCommand{
{Command: "bridge", Description: "Связать чат с MAX-чатом"},
{Command: "unbridge", Description: "Удалить связку чатов"},
{Command: "thread", Description: "Установить топик для сообщений из MAX"},
{Command: "thread_bridge", Description: "Связать тред с отдельным MAX-чатом"},
{Command: "thread_unbridge", Description: "Удалить связку треда"},
{Command: "crosspost", Description: "Список связок кросспостинга"},
{Command: "help", Description: "Инструкция"},
}
if err := b.tg.SetMyCommands(ctx, cmds, nil); err != nil {
slog.Error("TG setMyCommands (default) failed", "err", err)
}
if err := b.tg.SetMyCommands(ctx, cmds, &CommandScope{Type: "all_chat_administrators"}); err != nil {
slog.Error("TG setMyCommands (admins) failed", "err", err)
}
}
// Run запускает TG и MAX listener'ы + периодическую очистку.
func (b *Bridge) Run(ctx context.Context) {
b.registerCommands(ctx)
go func() {
t := time.NewTicker(10 * time.Minute)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
b.repo.CleanOldMessages()
}
}
}()
// Воркер очереди — проверяет каждые 10 секунд
go func() {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
b.processQueue(ctx)
}
}
}()
if b.cfg.WebhookURL != "" {
go func() {
addr := ":" + b.cfg.WebhookPort
srv := &http.Server{
Addr: addr,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
}
slog.Info("Webhook server starting", "addr", addr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("Webhook server failed", "err", err)
}
}()
}
var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); b.listenTelegram(ctx) }()
go func() { defer wg.Done(); b.listenMax(ctx) }()
wg.Wait()
}