mirror of
https://github.com/BEARlogin/max-telegram-bridge-bot.git
synced 2026-04-28 03:39:46 +00:00
Issue #38 part 2. Allows using multiple MAX group chats as per-thread mirrors of a TG forum group, since MAX has no native thread concept. - New table thread_pairs (tg_chat_id, tg_thread_id, max_chat_id) with unique(max_chat_id) — one MAX chat = at most one TG thread. - pending gains thread_id column for thread-bridge key exchange. - Repo methods: StartThreadBridge (TG issues key), CompleteThreadBridge (MAX consumes key), GetThreadMaxChat, GetThreadTgPair, UnpairThread, UnpairThreadByMax. - Commands: * /thread_bridge in a TG forum thread (admin, non-General) -> key * /thread_bridge <key> in a MAX chat -> binds * /thread_unbridge on either side TG's BOT_COMMAND_INVALID rule forbids hyphens, so the commands use underscore. - Routing priority: thread-bridge > regular pair. TG->MAX: if msg.MessageThreadID has a thread_pair, route there. MAX->TG: if MAX chat is in thread_pairs, route to (tg_chat, thread). - For thread-paired MAX chats, the Part 1 reply-to-source-thread override is disabled: thread-paired chats have a fixed target thread, replies stay there. - Safeguard: a MAX chat cannot be in both pairs and thread_pairs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
284 lines
9.8 KiB
Go
284 lines
9.8 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"crypto/sha256"
|
||
"encoding/hex"
|
||
"log/slog"
|
||
"net/http"
|
||
"sync"
|
||
"time"
|
||
|
||
maxbot "github.com/max-messenger/max-bot-api-client-go"
|
||
)
|
||
|
||
// Config — настройки bridge, читаемые из env.
|
||
type Config struct {
|
||
MaxToken string // токен MAX API (нужен для direct-send/upload)
|
||
TgBotURL string // ссылка на TG-бота для /help
|
||
MaxBotURL string // ссылка на MAX-бота для /help
|
||
WebhookURL string // базовый URL для webhook (если пусто — long polling)
|
||
WebhookPort string // порт для webhook сервера
|
||
TgAPIURL string // custom TG Bot API URL (если пусто — api.telegram.org)
|
||
AllowedUsers []int64 // whitelist TG user IDs (empty = allow all)
|
||
TgMaxFileSizeMB int // max file size TG->MAX in MB (0 = unlimited)
|
||
MaxMaxFileSizeMB int // max file size MAX->TG in MB (0 = unlimited)
|
||
// MaxAllowedExts — whitelist расширений для TG→MAX (nil = не проверять локально).
|
||
// Если задан, файлы с не-вхождением блокируются до отправки на CDN.
|
||
MaxAllowedExts map[string]struct{}
|
||
// MessageNewline — если true, текст идёт с новой строки после имени отправителя:
|
||
// "Имя:\nтекст" вместо "Имя: текст". Задаётся через env MESSAGE_FORMAT=newline.
|
||
MessageNewline bool
|
||
}
|
||
|
||
// chatBreaker хранит состояние circuit breaker для одного чата.
|
||
type chatBreaker struct {
|
||
fails int
|
||
blockedAt time.Time
|
||
}
|
||
|
||
const (
|
||
cbMaxFails = 3 // после N фейлов — блокируем
|
||
cbCooldown = 5 * time.Minute // на сколько блокируем
|
||
)
|
||
|
||
// Bridge — основная структура, объединяющая зависимости.
|
||
type Bridge struct {
|
||
cfg Config
|
||
repo Repository
|
||
tg TGSender
|
||
maxApi *maxbot.Api
|
||
maxBotUID int64 // MAX bot user ID (для фильтрации своих сообщений)
|
||
httpClient *http.Client // для скачивания/загрузки файлов (большой таймаут)
|
||
apiClient *http.Client // для коротких API-запросов (малый таймаут)
|
||
whSecret string // random path segment for webhook URLs
|
||
|
||
cpWaitMu sync.Mutex
|
||
cpWait map[int64]int64 // MAX userId → TG channel ID (ожидание пересылки)
|
||
|
||
cpTgOwnerMu sync.Mutex
|
||
cpTgOwner map[int64]int64 // TG channel ID → TG user ID (кто переслал пост)
|
||
|
||
cbMu sync.Mutex
|
||
breakers map[int64]*chatBreaker // destination chatID → breaker
|
||
|
||
// Буферизация TG media groups (альбомы)
|
||
mgMu sync.Mutex
|
||
mgBuffers map[string]*mediaGroupBuffer // MediaGroupID → buffer
|
||
}
|
||
|
||
// NewBridge создаёт экземпляр Bridge.
|
||
func NewBridge(cfg Config, repo Repository, tg TGSender, maxApi *maxbot.Api, maxBotUID int64) *Bridge {
|
||
// Derive webhook secret from tokens (stable across restarts)
|
||
h := sha256.Sum256([]byte(cfg.MaxToken + tg.BotToken()))
|
||
secret := hex.EncodeToString(h[:8])
|
||
|
||
return &Bridge{
|
||
cfg: cfg,
|
||
repo: repo,
|
||
tg: tg,
|
||
maxApi: maxApi,
|
||
maxBotUID: maxBotUID,
|
||
httpClient: &http.Client{
|
||
Timeout: 5 * time.Minute, // для download/upload больших файлов
|
||
},
|
||
apiClient: &http.Client{
|
||
Timeout: 15 * time.Second, // для коротких API-запросов
|
||
},
|
||
whSecret: secret,
|
||
cpWait: make(map[int64]int64),
|
||
cpTgOwner: make(map[int64]int64),
|
||
breakers: make(map[int64]*chatBreaker),
|
||
mgBuffers: make(map[string]*mediaGroupBuffer),
|
||
}
|
||
}
|
||
|
||
// cbBlocked проверяет, заблокирован ли чат.
|
||
func (b *Bridge) cbBlocked(chatID int64) bool {
|
||
b.cbMu.Lock()
|
||
defer b.cbMu.Unlock()
|
||
cb, ok := b.breakers[chatID]
|
||
if !ok {
|
||
return false
|
||
}
|
||
if cb.fails >= cbMaxFails && time.Since(cb.blockedAt) < cbCooldown {
|
||
return true
|
||
}
|
||
if cb.fails >= cbMaxFails {
|
||
// Кулдаун прошёл — сбрасываем, пробуем снова
|
||
delete(b.breakers, chatID)
|
||
}
|
||
return false
|
||
}
|
||
|
||
// cbFail регистрирует ошибку. Возвращает true если чат только что заблокировался.
|
||
func (b *Bridge) cbFail(chatID int64) bool {
|
||
b.cbMu.Lock()
|
||
defer b.cbMu.Unlock()
|
||
cb, ok := b.breakers[chatID]
|
||
if !ok {
|
||
cb = &chatBreaker{}
|
||
b.breakers[chatID] = cb
|
||
}
|
||
cb.fails++
|
||
if cb.fails == cbMaxFails {
|
||
cb.blockedAt = time.Now()
|
||
slog.Warn("circuit breaker: chat blocked", "chatID", chatID, "cooldown", cbCooldown)
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
// cbSuccess сбрасывает счётчик ошибок для чата.
|
||
func (b *Bridge) cbSuccess(chatID int64) {
|
||
b.cbMu.Lock()
|
||
defer b.cbMu.Unlock()
|
||
delete(b.breakers, chatID)
|
||
}
|
||
|
||
// maxMaxFileBytes returns the MAX-to-TG file size limit in bytes (0 = unlimited).
|
||
func (c *Config) maxMaxFileBytes() int64 {
|
||
if c.MaxMaxFileSizeMB <= 0 {
|
||
return 0
|
||
}
|
||
return int64(c.MaxMaxFileSizeMB) * 1024 * 1024
|
||
}
|
||
|
||
// isUserAllowed проверяет, есть ли tgUserID в белом списке.
|
||
// Если AllowedUsers пуст — доступ разрешён всем.
|
||
func (b *Bridge) isUserAllowed(tgUserID int64) bool {
|
||
if len(b.cfg.AllowedUsers) == 0 {
|
||
return true
|
||
}
|
||
for _, id := range b.cfg.AllowedUsers {
|
||
if id == tgUserID {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// checkUserAllowed проверяет доступ пользователя и отправляет сообщение об отказе если нужно.
|
||
// Возвращает true если доступ разрешён, false — если запрещён (и уже отправил ответ).
|
||
// userID == 0 трактуется как «нет отправителя» — доступ запрещается.
|
||
func (b *Bridge) checkUserAllowed(ctx context.Context, chatID, userID int64, threadID int) bool {
|
||
if userID != 0 && b.isUserAllowed(userID) {
|
||
return true
|
||
}
|
||
slog.Debug("TG user not allowed", "uid", userID)
|
||
b.tg.SendMessage(ctx, chatID, "У вас нет прав доступа к боту.", &SendOpts{ThreadID: threadID})
|
||
return false
|
||
}
|
||
|
||
// isCrosspostOwner проверяет, является ли userID владельцем связки.
|
||
// owner_id=0 и tg_owner_id=0 — старая связка, доступна всем.
|
||
func (b *Bridge) isCrosspostOwner(maxChatID, userID int64) bool {
|
||
maxOwner, tgOwner := b.repo.GetCrosspostOwner(maxChatID)
|
||
if maxOwner == 0 && tgOwner == 0 {
|
||
return true // legacy, no owner
|
||
}
|
||
return userID == maxOwner || userID == tgOwner
|
||
}
|
||
|
||
// tgFileURL возвращает прямой URL файла из TG — через custom API если настроен.
|
||
func (b *Bridge) tgFileURL(ctx context.Context, fileID string) (string, error) {
|
||
filePath, err := b.tg.GetFile(ctx, fileID)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return b.tg.GetFileDirectURL(filePath), nil
|
||
}
|
||
|
||
// tgChatTitle возвращает title TG-чата/канала по ID. Пустая строка если не удалось.
|
||
func (b *Bridge) tgChatTitle(ctx context.Context, chatID int64) string {
|
||
title, err := b.tg.GetChat(ctx, chatID)
|
||
if err != nil {
|
||
return ""
|
||
}
|
||
return title
|
||
}
|
||
|
||
// isSelfTgBot проверяет, является ли отправитель нашим ботом (а не чужим).
|
||
func (b *Bridge) isSelfTgBot(from *UserInfo) bool {
|
||
return from != nil && from.IsBot && from.UserName == b.tg.BotUsername()
|
||
}
|
||
|
||
func (b *Bridge) tgWebhookPath() string {
|
||
return "/tg-webhook-" + b.whSecret
|
||
}
|
||
|
||
func (b *Bridge) maxWebhookPath() string {
|
||
return "/max-webhook-" + b.whSecret
|
||
}
|
||
|
||
// registerCommands регистрирует команды бота в Telegram.
|
||
func (b *Bridge) registerCommands(ctx context.Context) {
|
||
cmds := []BotCommand{
|
||
{Command: "bridge", Description: "Связать чат с MAX-чатом"},
|
||
{Command: "unbridge", Description: "Удалить связку чатов"},
|
||
{Command: "thread", Description: "Установить топик для сообщений из MAX"},
|
||
{Command: "thread_bridge", Description: "Связать тред с отдельным MAX-чатом"},
|
||
{Command: "thread_unbridge", Description: "Удалить связку треда"},
|
||
{Command: "crosspost", Description: "Список связок кросспостинга"},
|
||
{Command: "help", Description: "Инструкция"},
|
||
}
|
||
if err := b.tg.SetMyCommands(ctx, cmds, nil); err != nil {
|
||
slog.Error("TG setMyCommands (default) failed", "err", err)
|
||
}
|
||
if err := b.tg.SetMyCommands(ctx, cmds, &CommandScope{Type: "all_chat_administrators"}); err != nil {
|
||
slog.Error("TG setMyCommands (admins) failed", "err", err)
|
||
}
|
||
}
|
||
|
||
// Run запускает TG и MAX listener'ы + периодическую очистку.
|
||
func (b *Bridge) Run(ctx context.Context) {
|
||
b.registerCommands(ctx)
|
||
go func() {
|
||
t := time.NewTicker(10 * time.Minute)
|
||
defer t.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-t.C:
|
||
b.repo.CleanOldMessages()
|
||
}
|
||
}
|
||
}()
|
||
|
||
// Воркер очереди — проверяет каждые 10 секунд
|
||
go func() {
|
||
t := time.NewTicker(10 * time.Second)
|
||
defer t.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-t.C:
|
||
b.processQueue(ctx)
|
||
}
|
||
}
|
||
}()
|
||
|
||
if b.cfg.WebhookURL != "" {
|
||
go func() {
|
||
addr := ":" + b.cfg.WebhookPort
|
||
srv := &http.Server{
|
||
Addr: addr,
|
||
ReadTimeout: 10 * time.Second,
|
||
WriteTimeout: 10 * time.Second,
|
||
IdleTimeout: 60 * time.Second,
|
||
}
|
||
slog.Info("Webhook server starting", "addr", addr)
|
||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||
slog.Error("Webhook server failed", "err", err)
|
||
}
|
||
}()
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
wg.Add(2)
|
||
go func() { defer wg.Done(); b.listenTelegram(ctx) }()
|
||
go func() { defer wg.Done(); b.listenMax(ctx) }()
|
||
wg.Wait()
|
||
}
|