feat(dns): Added DNS availability test and updated UI

- Implemented `check_dns_availability` to measure DNS server latency
- Added `DNS_AVAILABILITY_SERVERS` and `DNS_AVAILABILITY_DOMAINS` to config
- Updated `_format_summary` to include DNS availability statistics
- Integrated DNS availability results into the CLI menu (Test 2)
- Added DoH wire-format (RFC 8484) support for improved probe reliability
- Enhanced `_parse_dns_response` for better error handling and parsing
- Refactored `run_domains_test` to better classify ISP, local, and Fake-IP
This commit is contained in:
Kirill Minovsky 2026-04-24 15:09:12 +03:00
parent c14f984ccd
commit 8463fa078d
6 changed files with 1004 additions and 314 deletions

View file

@ -14,7 +14,7 @@ from cli.ui import clean_hostname, build_domain_row
from core.tls_scanner import check_domain_tls, check_http_injection, create_dpi_client
from core.tcp16_scanner import check_tcp_16_20, check_tcp_16_20_with_rtt
from core.telegram_scanner import run_telegram_test as _run_telegram_test
from utils.network import get_resolved_ip
from utils.network import get_resolved_ip, get_fake_ip_type
# ── Воркеры ──────────────────────────────────────────────────────────────────
@ -51,17 +51,29 @@ async def _resolve_worker(domain_raw: str, semaphore: asyncio.Semaphore, stub_ip
entry["dns_fake"] = None
return entry
if stub_ips and resolved_ipv4 in stub_ips:
fake = "[bold red]DNS FAKE[/bold red]"
detail = f"DNS подмена -> {resolved_ipv4}"
entry["t13v4_res"] = (fake, detail, 0.0)
entry["t12_res"] = (fake, detail, 0.0)
entry["http_res"] = (fake, detail)
entry["dns_fake"] = True
fake_type = get_fake_ip_type(resolved_ipv4)
if fake_type != "fakeip" and stub_ips and resolved_ipv4 in stub_ips:
fake_type = "isp"
if fake_type == "isp":
fake = "[bold red]DNS FAKE[/bold red]"
detail = f"Заглушка провайдера -> {resolved_ipv4}"
entry["t13v4_res"] = (fake, detail, 0.0)
entry["t12_res"] = (fake, detail, 0.0)
entry["http_res"] = (fake, detail)
entry["dns_fake"] = True
elif fake_type == "local":
fake = "[bold yellow]LOCAL IP[/bold yellow]"
detail = f"Локальный IP -> {resolved_ipv4}"
entry["t13v4_res"] = (fake, detail, 0.0)
entry["t12_res"] = (fake, detail, 0.0)
entry["http_res"] = (fake, detail)
entry["dns_fake"] = True
return entry
return entry
async def _tls_worker(
entry: dict,
client: httpx.AsyncClient,
@ -104,7 +116,7 @@ async def _tcp16_worker(item: dict, semaphore: asyncio.Semaphore) -> list:
port = int(item.get("port", 443))
sni = None if port == 80 else (item.get("sni") or config.FAT_DEFAULT_SNI)
alive_str, status, detail = await check_tcp_16_20(ip, port, sni, semaphore)
alive_str, status, detail, rtt = await check_tcp_16_20(ip, port, sni, semaphore)
asn_raw = str(item.get("asn", "")).strip()
asn_str = (
@ -113,6 +125,11 @@ async def _tcp16_worker(item: dict, semaphore: asyncio.Semaphore) -> list:
else asn_raw.upper()
) or "-"
if rtt is not None:
rtt_ms = f"{int(rtt * 1000)}мс"
detail = f"{detail}"
#detail = f"{detail} | {rtt_ms}" if detail else rtt_ms
return [item["id"], asn_str, item["provider"], alive_str, status, detail]
@ -187,13 +204,25 @@ async def run_domains_test(semaphore: asyncio.Semaphore, stub_ips: set, domains:
await client_http.aclose()
rows = sorted([build_domain_row(e) for e in entries], key=lambda x: x[0])
dns_fail_count = 0
resolved_ips_counter: dict = {}
isp_stubs = {}
local_stubs = {}
fakeip_stubs = {}
for r in rows:
resolved_ip = r[5] if len(r) > 5 else None
if resolved_ip and stub_ips and resolved_ip in stub_ips:
resolved_ips_counter[resolved_ip] = resolved_ips_counter.get(resolved_ip, 0) + 1
if resolved_ip:
ftype = get_fake_ip_type(resolved_ip)
if ftype != "fakeip" and stub_ips and resolved_ip in stub_ips:
ftype = "isp"
if ftype == "isp":
isp_stubs[resolved_ip] = isp_stubs.get(resolved_ip, 0) + 1
elif ftype == "local":
local_stubs[resolved_ip] = local_stubs.get(resolved_ip, 0) + 1
elif ftype == "fakeip":
fakeip_stubs[resolved_ip] = fakeip_stubs.get(resolved_ip, 0) + 1
if any("DNS FAIL" in r[col] for col in (1, 2, 3)):
dns_fail_count += 1
@ -201,19 +230,38 @@ async def run_domains_test(semaphore: asyncio.Semaphore, stub_ips: set, domains:
table.add_row(*r[:5])
console.print(table)
confirmed_stubs = {ip: c for ip, c in resolved_ips_counter.items() if stub_ips and ip in stub_ips}
if confirmed_stubs or dns_fail_count > 0:
console.print(f"\n[bold yellow][i][!] НА ВАШЕМ УСТРОЙСТВЕ/РОУТЕРЕ НЕ НАСТРОЕН DoH:[/bold yellow]")
if confirmed_stubs:
ips_text = [f"[red]{ip}[/red] у {c} доменов" for ip, c in confirmed_stubs.items()]
console.print(f"DNS вернул IP заглушки: {', '.join(ips_text)}")
if isp_stubs or local_stubs or fakeip_stubs or dns_fail_count > 0:
console.print(f"\n[bold yellow][i][!] ИНФОРМАЦИЯ О DNS РЕЗОЛВЕ:[/bold yellow]")
if fakeip_stubs:
total_fake = sum(fakeip_stubs.values())
console.print(f"Трафик перехватывается Fake-IP: у [green]{total_fake}[/green] доменов")
if isp_stubs:
total_isp = sum(isp_stubs.values())
if len(isp_stubs) <= 3:
ips_text = [f"[red]{ip}[/red]" for ip in isp_stubs.keys()]
console.print(f"DNS вернул IP заглушки провайдера ({', '.join(ips_text)}): у {total_isp} доменов")
else:
console.print(f"DNS вернул IP заглушки провайдера: у [red]{total_isp}[/red] доменов")
if local_stubs:
total_local = sum(local_stubs.values())
if len(local_stubs) <= 3:
ips_text = [f"[yellow]{ip}[/yellow]" for ip in local_stubs.keys()]
console.print(f"DNS вернул локальные IP (работает AdGuard/hosts?): ({', '.join(ips_text)}): у {total_local} доменов")
else:
console.print(f"DNS вернул локальные IP (AdGuard/hosts/Pi-hole?): у [yellow]{total_local}[/yellow] доменов")
if dns_fail_count > 0:
console.print(f"У {dns_fail_count} сайтов обнаружен DNS FAIL (Домен не найден)")
console.print("[yellow]Рекомендация: Настройте DoH на вашем устройстве и роутере[/yellow]\n")
console.print("После настройки сбросьте кеш DNS:")
console.print("Windows: [dim]ipconfig /flushdns[/dim]")
console.print("MacOS: [dim]sudo dscacheutil -flushcache; sudo killall -HUP mDNSResponder[/dim]")
console.print("Linux: [dim]sudo resolvectl flush-caches[/dim]\n")
if isp_stubs or dns_fail_count > 0:
console.print("[yellow]Рекомендация: Настройте DoH на вашем устройстве и роутере[/yellow]\n")
console.print("После настройки сбросьте кеш DNS:")
console.print("Windows: [dim]ipconfig /flushdns[/dim]")
console.print("MacOS: [dim]sudo dscacheutil -flushcache; sudo killall -HUP mDNSResponder[/dim]")
console.print("Linux: [dim]sudo resolvectl flush-caches[/dim]\n")
block_markers = ("TLS DPI", "TLS MITM", "TLS BLOCK", "ISP PAGE", "BLOCKED", "TCP RST", "TCP ABORT")
return {
@ -402,7 +450,7 @@ async def run_whitelist_sni_test(semaphore: asyncio.Semaphore, tcp_items: list,
# Шаг 0: без SNI
try:
_a, st0, d0 = await check_tcp_16_20(ip, 443, "", semaphore, hint_rtt=hint)
_a, st0, d0, _rtt = await check_tcp_16_20(ip, 443, "", semaphore, hint_rtt=hint)
if "OK" in st0:
found.append(("(без SNI)", 0))
elif "DETECTED" not in st0 and "at " not in d0:
@ -422,7 +470,7 @@ async def run_whitelist_sni_test(semaphore: asyncio.Semaphore, tcp_items: list,
break
async def _one(sni: str):
_a, s, d = await check_tcp_16_20(ip, 443, sni, semaphore, hint_rtt=hint)
_a, s, d, _rtt = await check_tcp_16_20(ip, 443, sni, semaphore, hint_rtt=hint)
return sni, s, d
results = await asyncio.gather(

View file

@ -35,7 +35,12 @@ def build_domain_row(entry: dict) -> list:
if d12: details.append(f"T12:{d12}")
if d13: details.append(f"T13:{d13}")
times = [t for t in (t12_elapsed, t13_elapsed) if t > 0]
times = []
if "TIMEOUT" not in t12_status and t12_elapsed > 0:
times.append(t12_elapsed)
if "TIMEOUT" not in t13_status and t13_elapsed > 0:
times.append(t13_elapsed)
if times:
details.append(f"{min(times):.1f}s")
@ -44,9 +49,9 @@ def build_domain_row(entry: dict) -> list:
async def ask_test_selection() -> str:
# Алгоритмически строим все непустые подмножества цифр 16
# Алгоритмически строим все непустые подмножества цифр 17
from itertools import combinations
digits = "123456"
digits = "1234567"
valid = {
"".join(sorted(combo))
for r in range(1, len(digits) + 1)
@ -55,11 +60,12 @@ async def ask_test_selection() -> str:
console.print(
"\n[bold]Какие тесты запустить?[/bold]\n"
" [cyan]1[/cyan] — Проверка подмены DNS\n"
" [cyan]2[/cyan] — Проверка доступности доменов\n"
" [cyan]3[/cyan] — Проверка TCP 16-20KB блокировки\n"
" [cyan]4[/cyan] — Поиск белых SNI для ASN\n"
" [cyan]5[/cyan] — Проверка Telegram (замедление/блокировка)\n"
" [cyan]6[/cyan] — Легенда статусов\n"
" [cyan]2[/cyan] — Проверка доступности DNS-серверов\n"
" [cyan]3[/cyan] — Проверка доступности доменов\n"
" [cyan]4[/cyan] — Проверка TCP 16-20KB блокировки\n"
" [cyan]5[/cyan] — Поиск белых SNI для ASN\n"
" [cyan]6[/cyan] — Проверка Telegram (замедление/блокировка)\n"
" [cyan]7[/cyan] — Легенда статусов\n"
" [cyan]123[/cyan] — [dim](по умолчанию)[/dim]"
)
loop = asyncio.get_running_loop()
@ -75,7 +81,7 @@ async def ask_test_selection() -> str:
if raw in valid:
return raw
console.print("[yellow]Неверный ввод, запускаем все тесты.[/yellow]")
console.print("[yellow]Неверный ввод, запускаем тесты 1, 2, 3.[/yellow]")
return "123"
@ -88,6 +94,7 @@ def print_legend() -> None:
("TLS MITM", "Man-in-the-Middle: подменён сертификат (Unknown CA, Cert expired, Hostname mismatch)"),
("TLS BLOCK", "Блокировка версии TLS или протокола целиком (protocol_version alert)"),
("SSL ERR", "Прочие SSL ошибки: bad key share, record layer fail, internal error"),
("NO TLS1.3", "Сервер не поддерживает TLS 1.3 (норма для старых серверов)")
]),
("[bold cyan]— TCP / Соединение —[/bold cyan]", [
("TCP RST", "Соединение сброшено (TCP RST пакет от DPI или сервера)"),
@ -101,6 +108,9 @@ def print_legend() -> None:
("[bold cyan]— DNS —[/bold cyan]", [
("DNS FAIL", "Домен не разрешился через системный резолвер"),
("DNS FAKE", "IP домена совпадает с известной заглушкой провайдера"),
("TIMEOUT", "DNS-сервер не ответил в отведённое время"),
("BLOCKED", "DoH-сервер заблокирован провайдером (HTTP не прошёл)"),
("NXDOMAIN", "Домен не существует по мнению этого сервера"),
]),
("[bold cyan]— HTTP / Блокировки —[/bold cyan]", [
("BLOCKED", "HTTP 451 — Недоступно по юридическим причинам"),
@ -109,7 +119,7 @@ def print_legend() -> None:
"[red]Красный[/red] — редирект на чужой домен (подозрительно)"),
]),
("[bold cyan]— TCP 16-20KB тест —[/bold cyan]", [
("DETECTED", "Обрыв соединения после отправки 1620KB (FAT header блокировка)"),
("DETECTED", "Обрыв соединения после отправки 16KB+"),
("OK", "Все 16 запросов прошли без обрыва"),
]),
("[bold cyan]— Прочее —[/bold cyan]", [

View file

@ -83,4 +83,43 @@ DNS_DOH_SERVERS:
- ["https://cloudflare-dns.com/dns-query", "Cloudflare"]
- ["https://one.one.one.one/dns-query", "Cloudflare"]
- ["https://dns.adguard-dns.com/resolve", "AdGuard"]
- ["https://dns.alidns.com/resolve", "Alibaba"]
- ["https://dns.alidns.com/resolve", "Alibaba"]
# --- Тест 2: Проверка доступности DNS-серверов ---
DNS_AVAILABILITY_DOMAINS:
- "example.com"
- "vk.com"
- "ozon.ru"
- "habr.com"
- "mail.ru"
# Серверы для теста доступности. Каждый сервер — [адрес, имя, тип].
# Типы: "udp", "doh_json", "doh_wire"
DNS_AVAILABILITY_SERVERS:
# ── UDP ──────────────────────────────────────────────────────────────────────
- ["8.8.8.8", "Google", "udp"]
- ["1.1.1.1", "Cloudflare", "udp"]
- ["9.9.9.9", "Quad9", "udp"]
- ["94.140.14.14", "AdGuard", "udp"]
- ["77.88.8.8", "Yandex", "udp"]
- ["208.67.222.222", "OpenDNS", "udp"]
- ["76.76.2.0", "ControlD", "udp"]
- ["185.228.168.9", "CleanBrowsing", "udp"]
- ["76.223.122.150", "NextDNS", "udp"]
# - ["194.242.2.2", "Mullvad", "udp"] - походу заблочен
# - [ "https://doh.mullvad.net/dns-query", "Mullvad", "doh_wire" ]
# ── DoH Wire (RFC 8484, POST/GET application/dns-message) ───────────────────
- ["https://dns.google/dns-query", "Google", "doh_wire"]
- ["https://cloudflare-dns.com/dns-query", "Cloudflare", "doh_wire"]
- ["https://1.1.1.1/dns-query", "Cloudflare (IP)", "doh_wire"]
- ["https://dns.adguard-dns.com/dns-query", "AdGuard", "doh_wire"]
- ["https://dns.quad9.net/dns-query", "Quad9", "doh_wire"]
- ["https://doh.opendns.com/dns-query", "OpenDNS", "doh_wire"]
- ["https://common.dot.dns.yandex.net/dns-query", "Yandex", "doh_wire"]
- ["https://dns.nextdns.io/dns-query", "NextDNS", "doh_wire"]
- ["https://doh.cleanbrowsing.org/doh/security-filter", "CleanBrowsing", "doh_wire"]
- ["https://dns.sb/dns-query", "DNS.SB", "doh_wire"]
- ["https://doh.dns.sb/dns-query", "DNS.SB (alt)", "doh_wire"]
- ["https://doh.libredns.gr/dns-query", "LibreDNS", "doh_wire"]

File diff suppressed because it is too large Load diff

View file

@ -22,7 +22,11 @@ from utils import config
from cli.console import console
from cli.ui import ask_test_selection, print_legend
from cli.runners import run_domains_test, run_tcp_test, run_whitelist_sni_test, run_telegram_test
from core.dns_scanner import check_dns_integrity, collect_stub_ips_silently
from core.dns_scanner import (
check_dns_integrity,
check_dns_availability,
collect_stub_ips_silently,
)
from utils.files import load_domains, load_tcp_targets, load_whitelist_sni, get_base_dir
CURRENT_VERSION = "3.1.0"
@ -37,16 +41,15 @@ def parse_arguments():
description="DPI Detector — Анализатор блокировок трафика",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument("-t", "--tests", type=str, help="Список тестов для запуска (например: 123 или 24). Пропускает стартовое меню.")
parser.add_argument("-p", "--proxy", type=str, help="URL прокси (напр: socks5://127.0.0.1:1080) (PROXY_URL)")
parser.add_argument("-t", "--tests", type=str, help="Список тестов для запуска (например: 123 или 24). Пропускает стартовое меню.")
parser.add_argument("-p", "--proxy", type=str, help="URL прокси (напр: socks5://127.0.0.1:1080) (PROXY_URL)")
parser.add_argument("-c", "--concurrency", type=int, help="Максимальное количество параллельных запросов (MAX_CONCURRENT)")
parser.add_argument("-d", "--domain", type=str, action="append", help="Проверить конкретный домен(ы), игнорируя domains.txt.\nМожно указывать несколько раз: -d vk.com -d ya.ru")
parser.add_argument("-o", "--output", type=str, help="Путь для автосохранения отчета (например: report.txt).")
parser.add_argument("--batch", action="store_true", help="Отключает паузы и вопросы")
parser.add_argument("-d", "--domain", type=str, action="append", help="Проверить конкретный домен(ы), игнорируя domains.txt.\nМожно указывать несколько раз: -d vk.com -d ya.ru")
parser.add_argument("-o", "--output", type=str, help="Путь для автосохранения отчета (например: report.txt).")
parser.add_argument("--batch", action="store_true", help="Отключает паузы и вопросы")
return parser.parse_args()
async def _fetch_latest_version() -> Optional[str]:
"""Запрашивает последний тег с GitHub API. Возвращает строку версии или None."""
url = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest"
@ -63,12 +66,11 @@ async def _fetch_latest_version() -> Optional[str]:
def fast_exit_handler(sig, frame):
"""Принудительный выход по первому Ctrl+C."""
# Используем системный принт, т.к. rich может быть заблокирован
sys.stdout.write("\n\033[91m\033[1mПрервано пользователем.\033[0m\n")
sys.stdout.flush()
os._exit(0)
async def _readline_cancelable() -> str:
loop = asyncio.get_running_loop()
try:
@ -78,13 +80,12 @@ async def _readline_cancelable() -> str:
except asyncio.CancelledError:
raise KeyboardInterrupt
def _flush_stdin() -> None:
"""Сбрасывает накопившиеся данные в stdin чтобы буферные Enter не перезапускали тест."""
try:
import termios
termios.tcflush(sys.stdin, termios.TCIFLUSH)
except Exception:
# Windows и другие окружения — лучшее что можно сделать без блокировки
try:
import msvcrt
while msvcrt.kbhit():
@ -94,43 +95,70 @@ def _flush_stdin() -> None:
def _format_summary(
run_dns: bool, run_domains: bool, run_tcp: bool, run_telegram: bool,
dns_intercept: int, domain_stats, tcp_stats,
run_dns: bool,
run_dns_avail: bool,
run_domains: bool,
run_tcp: bool,
run_telegram: bool,
dns_intercept: int,
domain_stats,
tcp_stats,
telegram_stats=None,
doh_unavailable=False,
doh_unavailable: bool = False,
dns_avail_stats=None,
) -> List[str]:
lines = []
# ── Тест 1: подмена DNS ───────────────────────────────────────────────────
if run_dns:
total_dns = len(config.DNS_CHECK_DOMAINS)
ok_dns = total_dns - dns_intercept
if doh_unavailable:
lines.append(
f"[bold]DNS[/bold] "
f"[bold]DNS подмена[/bold] "
f"[red]× DoH заблокирован провайдером[/red]"
)
elif dns_intercept == 0:
lines.append(
f"[bold]DNS[/bold] "
f"[bold]DNS подмена[/bold] "
f"[green]√ {ok_dns}/{total_dns} не подменяется[/green]"
)
elif dns_intercept == total_dns:
lines.append(
f"[bold]DNS[/bold] "
f"[bold]DNS подмена[/bold] "
f"[red]× {dns_intercept}/{total_dns} подменяется провайдером[/red]"
)
else:
lines.append(
f"[bold]DNS[/bold] "
f"[bold]DNS подмена[/bold] "
f"[green]√ {ok_dns}/{total_dns} OK[/green]"
f" [red]× {dns_intercept}/{total_dns} подменяется провайдером[/red]"
f" [red]× {dns_intercept}/{total_dns} подменяется[/red]"
)
# ── Тест 2: доступность DNS ───────────────────────────────────────────────
if run_dns_avail:
if dns_avail_stats:
d = dns_avail_stats
doh_color = "green" if d["doh_ok"] == d["doh_total"] else (
"red" if d["doh_ok"] == 0 else "yellow"
)
udp_color = "green" if d["udp_ok"] == d["udp_total"] else (
"red" if d["udp_ok"] == 0 else "yellow"
)
lines.append(
f"[bold]DNS доступность[/bold] "
f"[{doh_color}]{d['doh_ok']}/{d['doh_total']} DoH[/{doh_color}]"
f" [{udp_color}]{d['udp_ok']}/{d['udp_total']} UDP[/{udp_color}]"
)
else:
lines.append("[bold]DNS доступность[/bold] [dim]—[/dim]")
# ── Тест 3: домены ────────────────────────────────────────────────────────
if domain_stats:
d = domain_stats
pct = int(d["ok"] / d["total"] * 100) if d["total"] else 0
line = (
f"[bold]Домены[/bold] "
f"[bold]Домены[/bold] "
f"[green]√ {d['ok']}/{d['total']} OK[/green]"
+ (f" [red]× {d['blocked']} блок.[/red]" if d['blocked'] else "")
+ (f" [yellow]⏱ {d['timeout']} таймаут[/yellow]" if d['timeout'] else "")
@ -138,11 +166,12 @@ def _format_summary(
)
lines.append(line)
# ── Тест 4: TCP 16-20KB ───────────────────────────────────────────────────
if tcp_stats:
t = tcp_stats
pct = int(t["ok"] / t["total"] * 100) if t["total"] else 0
line = (
f"[bold]TCP 16-20KB[/bold] "
f"[bold]TCP 16-20KB[/bold] "
f"[green]√ {t['ok']}/{t['total']} OK[/green]"
+ (f" [red]× {t['blocked']} блок.[/red]" if t['blocked'] else "")
+ (f" [yellow]≈ {t['mixed']} смеш.[/yellow]" if t['mixed'] else "")
@ -150,18 +179,18 @@ def _format_summary(
)
lines.append(line)
# ── Тест 6: Telegram ──────────────────────────────────────────────────────
if run_telegram and telegram_stats:
t = telegram_stats
dl_data = t.get("download", {})
ul_data = t.get("upload", {})
dc_r, dc_t = t.get("dc_reachable", 0), t.get("dc_total", 0)
def format_tg_line(label, data, speed_key, size_key):
st = data.get("status")
avg = data.get(speed_key, 0)
def _fmt_tg(label, data, speed_key, size_key):
st = data.get("status")
avg = data.get(speed_key, 0)
size = data.get(size_key, 0)
drop = data.get("drop_at_sec")
if st == "ok":
raw_st, color = "ОК", "green"
elif st == "stalled":
@ -172,18 +201,13 @@ def _format_summary(
raw_st, color = "НЕДОСТУПНО", "red"
else:
raw_st, color = "ОШИБКА", "red"
status_text = f"[{color}]{raw_st:<16}[/{color}]"
metrics = f"ср. {_tg_fmt(avg)}, {_tg_size(size)}"
if drop:
metrics += f", обрыв на {drop}с"
return f"[bold]{label:<13}[/bold] [{color}]{raw_st:<16}[/{color}] {metrics}"
return f"[bold]{label:<13}[/bold] {status_text} {metrics}"
lines.append(format_tg_line("TG Скачивание", dl_data, "avg_bps", "bytes_total"))
lines.append(format_tg_line("TG Загрузка", ul_data, "bps", "sent"))
lines.append(_fmt_tg("TG Скачивание", dl_data, "avg_bps", "bytes_total"))
lines.append(_fmt_tg("TG Загрузка", ul_data, "bps", "sent"))
dc_color = "green" if dc_r == dc_t else ("red" if dc_r == 0 else "yellow")
lines.append(f"[bold]{'TG Датацентры':<13}[/bold] [{dc_color}]ОК {dc_r}/{dc_t}[/{dc_color}]")
@ -191,17 +215,14 @@ def _format_summary(
def is_newer(latest: str, current: str) -> bool:
"""Сравнивает версии. Возвращает True, если на GitHub версия выше текущей."""
try:
def parse(v):
return tuple(int(x) for x in v.replace('v', '').split('.') if x.isdigit())
l_parts = parse(latest)
c_parts = parse(current)
return l_parts > c_parts
return parse(latest) > parse(current)
except Exception:
return False
async def main():
args = parse_arguments()
@ -231,13 +252,16 @@ async def main():
else:
selection = await ask_test_selection()
run_dns = "1" in selection
run_domains = "2" in selection
run_tcp = "3" in selection
run_wl_sni = "4" in selection
run_telegram = "5" in selection
run_legend = "6" in selection
only_legend = run_legend and not any([run_dns, run_domains, run_tcp, run_wl_sni, run_telegram])
run_dns = "1" in selection # Тест 1: подмена DNS
run_dns_avail = "2" in selection # Тест 2: доступность DNS-серверов
run_domains = "3" in selection # Тест 3: доступность доменов (TLS/HTTP)
run_tcp = "4" in selection # Тест 4: TCP 16-20KB блокировка
run_wl_sni = "5" in selection # Тест 5: белые SNI для ASN
run_telegram = "6" in selection # Тест 6: Telegram
run_legend = "7" in selection # Тест 7: Легенда
only_legend = run_legend and not any([
run_dns, run_dns_avail, run_domains, run_tcp, run_wl_sni, run_telegram
])
if only_legend:
print_legend()
@ -268,7 +292,7 @@ async def main():
semaphore = asyncio.Semaphore(config.MAX_CONCURRENT)
while True:
# ── DNS ───────────────────────────────────────────────────────────────
# ── Тест 1: подмена DNS ───────────────────────────────────────────────
stub_ips: set = set()
dns_intercept_count = 0
doh_unavailable = False
@ -284,36 +308,47 @@ async def main():
except asyncio.TimeoutError:
stub_ips = set()
# ── Домены ────────────────────────────────────────────────────────────
# ── Тест 2: доступность DNS-серверов ─────────────────────────────────
dns_avail_stats = None
if run_dns_avail:
dns_avail_stats = await check_dns_availability()
# ── Тест 3: домены ────────────────────────────────────────────────────
domain_stats = None
if run_domains:
domain_stats = await run_domains_test(semaphore, stub_ips, DOMAINS)
# ── TCP 16-20KB ───────────────────────────────────────────────────────
# ── Тест 4: TCP 16-20KB ───────────────────────────────────────────────
tcp_stats = None
if run_tcp:
tcp_stats = await run_tcp_test(semaphore, TCP_16_20_ITEMS)
# ── Белые SNI ─────────────────────────────────────────────────────────
# ── Тест 5: белые SNI ─────────────────────────────────────────────────
if run_wl_sni:
if WHITELIST_SNI:
await run_whitelist_sni_test(semaphore, TCP_16_20_ITEMS, WHITELIST_SNI)
else:
console.print("[yellow]Файл whitelist_sni.txt пуст или не найден — тест 4 пропущен.[/yellow]")
console.print("[yellow]Файл whitelist_sni.txt пуст или не найден — тест 5 пропущен.[/yellow]")
# ── Telegram ──────────────────────────────────────────────────────────
# ── Тест 6: Telegram ──────────────────────────────────────────────────
telegram_stats = None
if run_telegram:
telegram_stats = await run_telegram_test(semaphore)
# ── Итоговая сводка ───────────────────────────────────────────────────
active_tests = sum([run_dns, run_domains, run_tcp, run_wl_sni, run_telegram])
console.print()
summary_lines = _format_summary(
run_dns, run_domains, run_tcp, run_telegram,
dns_intercept_count, domain_stats, tcp_stats,
run_dns=run_dns,
run_dns_avail=run_dns_avail,
run_domains=run_domains,
run_tcp=run_tcp,
run_telegram=run_telegram,
dns_intercept=dns_intercept_count,
domain_stats=domain_stats,
tcp_stats=tcp_stats,
telegram_stats=telegram_stats,
doh_unavailable=doh_unavailable,
dns_avail_stats=dns_avail_stats,
)
console.print(Panel(
"\n".join(summary_lines),
@ -323,10 +358,9 @@ async def main():
expand=False,
))
console.print("\n[bold green]Проверка завершена.[/bold green]")
# ── Уведомление о новой версии ────────────────────────
# ── Уведомление о новой версии ────────────────────────────────────────
if not latest_version_notified:
try:
latest = await asyncio.wait_for(asyncio.shield(version_task), timeout=0.1)
@ -352,7 +386,7 @@ async def main():
"\nНажмите [bold green]Enter[/bold green] чтобы повторить проверку "
"или [bold red]Ctrl+C[/bold red] для выхода"
)
_flush_stdin() # сбрасываем накопившиеся Enter чтобы не было авто-перезапуска
_flush_stdin()
try:
await _readline_cancelable()
except KeyboardInterrupt:

View file

@ -134,7 +134,7 @@ def classify_ssl_error(error: ssl.SSLError, bytes_read: int) -> Tuple[str, str,
return ("[bold red]TLS MITM[/bold red]", "Cipher mismatch", bytes_read)
if "version" in msg or "protocol version" in msg:
return ("[bold red]BLOCK[/bold red]", "TLS version block", bytes_read)
return ("[bold red]NO TLS1.3[/bold red]", "Server has no TLS 1.3", bytes_read)
if isinstance(error, ssl.SSLZeroReturnError):
# Close notify в неожиданный момент — признак DPI или блокировки