max-telegram-bridge-bot/bridge.go
Andrey Lugovskoy 8b80bb8ff1 Route crosspost error notifications to owner DM, not to the channel
For crosspost TG→MAX (a post in a TG channel), any delivery error
message used to be posted back into the same channel (visible to all
subscribers). The only person who can act on it is the crosspost owner.

New helper notifyTgUser(ctx, srcMsg, maxChatID, text, isCrosspost):
- crosspost: resolves tg_owner_id via GetCrosspostOwner and DMs that
  user; drops with a warn if the crosspost has no owner (legacy).
- bridge: keeps the existing behavior (post into the source chat,
  preserving forum thread).

All upload/size/circuit-breaker notifications in forwardTgToMax and
flushMediaGroup now go through notifyTgUser. Bridge flows are
unchanged; crosspost flows stop spamming channels.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:29:52 +04:00

336 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"crypto/sha256"
"encoding/hex"
"log/slog"
"net/http"
"strings"
"sync"
"time"
maxbot "github.com/max-messenger/max-bot-api-client-go"
)
// Config — настройки bridge, читаемые из env.
type Config struct {
MaxToken string // токен MAX API (нужен для direct-send/upload)
TgBotURL string // ссылка на TG-бота для /help
MaxBotURL string // ссылка на MAX-бота для /help
WebhookURL string // базовый URL для webhook (если пусто — long polling)
WebhookPort string // порт для webhook сервера
TgAPIURL string // custom TG Bot API URL (если пусто — api.telegram.org)
AllowedUsers []int64 // whitelist TG user IDs (empty = allow all)
TgMaxFileSizeMB int // max file size TG->MAX in MB (0 = unlimited)
MaxMaxFileSizeMB int // max file size MAX->TG in MB (0 = unlimited)
// MaxAllowedExts — whitelist расширений для TG→MAX (nil = не проверять локально).
// Если задан, файлы с не-вхождением блокируются до отправки на CDN.
MaxAllowedExts map[string]struct{}
// MessageNewline — если true, текст идёт с новой строки после имени отправителя:
// "Имя:\nтекст" вместо "Имя: текст". Задаётся через env MESSAGE_FORMAT=newline.
MessageNewline bool
}
// chatBreaker хранит состояние circuit breaker для одного чата.
type chatBreaker struct {
fails int
blockedAt time.Time
}
const (
cbMaxFails = 3 // после N фейлов — блокируем
cbCooldown = 5 * time.Minute // на сколько блокируем
)
// Bridge — основная структура, объединяющая зависимости.
type Bridge struct {
cfg Config
repo Repository
tg TGSender
maxApi *maxbot.Api
maxBotUID int64 // MAX bot user ID (для фильтрации своих сообщений)
httpClient *http.Client // для скачивания/загрузки файлов (большой таймаут)
apiClient *http.Client // для коротких API-запросов (малый таймаут)
whSecret string // random path segment for webhook URLs
cpWaitMu sync.Mutex
cpWait map[int64]int64 // MAX userId → TG channel ID (ожидание пересылки)
cpTgOwnerMu sync.Mutex
cpTgOwner map[int64]int64 // TG channel ID → TG user ID (кто переслал пост)
cbMu sync.Mutex
breakers map[int64]*chatBreaker // destination chatID → breaker
// Буферизация TG media groups (альбомы)
mgMu sync.Mutex
mgBuffers map[string]*mediaGroupBuffer // MediaGroupID → buffer
}
// NewBridge создаёт экземпляр Bridge.
func NewBridge(cfg Config, repo Repository, tg TGSender, maxApi *maxbot.Api, maxBotUID int64) *Bridge {
// Derive webhook secret from tokens (stable across restarts)
h := sha256.Sum256([]byte(cfg.MaxToken + tg.BotToken()))
secret := hex.EncodeToString(h[:8])
return &Bridge{
cfg: cfg,
repo: repo,
tg: tg,
maxApi: maxApi,
maxBotUID: maxBotUID,
httpClient: &http.Client{
Timeout: 5 * time.Minute, // для download/upload больших файлов
},
apiClient: &http.Client{
Timeout: 15 * time.Second, // для коротких API-запросов
},
whSecret: secret,
cpWait: make(map[int64]int64),
cpTgOwner: make(map[int64]int64),
breakers: make(map[int64]*chatBreaker),
mgBuffers: make(map[string]*mediaGroupBuffer),
}
}
// cbBlocked проверяет, заблокирован ли чат.
func (b *Bridge) cbBlocked(chatID int64) bool {
b.cbMu.Lock()
defer b.cbMu.Unlock()
cb, ok := b.breakers[chatID]
if !ok {
return false
}
if cb.fails >= cbMaxFails && time.Since(cb.blockedAt) < cbCooldown {
return true
}
if cb.fails >= cbMaxFails {
// Кулдаун прошёл — сбрасываем, пробуем снова
delete(b.breakers, chatID)
}
return false
}
// cbFail регистрирует ошибку. Возвращает true если чат только что заблокировался.
func (b *Bridge) cbFail(chatID int64) bool {
b.cbMu.Lock()
defer b.cbMu.Unlock()
cb, ok := b.breakers[chatID]
if !ok {
cb = &chatBreaker{}
b.breakers[chatID] = cb
}
cb.fails++
if cb.fails == cbMaxFails {
cb.blockedAt = time.Now()
slog.Warn("circuit breaker: chat blocked", "chatID", chatID, "cooldown", cbCooldown)
return true
}
return false
}
// cbSuccess сбрасывает счётчик ошибок для чата.
func (b *Bridge) cbSuccess(chatID int64) {
b.cbMu.Lock()
defer b.cbMu.Unlock()
delete(b.breakers, chatID)
}
// maxMaxFileBytes returns the MAX-to-TG file size limit in bytes (0 = unlimited).
func (c *Config) maxMaxFileBytes() int64 {
if c.MaxMaxFileSizeMB <= 0 {
return 0
}
return int64(c.MaxMaxFileSizeMB) * 1024 * 1024
}
// isUserAllowed проверяет, есть ли tgUserID в белом списке.
// Если AllowedUsers пуст — доступ разрешён всем.
func (b *Bridge) isUserAllowed(tgUserID int64) bool {
if len(b.cfg.AllowedUsers) == 0 {
return true
}
for _, id := range b.cfg.AllowedUsers {
if id == tgUserID {
return true
}
}
return false
}
// checkUserAllowed проверяет доступ пользователя и отправляет сообщение об отказе если нужно.
// Возвращает true если доступ разрешён, false — если запрещён (и уже отправил ответ).
// userID == 0 трактуется как «нет отправителя» — доступ запрещается.
func (b *Bridge) checkUserAllowed(ctx context.Context, chatID, userID int64, threadID int) bool {
if userID != 0 && b.isUserAllowed(userID) {
return true
}
slog.Debug("TG user not allowed", "uid", userID)
b.tg.SendMessage(ctx, chatID, "У вас нет прав доступа к боту.", &SendOpts{ThreadID: threadID})
return false
}
// isCrosspostOwner проверяет, является ли userID владельцем связки.
// owner_id=0 и tg_owner_id=0 — старая связка, доступна всем.
func (b *Bridge) isCrosspostOwner(maxChatID, userID int64) bool {
maxOwner, tgOwner := b.repo.GetCrosspostOwner(maxChatID)
if maxOwner == 0 && tgOwner == 0 {
return true // legacy, no owner
}
return userID == maxOwner || userID == tgOwner
}
// tgFileURL возвращает прямой URL файла из TG — через custom API если настроен.
func (b *Bridge) tgFileURL(ctx context.Context, fileID string) (string, error) {
filePath, err := b.tg.GetFile(ctx, fileID)
if err != nil {
return "", err
}
return b.tg.GetFileDirectURL(filePath), nil
}
// tgChatTitle возвращает title TG-чата/канала по ID. Пустая строка если не удалось.
func (b *Bridge) tgChatTitle(ctx context.Context, chatID int64) string {
title, err := b.tg.GetChat(ctx, chatID)
if err != nil {
return ""
}
return title
}
// isSelfTgBot проверяет, является ли отправитель нашим ботом (а не чужим).
func (b *Bridge) isSelfTgBot(from *UserInfo) bool {
return from != nil && from.IsBot && from.UserName == b.tg.BotUsername()
}
// notifyTgUser отправляет пользовательское уведомление (например, об ошибке загрузки).
// Для bridge-режима — в чат, где пришло сообщение (в нужный тред, если форум).
// Для crosspost — в ЛС владельцу связки (tg_owner_id), чтобы не мусорить в канал.
// Если владелец не задан (legacy) — уведомление дропается с warn-логом.
func (b *Bridge) notifyTgUser(ctx context.Context, srcChat *TGMessage, maxChatID int64, text string, isCrosspost bool) {
if isCrosspost {
_, tgOwner := b.repo.GetCrosspostOwner(maxChatID)
if tgOwner == 0 {
slog.Warn("crosspost notify skipped: no tg owner", "maxChat", maxChatID, "text", text)
return
}
if _, err := b.tg.SendMessage(ctx, tgOwner, text, nil); err != nil {
slog.Warn("crosspost notify DM failed", "err", err, "tgOwner", tgOwner)
}
return
}
var opts *SendOpts
if srcChat != nil && srcChat.MessageThreadID != 0 {
opts = &SendOpts{ThreadID: srcChat.MessageThreadID}
}
b.tg.SendMessage(ctx, srcChat.Chat.ID, text, opts)
}
// uploadErrHint превращает техническую ошибку загрузки в короткий текст для юзера.
// Возвращает пустую строку для неизвестных ошибок — вызывающий код тогда
// отправит только generic-сообщение без технической мути.
func uploadErrHint(err error) string {
if err == nil {
return ""
}
s := err.Error()
switch {
case strings.Contains(s, "file is too big"):
return "файл слишком большой"
case strings.Contains(s, "file is not found") || strings.Contains(s, "FILE_REFERENCE_EXPIRED"):
return "файл не найден"
case strings.Contains(s, "attachment.not.ready"):
return "попробуйте ещё раз"
}
return ""
}
// uploadErrMsg собирает user-facing сообщение: base + ": hint" если подсказка известна,
// иначе просто "base." (без технической ошибки).
func uploadErrMsg(base string, err error) string {
if hint := uploadErrHint(err); hint != "" {
return base + ": " + hint + "."
}
return base + "."
}
func (b *Bridge) tgWebhookPath() string {
return "/tg-webhook-" + b.whSecret
}
func (b *Bridge) maxWebhookPath() string {
return "/max-webhook-" + b.whSecret
}
// registerCommands регистрирует команды бота в Telegram.
func (b *Bridge) registerCommands(ctx context.Context) {
cmds := []BotCommand{
{Command: "bridge", Description: "Связать чат с MAX-чатом"},
{Command: "unbridge", Description: "Удалить связку чатов"},
{Command: "thread", Description: "Установить топик для сообщений из MAX"},
{Command: "thread_bridge", Description: "Связать тред с отдельным MAX-чатом"},
{Command: "thread_unbridge", Description: "Удалить связку треда"},
{Command: "crosspost", Description: "Список связок кросспостинга"},
{Command: "help", Description: "Инструкция"},
}
if err := b.tg.SetMyCommands(ctx, cmds, nil); err != nil {
slog.Error("TG setMyCommands (default) failed", "err", err)
}
if err := b.tg.SetMyCommands(ctx, cmds, &CommandScope{Type: "all_chat_administrators"}); err != nil {
slog.Error("TG setMyCommands (admins) failed", "err", err)
}
}
// Run запускает TG и MAX listener'ы + периодическую очистку.
func (b *Bridge) Run(ctx context.Context) {
b.registerCommands(ctx)
go func() {
t := time.NewTicker(10 * time.Minute)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
b.repo.CleanOldMessages()
}
}
}()
// Воркер очереди — проверяет каждые 10 секунд
go func() {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
b.processQueue(ctx)
}
}
}()
if b.cfg.WebhookURL != "" {
go func() {
addr := ":" + b.cfg.WebhookPort
srv := &http.Server{
Addr: addr,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
}
slog.Info("Webhook server starting", "addr", addr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("Webhook server failed", "err", err)
}
}()
}
var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); b.listenTelegram(ctx) }()
go func() { defer wg.Done(); b.listenMax(ctx) }()
wg.Wait()
}