diff --git a/cli/runners.py b/cli/runners.py index 6e38ab8..fd10789 100644 --- a/cli/runners.py +++ b/cli/runners.py @@ -14,7 +14,7 @@ from cli.ui import clean_hostname, build_domain_row from core.tls_scanner import check_domain_tls, check_http_injection, create_dpi_client from core.tcp16_scanner import check_tcp_16_20, check_tcp_16_20_with_rtt from core.telegram_scanner import run_telegram_test as _run_telegram_test -from utils.network import get_resolved_ip +from utils.network import get_resolved_ip, get_fake_ip_type # ── Воркеры ────────────────────────────────────────────────────────────────── @@ -51,17 +51,29 @@ async def _resolve_worker(domain_raw: str, semaphore: asyncio.Semaphore, stub_ip entry["dns_fake"] = None return entry - if stub_ips and resolved_ipv4 in stub_ips: - fake = "[bold red]DNS FAKE[/bold red]" - detail = f"DNS подмена -> {resolved_ipv4}" - entry["t13v4_res"] = (fake, detail, 0.0) - entry["t12_res"] = (fake, detail, 0.0) - entry["http_res"] = (fake, detail) - entry["dns_fake"] = True + fake_type = get_fake_ip_type(resolved_ipv4) + if fake_type != "fakeip" and stub_ips and resolved_ipv4 in stub_ips: + fake_type = "isp" + + if fake_type == "isp": + fake = "[bold red]DNS FAKE[/bold red]" + detail = f"Заглушка провайдера -> {resolved_ipv4}" + entry["t13v4_res"] = (fake, detail, 0.0) + entry["t12_res"] = (fake, detail, 0.0) + entry["http_res"] = (fake, detail) + entry["dns_fake"] = True + elif fake_type == "local": + fake = "[bold yellow]LOCAL IP[/bold yellow]" + detail = f"Локальный IP -> {resolved_ipv4}" + entry["t13v4_res"] = (fake, detail, 0.0) + entry["t12_res"] = (fake, detail, 0.0) + entry["http_res"] = (fake, detail) + entry["dns_fake"] = True + + return entry return entry - async def _tls_worker( entry: dict, client: httpx.AsyncClient, @@ -104,7 +116,7 @@ async def _tcp16_worker(item: dict, semaphore: asyncio.Semaphore) -> list: port = int(item.get("port", 443)) sni = None if port == 80 else (item.get("sni") or config.FAT_DEFAULT_SNI) - alive_str, status, detail = await check_tcp_16_20(ip, port, sni, semaphore) + alive_str, status, detail, rtt = await check_tcp_16_20(ip, port, sni, semaphore) asn_raw = str(item.get("asn", "")).strip() asn_str = ( @@ -113,6 +125,11 @@ async def _tcp16_worker(item: dict, semaphore: asyncio.Semaphore) -> list: else asn_raw.upper() ) or "-" + if rtt is not None: + rtt_ms = f"{int(rtt * 1000)}мс" + detail = f"{detail}" + #detail = f"{detail} | {rtt_ms}" if detail else rtt_ms + return [item["id"], asn_str, item["provider"], alive_str, status, detail] @@ -187,13 +204,25 @@ async def run_domains_test(semaphore: asyncio.Semaphore, stub_ips: set, domains: await client_http.aclose() rows = sorted([build_domain_row(e) for e in entries], key=lambda x: x[0]) - dns_fail_count = 0 - resolved_ips_counter: dict = {} + isp_stubs = {} + local_stubs = {} + fakeip_stubs = {} + for r in rows: resolved_ip = r[5] if len(r) > 5 else None - if resolved_ip and stub_ips and resolved_ip in stub_ips: - resolved_ips_counter[resolved_ip] = resolved_ips_counter.get(resolved_ip, 0) + 1 + if resolved_ip: + ftype = get_fake_ip_type(resolved_ip) + if ftype != "fakeip" and stub_ips and resolved_ip in stub_ips: + ftype = "isp" + + if ftype == "isp": + isp_stubs[resolved_ip] = isp_stubs.get(resolved_ip, 0) + 1 + elif ftype == "local": + local_stubs[resolved_ip] = local_stubs.get(resolved_ip, 0) + 1 + elif ftype == "fakeip": + fakeip_stubs[resolved_ip] = fakeip_stubs.get(resolved_ip, 0) + 1 + if any("DNS FAIL" in r[col] for col in (1, 2, 3)): dns_fail_count += 1 @@ -201,19 +230,38 @@ async def run_domains_test(semaphore: asyncio.Semaphore, stub_ips: set, domains: table.add_row(*r[:5]) console.print(table) - confirmed_stubs = {ip: c for ip, c in resolved_ips_counter.items() if stub_ips and ip in stub_ips} - if confirmed_stubs or dns_fail_count > 0: - console.print(f"\n[bold yellow][i][!] НА ВАШЕМ УСТРОЙСТВЕ/РОУТЕРЕ НЕ НАСТРОЕН DoH:[/bold yellow]") - if confirmed_stubs: - ips_text = [f"[red]{ip}[/red] у {c} доменов" for ip, c in confirmed_stubs.items()] - console.print(f"DNS вернул IP заглушки: {', '.join(ips_text)}") + if isp_stubs or local_stubs or fakeip_stubs or dns_fail_count > 0: + console.print(f"\n[bold yellow][i][!] ИНФОРМАЦИЯ О DNS РЕЗОЛВЕ:[/bold yellow]") + + if fakeip_stubs: + total_fake = sum(fakeip_stubs.values()) + console.print(f"Трафик перехватывается Fake-IP: у [green]{total_fake}[/green] доменов") + + if isp_stubs: + total_isp = sum(isp_stubs.values()) + if len(isp_stubs) <= 3: + ips_text = [f"[red]{ip}[/red]" for ip in isp_stubs.keys()] + console.print(f"DNS вернул IP заглушки провайдера ({', '.join(ips_text)}): у {total_isp} доменов") + else: + console.print(f"DNS вернул IP заглушки провайдера: у [red]{total_isp}[/red] доменов") + + if local_stubs: + total_local = sum(local_stubs.values()) + if len(local_stubs) <= 3: + ips_text = [f"[yellow]{ip}[/yellow]" for ip in local_stubs.keys()] + console.print(f"DNS вернул локальные IP (работает AdGuard/hosts?): ({', '.join(ips_text)}): у {total_local} доменов") + else: + console.print(f"DNS вернул локальные IP (AdGuard/hosts/Pi-hole?): у [yellow]{total_local}[/yellow] доменов") + if dns_fail_count > 0: console.print(f"У {dns_fail_count} сайтов обнаружен DNS FAIL (Домен не найден)") - console.print("[yellow]Рекомендация: Настройте DoH на вашем устройстве и роутере[/yellow]\n") - console.print("После настройки сбросьте кеш DNS:") - console.print("Windows: [dim]ipconfig /flushdns[/dim]") - console.print("MacOS: [dim]sudo dscacheutil -flushcache; sudo killall -HUP mDNSResponder[/dim]") - console.print("Linux: [dim]sudo resolvectl flush-caches[/dim]\n") + + if isp_stubs or dns_fail_count > 0: + console.print("[yellow]Рекомендация: Настройте DoH на вашем устройстве и роутере[/yellow]\n") + console.print("После настройки сбросьте кеш DNS:") + console.print("Windows: [dim]ipconfig /flushdns[/dim]") + console.print("MacOS: [dim]sudo dscacheutil -flushcache; sudo killall -HUP mDNSResponder[/dim]") + console.print("Linux: [dim]sudo resolvectl flush-caches[/dim]\n") block_markers = ("TLS DPI", "TLS MITM", "TLS BLOCK", "ISP PAGE", "BLOCKED", "TCP RST", "TCP ABORT") return { @@ -402,7 +450,7 @@ async def run_whitelist_sni_test(semaphore: asyncio.Semaphore, tcp_items: list, # Шаг 0: без SNI try: - _a, st0, d0 = await check_tcp_16_20(ip, 443, "", semaphore, hint_rtt=hint) + _a, st0, d0, _rtt = await check_tcp_16_20(ip, 443, "", semaphore, hint_rtt=hint) if "OK" in st0: found.append(("(без SNI)", 0)) elif "DETECTED" not in st0 and "at " not in d0: @@ -422,7 +470,7 @@ async def run_whitelist_sni_test(semaphore: asyncio.Semaphore, tcp_items: list, break async def _one(sni: str): - _a, s, d = await check_tcp_16_20(ip, 443, sni, semaphore, hint_rtt=hint) + _a, s, d, _rtt = await check_tcp_16_20(ip, 443, sni, semaphore, hint_rtt=hint) return sni, s, d results = await asyncio.gather( diff --git a/cli/ui.py b/cli/ui.py index d3e65bc..7866bde 100644 --- a/cli/ui.py +++ b/cli/ui.py @@ -35,7 +35,12 @@ def build_domain_row(entry: dict) -> list: if d12: details.append(f"T12:{d12}") if d13: details.append(f"T13:{d13}") - times = [t for t in (t12_elapsed, t13_elapsed) if t > 0] + times = [] + if "TIMEOUT" not in t12_status and t12_elapsed > 0: + times.append(t12_elapsed) + if "TIMEOUT" not in t13_status and t13_elapsed > 0: + times.append(t13_elapsed) + if times: details.append(f"{min(times):.1f}s") @@ -44,9 +49,9 @@ def build_domain_row(entry: dict) -> list: async def ask_test_selection() -> str: - # Алгоритмически строим все непустые подмножества цифр 1–6 + # Алгоритмически строим все непустые подмножества цифр 1–7 from itertools import combinations - digits = "123456" + digits = "1234567" valid = { "".join(sorted(combo)) for r in range(1, len(digits) + 1) @@ -55,11 +60,12 @@ async def ask_test_selection() -> str: console.print( "\n[bold]Какие тесты запустить?[/bold]\n" " [cyan]1[/cyan] — Проверка подмены DNS\n" - " [cyan]2[/cyan] — Проверка доступности доменов\n" - " [cyan]3[/cyan] — Проверка TCP 16-20KB блокировки\n" - " [cyan]4[/cyan] — Поиск белых SNI для ASN\n" - " [cyan]5[/cyan] — Проверка Telegram (замедление/блокировка)\n" - " [cyan]6[/cyan] — Легенда статусов\n" + " [cyan]2[/cyan] — Проверка доступности DNS-серверов\n" + " [cyan]3[/cyan] — Проверка доступности доменов\n" + " [cyan]4[/cyan] — Проверка TCP 16-20KB блокировки\n" + " [cyan]5[/cyan] — Поиск белых SNI для ASN\n" + " [cyan]6[/cyan] — Проверка Telegram (замедление/блокировка)\n" + " [cyan]7[/cyan] — Легенда статусов\n" " [cyan]123[/cyan] — [dim](по умолчанию)[/dim]" ) loop = asyncio.get_running_loop() @@ -75,7 +81,7 @@ async def ask_test_selection() -> str: if raw in valid: return raw - console.print("[yellow]Неверный ввод, запускаем все тесты.[/yellow]") + console.print("[yellow]Неверный ввод, запускаем тесты 1, 2, 3.[/yellow]") return "123" @@ -88,6 +94,7 @@ def print_legend() -> None: ("TLS MITM", "Man-in-the-Middle: подменён сертификат (Unknown CA, Cert expired, Hostname mismatch)"), ("TLS BLOCK", "Блокировка версии TLS или протокола целиком (protocol_version alert)"), ("SSL ERR", "Прочие SSL ошибки: bad key share, record layer fail, internal error"), + ("NO TLS1.3", "Сервер не поддерживает TLS 1.3 (норма для старых серверов)") ]), ("[bold cyan]— TCP / Соединение —[/bold cyan]", [ ("TCP RST", "Соединение сброшено (TCP RST пакет от DPI или сервера)"), @@ -101,6 +108,9 @@ def print_legend() -> None: ("[bold cyan]— DNS —[/bold cyan]", [ ("DNS FAIL", "Домен не разрешился через системный резолвер"), ("DNS FAKE", "IP домена совпадает с известной заглушкой провайдера"), + ("TIMEOUT", "DNS-сервер не ответил в отведённое время"), + ("BLOCKED", "DoH-сервер заблокирован провайдером (HTTP не прошёл)"), + ("NXDOMAIN", "Домен не существует по мнению этого сервера"), ]), ("[bold cyan]— HTTP / Блокировки —[/bold cyan]", [ ("BLOCKED", "HTTP 451 — Недоступно по юридическим причинам"), @@ -109,7 +119,7 @@ def print_legend() -> None: "[red]Красный[/red] — редирект на чужой домен (подозрительно)"), ]), ("[bold cyan]— TCP 16-20KB тест —[/bold cyan]", [ - ("DETECTED", "Обрыв соединения после отправки 16–20KB (FAT header блокировка)"), + ("DETECTED", "Обрыв соединения после отправки 16KB+"), ("OK", "Все 16 запросов прошли без обрыва"), ]), ("[bold cyan]— Прочее —[/bold cyan]", [ diff --git a/config.yml b/config.yml index fa3cae9..09d7409 100644 --- a/config.yml +++ b/config.yml @@ -83,4 +83,43 @@ DNS_DOH_SERVERS: - ["https://cloudflare-dns.com/dns-query", "Cloudflare"] - ["https://one.one.one.one/dns-query", "Cloudflare"] - ["https://dns.adguard-dns.com/resolve", "AdGuard"] - - ["https://dns.alidns.com/resolve", "Alibaba"] \ No newline at end of file + - ["https://dns.alidns.com/resolve", "Alibaba"] + +# --- Тест 2: Проверка доступности DNS-серверов --- +DNS_AVAILABILITY_DOMAINS: + - "example.com" + - "vk.com" + - "ozon.ru" + - "habr.com" + - "mail.ru" + +# Серверы для теста доступности. Каждый сервер — [адрес, имя, тип]. +# Типы: "udp", "doh_json", "doh_wire" +DNS_AVAILABILITY_SERVERS: + # ── UDP ────────────────────────────────────────────────────────────────────── + - ["8.8.8.8", "Google", "udp"] + - ["1.1.1.1", "Cloudflare", "udp"] + - ["9.9.9.9", "Quad9", "udp"] + - ["94.140.14.14", "AdGuard", "udp"] + - ["77.88.8.8", "Yandex", "udp"] + - ["208.67.222.222", "OpenDNS", "udp"] + - ["76.76.2.0", "ControlD", "udp"] + - ["185.228.168.9", "CleanBrowsing", "udp"] + - ["76.223.122.150", "NextDNS", "udp"] + +# - ["194.242.2.2", "Mullvad", "udp"] - походу заблочен +# - [ "https://doh.mullvad.net/dns-query", "Mullvad", "doh_wire" ] + + # ── DoH Wire (RFC 8484, POST/GET application/dns-message) ─────────────────── + - ["https://dns.google/dns-query", "Google", "doh_wire"] + - ["https://cloudflare-dns.com/dns-query", "Cloudflare", "doh_wire"] + - ["https://1.1.1.1/dns-query", "Cloudflare (IP)", "doh_wire"] + - ["https://dns.adguard-dns.com/dns-query", "AdGuard", "doh_wire"] + - ["https://dns.quad9.net/dns-query", "Quad9", "doh_wire"] + - ["https://doh.opendns.com/dns-query", "OpenDNS", "doh_wire"] + - ["https://common.dot.dns.yandex.net/dns-query", "Yandex", "doh_wire"] + - ["https://dns.nextdns.io/dns-query", "NextDNS", "doh_wire"] + - ["https://doh.cleanbrowsing.org/doh/security-filter", "CleanBrowsing", "doh_wire"] + - ["https://dns.sb/dns-query", "DNS.SB", "doh_wire"] + - ["https://doh.dns.sb/dns-query", "DNS.SB (alt)", "doh_wire"] + - ["https://doh.libredns.gr/dns-query", "LibreDNS", "doh_wire"] \ No newline at end of file diff --git a/core/dns_scanner.py b/core/dns_scanner.py index d9d5b59..4d31cc1 100644 --- a/core/dns_scanner.py +++ b/core/dns_scanner.py @@ -2,15 +2,21 @@ import os import struct import socket import asyncio +import base64 +import time from typing import Tuple, List, Union, Optional import httpx from utils import config from cli.console import console +from utils.network import get_fake_ip_type +# ── DNS wire-format helpers ─────────────────────────────────────────────────── + def _build_dns_query(domain: str) -> bytes: + """Собирает DNS-запрос в wire-формате (RFC 1035).""" tx_id = os.urandom(2) - flags = b'\x01\x00' + flags = b'\x01\x00' # RD=1 qdcount = b'\x00\x01' ancount = nscount = arcount = b'\x00\x00' header = tx_id + flags + qdcount + ancount + nscount + arcount @@ -20,133 +26,201 @@ def _build_dns_query(domain: str) -> bytes: qname += bytes([len(part)]) + part.encode('ascii') qname += b'\x00' - return header + qname + b'\x00\x01' + b'\x00\x01' # QTYPE=A, QCLASS=IN + qtype = b'\x00\x01' # A + qclass = b'\x00\x01' # IN + question = qname + qtype + qclass + + return header + question -def _parse_dns_response(data: bytes, tx_id: bytes) -> Union[List[str], str]: +def _parse_dns_response(data: bytes, expected_tx_id: bytes) -> Union[List[str], str]: + """ + Парсит DNS-ответ wire-формата. + Возвращает список IPv4-адресов, "NXDOMAIN", или "PARSE_ERR". + """ if len(data) < 12: - raise ValueError("Слишком короткий ответ") - if data[:2] != tx_id: - raise ValueError("ID транзакции не совпадает") + return "PARSE_ERR" + if data[:2] != expected_tx_id: + return "PARSE_ERR" + + flags = struct.unpack(">H", data[2:4])[0] + rcode = flags & 0x0F + ancount = struct.unpack(">H", data[6:8])[0] - flags = struct.unpack(">H", data[2:4])[0] - rcode = flags & 0x000F if rcode == 3: return "NXDOMAIN" - if rcode != 0: - raise ValueError(f"RCODE ошибки: {rcode}") + if rcode != 0 or ancount == 0: + return "PARSE_ERR" - qdcount, ancount, _, _ = struct.unpack(">HHHH", data[4:12]) + # Пропускаем заголовок (12) + вопрос offset = 12 - - def skip_name(pos): + try: while True: - if pos >= len(data): - break - if (data[pos] & 0xC0) == 0xC0: - return pos + 2 - length = data[pos] + if offset >= len(data): + return "PARSE_ERR" + length = data[offset] if length == 0: - return pos + 1 - pos += length + 1 - return pos - - for _ in range(qdcount): - offset = skip_name(offset) - offset += 4 # QTYPE + QCLASS + offset += 1 + break + if length & 0xC0 == 0xC0: # pointer + offset += 2 + break + offset += length + 1 + offset += 4 # qtype + qclass + except IndexError: + return "PARSE_ERR" ips = [] for _ in range(ancount): - offset = skip_name(offset) - if offset + 10 > len(data): + try: + if offset >= len(data): + break + # Имя (может быть pointer) + if data[offset] & 0xC0 == 0xC0: + offset += 2 + else: + while offset < len(data) and data[offset] != 0: + offset += data[offset] + 1 + offset += 1 + + if offset + 10 > len(data): + break + rtype = struct.unpack(">H", data[offset:offset+2])[0] + rdlen = struct.unpack(">H", data[offset+8:offset+10])[0] + offset += 10 + + if rtype == 1 and rdlen == 4: # A record + ip = ".".join(str(b) for b in data[offset:offset+4]) + ips.append(ip) + offset += rdlen + except (IndexError, struct.error): break - atype, aclass, _, rdlength = struct.unpack(">HHIH", data[offset:offset + 10]) - offset += 10 - rdata = data[offset:offset + rdlength] - offset += rdlength - if atype == 1 and aclass == 1 and rdlength == 4: - ips.append(socket.inet_ntoa(rdata)) - return ips if ips else "EMPTY" + return ips if ips else "PARSE_ERR" -class _DNSDatagramProtocol(asyncio.DatagramProtocol): - def __init__(self): - self.future = asyncio.get_event_loop().create_future() - - def connection_made(self, transport): - self.transport = transport - - def datagram_received(self, data, addr): - if not self.future.done(): - self.future.set_result(data) - - def error_received(self, exc): - if not self.future.done(): - self.future.set_exception(exc) - - def connection_lost(self, exc): - pass - +# ── UDP low-level ──────────────────────────────────────────────────────────── async def _resolve_udp_native(nameserver: str, domain: str, timeout: float) -> Union[List[str], str]: - loop = asyncio.get_running_loop() - req_data = _build_dns_query(domain) - tx_id = req_data[:2] + """ + UDP DNS-запрос напрямую через asyncio DatagramProtocol. + Возвращает список IP или "NXDOMAIN"/"PARSE_ERR" при ошибке. + """ + query = _build_dns_query(domain) + tx_id = query[:2] - transport, protocol = await loop.create_datagram_endpoint( - lambda: _DNSDatagramProtocol(), - remote_addr=(nameserver, 53), + loop = asyncio.get_running_loop() + future: asyncio.Future = loop.create_future() + + class _Proto(asyncio.DatagramProtocol): + def datagram_received(self, data, addr): + if not future.done(): + future.set_result(data) + def error_received(self, exc): + if not future.done(): + future.set_exception(exc) + def connection_lost(self, exc): + if not future.done(): + future.set_exception(exc or ConnectionError("UDP closed")) + + transport, _ = await loop.create_datagram_endpoint( + _Proto, remote_addr=(nameserver, 53) ) try: - transport.sendto(req_data) - resp_data = await asyncio.wait_for(protocol.future, timeout) + transport.sendto(query) + resp_data = await asyncio.wait_for(future, timeout=timeout) return _parse_dns_response(resp_data, tx_id) finally: transport.close() -# ── Probe-функции ───────────────────────────────────────────────────────────── - +# ── Single-domain probes ───────────────────────────────────────────────────── async def _probe_udp_single(nameserver: str, domain: str) -> Optional[List[str]]: - """Резолвит один домен через UDP. Возвращает список IP или None при ошибке.""" - try: - res = await _resolve_udp_native(nameserver, domain, config.DNS_CHECK_TIMEOUT) - return res if isinstance(res, list) else None - except Exception: - return None + """UDP DNS — один домен (до 2 попыток).""" + for attempt in range(2): + try: + res = await _resolve_udp_native(nameserver, domain, config.DNS_CHECK_TIMEOUT) + if isinstance(res, list): + return res + except Exception: + pass + if attempt == 0: + await asyncio.sleep(0.5) + return None - -async def _probe_doh_single(doh_url: str, domain: str) -> Optional[List[str]]: - """Резолвит один домен через DoH. Возвращает список IP или None при ошибке.""" +async def _probe_doh_json_single(doh_url: str, domain: str) -> Optional[List[str]]: + """DoH JSON API (?name=…&type=A) — один домен (до 2 попыток).""" headers = {"Accept": "application/dns-json", "User-Agent": config.USER_AGENT} - try: - proxy_url = getattr(config, "PROXY_URL", None) - async with httpx.AsyncClient( - timeout=config.DNS_CHECK_TIMEOUT, - verify=False, - headers=headers, - proxy=proxy_url, - trust_env=False - ) as client: - resp = await client.get(doh_url, params={"name": domain, "type": "A"}) - if resp.status_code != 200: - return None - data = resp.json() - if data.get("Status") == 3: - return None - ips = [a["data"] for a in data.get("Answer", []) if a.get("type") == 1] - return ips if ips else None - except Exception: - return None + for attempt in range(2): + try: + proxy_url = getattr(config, "PROXY_URL", None) + async with httpx.AsyncClient( + timeout=config.DNS_CHECK_TIMEOUT, verify=False, + headers=headers, proxy=proxy_url, trust_env=False + ) as client: + resp = await client.get(doh_url, params={"name": domain, "type": "A"}) + if resp.status_code == 200: + data = resp.json() + if data.get("Status") != 3: # Не NXDOMAIN + ips = [a["data"] for a in data.get("Answer", []) if a.get("type") == 1] + if ips: + return ips + except Exception: + pass + if attempt == 0: + await asyncio.sleep(0.5) + return None +async def _probe_doh_wire_single(doh_url: str, domain: str) -> Optional[List[str]]: + """DoH wire-format (RFC 8484) — один домен (до 2 попыток).""" + query = _build_dns_query(domain) + tx_id = query[:2] + for attempt in range(2): + try: + proxy_url = getattr(config, "PROXY_URL", None) + async with httpx.AsyncClient( + timeout=config.DNS_CHECK_TIMEOUT, verify=False, + proxy=proxy_url, trust_env=False, http2=True + ) as client: + resp = await client.post( + doh_url, content=query, + headers={ + "Content-Type": "application/dns-message", + "Accept": "application/dns-message", + "User-Agent": config.USER_AGENT, + }, + ) + if resp.status_code != 200: + dns_b64 = base64.urlsafe_b64encode(query).rstrip(b'=').decode() + resp = await client.get( + doh_url, params={"dns": dns_b64}, + headers={ + "Accept": "application/dns-message", + "User-Agent": config.USER_AGENT, + }, + ) + if resp.status_code == 200: + result = _parse_dns_response(resp.content, tx_id) + if isinstance(result, list): + return result + except Exception: + pass + if attempt == 0: + await asyncio.sleep(0.5) + return None + + +# ── Batch probes ────────────────────────────────────────────────────────────── async def _probe_udp_all(nameserver: str, domains: list) -> dict: - """Параллельно резолвит все домены через UDP DNS.""" async def _query(domain): try: res = await _resolve_udp_native(nameserver, domain, config.DNS_CHECK_TIMEOUT) - return domain, "OK", res + if isinstance(res, list): + return domain, "OK", res + if res == "NXDOMAIN": + return domain, "NXDOMAIN", None + return domain, "ERROR", None except asyncio.TimeoutError: return domain, "TIMEOUT", None except Exception: @@ -170,8 +244,8 @@ async def _probe_udp_all(nameserver: str, domains: list) -> dict: return {"ok": ok, "timeout": timeout_cnt, "error": error, "results": results} -async def _probe_doh_all(doh_url: str, domains: list) -> dict: - """Параллельно резолвит все домены через DoH.""" +async def _probe_doh_json_all(doh_url: str, domains: list) -> dict: + """Параллельно резолвит все домены через DoH JSON API.""" headers = {"Accept": "application/dns-json", "User-Agent": config.USER_AGENT} async def _query(client, domain): @@ -191,11 +265,72 @@ async def _probe_doh_all(doh_url: str, domains: list) -> dict: proxy_url = getattr(config, "PROXY_URL", None) async with httpx.AsyncClient( - timeout=config.DNS_CHECK_TIMEOUT, - verify=False, - headers=headers, - proxy=proxy_url, - trust_env=False + timeout=config.DNS_CHECK_TIMEOUT, verify=False, + headers=headers, proxy=proxy_url, trust_env=False, + ) as client: + completed = await asyncio.gather(*[_query(client, d) for d in domains]) + + ok = timeout_cnt = blocked = 0 + results = {} + for domain, status, res in completed: + if status in ("OK", "NXDOMAIN"): + results[domain] = res if status == "OK" else "NXDOMAIN" + ok += 1 + elif status == "TIMEOUT": + results[domain] = "TIMEOUT" + timeout_cnt += 1 + else: + results[domain] = "BLOCKED" + blocked += 1 + + return {"ok": ok, "timeout": timeout_cnt, "blocked": blocked, "results": results} + + +async def _probe_doh_wire_all(doh_url: str, domains: list) -> dict: + """Параллельно резолвит все домены через DoH wire-format (RFC 8484).""" + + async def _query(client, domain): + try: + query = _build_dns_query(domain) + tx_id = query[:2] + + # ── POST ── + resp = await client.post( + doh_url, content=query, + headers={ + "Content-Type": "application/dns-message", + "Accept": "application/dns-message", + "User-Agent": config.USER_AGENT, + }, + ) + if resp.status_code != 200: + # ── GET fallback ── + dns_b64 = base64.urlsafe_b64encode(query).rstrip(b'=').decode() + resp = await client.get( + doh_url, params={"dns": dns_b64}, + headers={ + "Accept": "application/dns-message", + "User-Agent": config.USER_AGENT, + }, + ) + if resp.status_code != 200: + return domain, "BLOCKED", None + + result = _parse_dns_response(resp.content, tx_id) + if result == "NXDOMAIN": + return domain, "NXDOMAIN", None + if isinstance(result, list): + return domain, "OK", result + return domain, "EMPTY", None + except httpx.TimeoutException: + return domain, "TIMEOUT", None + except Exception: + return domain, "BLOCKED", None + + proxy_url = getattr(config, "PROXY_URL", None) + async with httpx.AsyncClient( + timeout=config.DNS_CHECK_TIMEOUT, verify=False, + proxy=proxy_url, trust_env=False, http2=True ) as client: completed = await asyncio.gather(*[_query(client, d) for d in domains]) @@ -236,7 +371,9 @@ async def collect_stub_ips_silently() -> set: return {ip for ip, count in ip_count.items() if count >= 2} -async def check_dns_integrity() -> Tuple[set, int]: +# ── Тест 1: Проверка подмены DNS ───────────────────────────────────────────── + +async def check_dns_integrity() -> Tuple[set, int, bool]: total = len(config.DNS_CHECK_DOMAINS) probe_domain = config.DNS_CHECK_DOMAINS[0] @@ -246,123 +383,138 @@ async def check_dns_integrity() -> Tuple[set, int]: ) console.print("[dim]Проверяем, перехватывает ли провайдер DNS запросы...[/dim]\n") - # ── Фаза 1: быстрый параллельный пинг всех серверов одним доменом ──────── - udp_servers = config.DNS_UDP_SERVERS # [(ip, name), ...] - doh_servers = config.DNS_DOH_SERVERS # [(url, name), ...] + # ── Списки серверов ─────────────────────────────────────────────────────── + udp_servers = config.DNS_UDP_SERVERS # [(ip, name)] + doh_json_servers = config.DNS_DOH_SERVERS # [(url, name)] — JSON API + doh_wire_servers = getattr(config, 'DNS_DOH_WIRE_SERVERS', []) # [(url, name)] — RFC 8484 - async def _quick_udp(ip, name): - res = await _probe_udp_single(ip, probe_domain) - return ip, name, res # res = List[str] или None + # ── Фаза 1: быстрый параллельный пинг одним доменом ────────────────────── + async def _qp_udp(ip, n): + return ip, n, await _probe_udp_single(ip, probe_domain) - async def _quick_doh(url, name): - res = await _probe_doh_single(url, probe_domain) - return url, name, res + async def _qp_json(u, n): + return u, n, await _probe_doh_json_single(u, probe_domain) - quick_udp_tasks = [_quick_udp(ip, name) for ip, name in udp_servers] - quick_doh_tasks = [_quick_doh(url, name) for url, name in doh_servers] + async def _qp_wire(u, n): + return u, n, await _probe_doh_wire_single(u, probe_domain) - quick_results = await asyncio.gather(*quick_udp_tasks, *quick_doh_tasks) + quick = await asyncio.gather( + *[_qp_udp(ip, n) for ip, n in udp_servers], + *[_qp_json(u, n) for u, n in doh_json_servers], + *[_qp_wire(u, n) for u, n in doh_wire_servers], + ) - n_udp = len(udp_servers) - udp_quick = quick_results[:n_udp] # (ip, name, ips_or_None) - doh_quick = quick_results[n_udp:] # (url, name, ips_or_None) + n_u, n_j = len(udp_servers), len(doh_json_servers) + udp_quick = quick[:n_u] + json_quick = quick[n_u:n_u + n_j] + wire_quick = quick[n_u + n_j:] - # ── Фаза 2: для тех, кто вернул None на быстром пинге — полный тест ────── - # (параллельно для всех «сомнительных») + # ── Фаза 2: полный тест для «молчащих» серверов ────────────────────────── + async def _full_udp(ip, n): + p = await _probe_udp_all(ip, config.DNS_CHECK_DOMAINS) + return ip, n, p["ok"] > 0 - async def _full_udp_check(ip, name): - probe = await _probe_udp_all(ip, config.DNS_CHECK_DOMAINS) - return ip, name, probe["ok"] > 0 + async def _full_json(u, n): + p = await _probe_doh_json_all(u, config.DNS_CHECK_DOMAINS) + return u, n, p["ok"] > 0 - async def _full_doh_check(url, name): - probe = await _probe_doh_all(url, config.DNS_CHECK_DOMAINS) - return url, name, probe["ok"] > 0 + async def _full_wire(u, n): + p = await _probe_doh_wire_all(u, config.DNS_CHECK_DOMAINS) + return u, n, p["ok"] > 0 - needs_full_udp = [(ip, name) for ip, name, res in udp_quick if res is None] - needs_full_doh = [(url, name) for url, name, res in doh_quick if res is None] + need_u = [(k, n) for k, n, r in udp_quick if r is None] + need_j = [(k, n) for k, n, r in json_quick if r is None] + need_w = [(k, n) for k, n, r in wire_quick if r is None] - full_udp_results = {} - full_doh_results = {} + full_u = full_j = full_w = {} - if needs_full_udp or needs_full_doh: - full_tasks = ( - [_full_udp_check(ip, name) for ip, name in needs_full_udp] + - [_full_doh_check(url, name) for url, name in needs_full_doh] + if need_u or need_j or need_w: + done = await asyncio.gather( + *[_full_udp(k, n) for k, n in need_u], + *[_full_json(k, n) for k, n in need_j], + *[_full_wire(k, n) for k, n in need_w], ) - full_done = await asyncio.gather(*full_tasks) - n_full_udp = len(needs_full_udp) - for ip, name, ok in full_done[:n_full_udp]: - full_udp_results[(ip, name)] = ok - for url, name, ok in full_done[n_full_udp:]: - full_doh_results[(url, name)] = ok + nu, nj = len(need_u), len(need_j) + full_u = {(k, n): ok for k, n, ok in done[:nu]} + full_j = {(k, n): ok for k, n, ok in done[nu:nu + nj]} + full_w = {(k, n): ok for k, n, ok in done[nu + nj:]} - # ── Определяем финальный статус каждого сервера ─────────────────────────── - # UDP - udp_working = [] - udp_log_lines = [] - for ip, name, quick_res in udp_quick: - if quick_res is not None: - udp_working.append((ip, name)) - else: - if full_udp_results.get((ip, name), False): - udp_working.append((ip, name)) + # ── Классификация серверов ──────────────────────────────────────────────── + def _classify(quick_list, full_map, label): + working, log = [], [] + for key, name, qr in quick_list: + if qr is not None or full_map.get((key, name), False): + working.append((key, name)) else: - udp_log_lines.append(f"[dim]• UDP [yellow]{ip} ({name})[/yellow] недоступен[/dim]") + log.append(f"[dim]• {label} [yellow]{key} ({name})[/yellow] недоступен[/dim]") + return working, log - # DoH - doh_working = [] - doh_log_lines = [] - for url, name, quick_res in doh_quick: - if quick_res is not None: - doh_working.append((url, name)) - else: - if full_doh_results.get((url, name), False): - doh_working.append((url, name)) - else: - doh_log_lines.append(f"[dim]• DoH [yellow]{url} ({name})[/yellow] недоступен[/dim]") + udp_working, udp_log = _classify(udp_quick, full_u, "UDP") + json_working, json_log = _classify(json_quick, full_j, "DoH JSON") + wire_working, wire_log = _classify(wire_quick, full_w, "DoH Wire") - # Выводим список только если есть проблемные серверы - all_log = udp_log_lines + doh_log_lines + all_log = udp_log + json_log + wire_log if all_log: for line in all_log: console.print(line) console.print() - # ── Выбираем по одному серверу для полного теста ────────────────────────── - # Всегда берём первый из конфига если он рабочий, иначе первый из рабочих - def _pick_preferred(working_list, all_list): - if not working_list: + # ── Выбор по одному серверу каждого типа ───────────────────────────────── + def _pick(working, all_list): + if not working: return None, None - first_key = all_list[0][0] - for key, name in working_list: - if key == first_key: - return key, name - return working_list[0] + first = all_list[0][0] + for k, n in working: + if k == first: + return k, n + return working[0] - udp_key, udp_name_chosen = _pick_preferred(udp_working, udp_servers) - doh_key, doh_name_chosen = _pick_preferred(doh_working, doh_servers) + udp_key, udp_name = _pick(udp_working, udp_servers) + json_key, json_name = _pick(json_working, doh_json_servers) + wire_key, wire_name = ( + _pick(wire_working, doh_wire_servers) if doh_wire_servers else (None, None) + ) # ── Полный тест выбранными серверами ────────────────────────────────────── - udp_probe = None - doh_probe = None + _unavail = lambda: {"results": {d: "UNAVAIL" for d in config.DNS_CHECK_DOMAINS}} + # UDP if udp_key: - console.print(f"[dim]Выбран UDP: [cyan]{udp_key} ({udp_name_chosen})[/cyan] — резолвим все домены...[/dim]") + console.print(f"[dim]UDP: [cyan]{udp_key} ({udp_name})[/cyan][/dim]") udp_probe = await _probe_udp_all(udp_key, config.DNS_CHECK_DOMAINS) udp_label = f"UDP {udp_key}" else: console.print("[red]× Все UDP DNS-серверы недоступны[/red]") - udp_probe = {"results": {d: "UNAVAIL" for d in config.DNS_CHECK_DOMAINS}} - udp_label = "UDP DNS (недоступен)" + udp_probe = _unavail() + udp_label = "UDP (—)" - if doh_key: - console.print(f"[dim]Выбран DoH: [cyan]{doh_key} ({doh_name_chosen})[/cyan] — резолвим все домены...[/dim]") - doh_probe = await _probe_doh_all(doh_key, config.DNS_CHECK_DOMAINS) - doh_label = f"DoH {doh_name_chosen}" + # DoH JSON + if json_key: + console.print(f"[dim]DoH JSON: [cyan]{json_key} ({json_name})[/cyan][/dim]") + json_probe = await _probe_doh_json_all(json_key, config.DNS_CHECK_DOMAINS) + json_label = f"DoH JSON ({json_name})" else: - console.print("[red]× Все DoH-серверы недоступны[/red]") - doh_probe = {"results": {d: "UNAVAIL" for d in config.DNS_CHECK_DOMAINS}} - doh_label = "DoH (недоступен)" + console.print("[red]× Все DoH JSON-серверы недоступны[/red]") + json_probe = _unavail() + json_label = "DoH JSON (—)" + + # DoH Wire (RFC 8484) + has_wire = bool(doh_wire_servers) + if has_wire: + if wire_key: + console.print( + f"[dim]DoH Wire [italic](RFC 8484)[/italic]: " + f"[cyan]{wire_key} ({wire_name})[/cyan][/dim]" + ) + wire_probe = await _probe_doh_wire_all(wire_key, config.DNS_CHECK_DOMAINS) + wire_label = f"DoH Wire ({wire_name})" + else: + console.print("[red]× Все DoH Wire-серверы (RFC 8484) недоступны[/red]") + wire_probe = _unavail() + wire_label = "DoH Wire (—)" + else: + wire_probe = None + wire_label = "" console.print() @@ -372,50 +524,80 @@ async def check_dns_integrity() -> Tuple[set, int]: rows = [] for domain in config.DNS_CHECK_DOMAINS: - udp_res = udp_probe["results"].get(domain) - doh_res = doh_probe["results"].get(domain) + udp_res = udp_probe["results"].get(domain) + json_res = json_probe["results"].get(domain) + wire_res = wire_probe["results"].get(domain) if wire_probe else None - udp_ips = udp_res if isinstance(udp_res, list) else None - doh_ips = doh_res if isinstance(doh_res, list) else None + udp_ips = udp_res if isinstance(udp_res, list) else None + json_ips = json_res if isinstance(json_res, list) else None + wire_ips = wire_res if isinstance(wire_res, list) else None if udp_ips: udp_ips_collection[domain] = udp_ips - udp_str = ", ".join(udp_ips[:2]) if udp_ips else str(udp_res or "—") - doh_str = ", ".join(doh_ips[:2]) if doh_ips else str(doh_res or "—") + udp_str = ", ".join(udp_ips[:2]) if udp_ips else str(udp_res or "—") + json_str = ", ".join(json_ips[:2]) if json_ips else str(json_res or "—") + wire_str = ( + ", ".join(wire_ips[:2]) if wire_ips else str(wire_res or "—") + ) if has_wire else None - if doh_res == "BLOCKED": + if json_res == "BLOCKED": + doh_blocked_count += 1 + if has_wire and wire_res == "BLOCKED": doh_blocked_count += 1 - # Логика статуса: OK только если оба ответили и IP совпали - if doh_ips and udp_ips: - if set(doh_ips) == set(udp_ips): + # Доверенные IP = объединение ответов обоих DoH-методов + trusted = set() + if json_ips: + trusted.update(json_ips) + if wire_ips: + trusted.update(wire_ips) + + # Проверяем, вернул ли UDP адрес Fake-IP + udp_is_fakeip = False + if udp_ips: + for ip in udp_ips: + if get_fake_ip_type(ip) == "fakeip": + udp_is_fakeip = True + break + + # ── Определяем статус ───────────────────────────────────────────── + if trusted and udp_ips: + if set(udp_ips) & trusted: # есть пересечение → ОК row_status = "[green]√ DNS OK[/green]" + elif udp_is_fakeip: # это FakeIP от VPN + row_status = "[green]√ FAKE-IP[/green]" else: row_status = "[red]× DNS ПОДМЕНА[/red]" dns_intercept_count += 1 - elif doh_ips and not udp_ips: - # DoH работает, UDP нет — UDP перехвачен/заблокирован - intercept_labels = { + elif trusted and not udp_ips: + labels = { "TIMEOUT": "[red]× DNS ПЕРЕХВАТ[/red]", "NXDOMAIN": "[red]× FAKE NXDOMAIN[/red]", "EMPTY": "[red]× FAKE EMPTY[/red]", "UNAVAIL": "[yellow]× UDP недоступен[/yellow]", } - row_status = intercept_labels.get(str(udp_res), "[red]× UDP БЛОК[/red]") - if udp_res not in ("UNAVAIL",): + row_status = labels.get(str(udp_res), "[red]× UDP БЛОК[/red]") + if udp_res != "UNAVAIL": dns_intercept_count += 1 - elif udp_ips and not doh_ips: - # UDP работает, DoH нет — DoH заблокирован провайдером - reason = "заблокирован" if doh_res == "BLOCKED" else "недоступен" + elif udp_ips and not trusted: + is_blocked = ( + json_res == "BLOCKED" + or (has_wire and wire_res == "BLOCKED") + ) + reason = "заблокирован" if is_blocked else "недоступен" row_status = f"[red]× DoH {reason}[/red]" - dns_intercept_count += 1 + if not udp_is_fakeip: + dns_intercept_count += 1 else: - # Оба не ответили row_status = "[red]× Оба недоступны[/red]" dns_intercept_count += 1 - rows.append([domain, doh_str, udp_str, row_status]) + # Строка таблицы + if has_wire: + rows.append([domain, json_str, wire_str, udp_str, row_status]) + else: + rows.append([domain, json_str, udp_str, row_status]) # ── Заглушки ────────────────────────────────────────────────────────────── ip_count: dict = {} @@ -426,26 +608,403 @@ async def check_dns_integrity() -> Tuple[set, int]: # ── Таблица ─────────────────────────────────────────────────────────────── from rich.table import Table - dns_table = Table(show_header=True, header_style="bold magenta", border_style="dim") - dns_table.add_column("Домен", style="cyan") - dns_table.add_column(doh_label, style="dim") - dns_table.add_column(udp_label, style="dim") - dns_table.add_column("Статус") + t = Table(show_header=True, header_style="bold magenta", border_style="dim") + t.add_column("Домен", style="cyan") + t.add_column(json_label, style="dim") + if has_wire: + t.add_column(wire_label, style="dim") + t.add_column(udp_label, style="dim") + t.add_column("Статус") for row in rows: - dns_table.add_row(*row) - console.print(dns_table) + t.add_row(*row) + console.print(t) console.print() # ── Диагностика ─────────────────────────────────────────────────────────── if dns_intercept_count > 0: console.print("[bold red][!] Ваш интернет-провайдер перехватывает DNS-запросы[/bold red]") - console.print("Провайдер подменяет ответы UDP DNS на заглушки или ложные NXDOMAIN/EMPTY\n") console.print( - "[bold yellow]ВНИМАНИЕ: Это независимая проверка и она не использует ваши настроенные DNS![/bold yellow]\n" + "Провайдер подменяет ответы UDP DNS на заглушки " + "или ложные NXDOMAIN/EMPTY\n" + ) + console.print( + "[bold yellow]ВНИМАНИЕ: Это независимая проверка и она не использует " + "ваши настроенные DNS![/bold yellow]\n" "[bold yellow]Рекомендация:[/bold yellow] Настройте DoH на устройстве и роутере\n" "[bold green]Если DoH уже настроен — игнорируйте эту проверку.[/bold green]\n" ) if doh_blocked_count > 0: - console.print("[bold red][!] DoH заблокирован[/bold red] — провайдер блокирует зашифрованный DNS\n") + console.print( + "[bold red][!] DoH заблокирован[/bold red] — " + "провайдер блокирует зашифрованный DNS\n" + ) - return stub_ips, dns_intercept_count, not bool(doh_working) \ No newline at end of file + all_doh_unavailable = not bool(json_working) and not bool(wire_working) + return stub_ips, dns_intercept_count, all_doh_unavailable + + +# ── Тест 2: Проверка доступности DNS-серверов ──────────────────────────────── + +async def check_dns_availability() -> dict: + """ + Тест 2: Проверяет доступность DNS-серверов и замеряет время резолва. + + Использует только DoH Wire (RFC 8484) — метод _probe_doh_json оставлен + для возможного использования, но не вызывается. + + Вывод: + 1. Список эндпоинтов по именам + 2. Прогресс-строка (обновляется в процессе проверки) + 3. Сводная таблица: Провайдер | DoH avg | UDP avg + — прочерк = нет сервера этого типа + — TIMEOUT = сервер есть, но все запросы провалились + — Nмс [k/n] = среднее по успешным; k/n если не все ответили + 4. Возвращает dict с итогами для _format_summary + + Правило таймингов: каждый DoH-сервер получает свой изолированный + httpx.AsyncClient — честный замер без конкуренции за пул соединений. + """ + servers = getattr(config, "DNS_AVAILABILITY_SERVERS", []) + domains = getattr(config, "DNS_AVAILABILITY_DOMAINS", config.DNS_CHECK_DOMAINS) + timeout = getattr(config, "DNS_AVAILABILITY_TIMEOUT", config.DNS_CHECK_TIMEOUT) + + if not servers: + console.print("[yellow]DNS_AVAILABILITY_SERVERS не задан в config.yml — тест пропущен.[/yellow]") + return {"doh_ok": 0, "doh_total": 0, "udp_ok": 0, "udp_total": 0} + + proxy_url = getattr(config, "PROXY_URL", None) + + # ── Группируем серверы ──────────────────────────────────────────────────── + udp_servers = [(a, n) for a, n, k in servers if k == "udp"] + # doh_json_servers — оставлены для возможного использования, но не запускаются + wire_servers = [(a, n) for a, n, k in servers if k == "doh_wire"] + doh_servers = wire_servers # только Wire + + # ── Уникальные имена провайдеров в порядке появления ───────────────────── + all_names: list[str] = [] + seen: set[str] = set() + for _, n in (doh_servers + udp_servers): + if n not in seen: + all_names.append(n) + seen.add(n) + + doh_by_name: dict[str, list[str]] = {} + udp_by_name: dict[str, list[str]] = {} + for a, n in doh_servers: + doh_by_name.setdefault(n, []).append(a) + for a, n in udp_servers: + udp_by_name.setdefault(n, []).append(a) + + # ── Заголовок ───────────────────────────────────────────────────────────── + console.print( + f"\n[bold]Проверка доступности DNS-серверов[/bold] " + f"[dim]DoH: {len(doh_servers)} | UDP: {len(udp_servers)}" + f" | Доменов: {len(domains)} | timeout: {timeout}s[/dim]" + ) + console.print() + + # ── Таблица эндпоинтов ──────────────────────────────────────────────────── + from rich.table import Table as _Table + ep_table = _Table(show_header=True, header_style="bold magenta", + border_style="dim", box=None, pad_edge=False) + ep_table.add_column("Провайдер", style="bold cyan", no_wrap=True, min_width=16) + ep_table.add_column("DoH эндпоинты", style="dim", no_wrap=False) + ep_table.add_column("UDP", style="dim", no_wrap=True) + + for name in all_names: + doh_urls = doh_by_name.get(name, []) + udp_ips = udp_by_name.get(name, []) + doh_str = "\n".join(doh_urls) if doh_urls else "[dim]—[/dim]" + udp_str = ", ".join(udp_ips) if udp_ips else "[dim]—[/dim]" + ep_table.add_row(name, doh_str, udp_str) + + console.print(ep_table) + console.print() + + # ── Счётчик прогресса ───────────────────────────────────────────────────── + total_probes = len(doh_servers) + len(udp_servers) + done_count = 0 + progress_lock = asyncio.Lock() + + def _redraw_progress(): + # \r без \n — перезаписывает текущую строку + import sys + bar = f" Проверка серверов... {done_count}/{total_probes}" + sys.stderr.write(f"\r{bar} ") + sys.stderr.flush() + + async def _tick(): + nonlocal done_count + async with progress_lock: + done_count += 1 + _redraw_progress() + + # ── raw[(kind, addr, name)][domain] = elapsed_ms or None ───────────────── + # None = сервер есть, но запрос не прошёл (TIMEOUT/ERR/NXDOMAIN) + # Ключ отсутствует = этот тип у данного имени не определён + raw: dict[tuple, dict[str, Optional[int]]] = {} + + # ── UDP probe ───────────────────────────────────────────────────────────── + # Фиксы: + # 1. t_recv снимается прямо в datagram_received (до event-loop планировщика) + # 2. pending[tid] записывается ДО sendto — нет race condition + # 3. tid из инкрементного счётчика — нет коллизий os.urandom(2) + # 4. Семафор ограничивает параллельный залп (защита от packet drop) + async def _probe_udp(addr: str, name: str) -> None: + key = ("udp", addr, name) + loop = asyncio.get_running_loop() + udp_sem = asyncio.Semaphore(15) # Ограничиваем кол-во одновременных сокетов + + # Выносим класс протокола наружу, чтобы передавать ему конкретный future + class _SingleQueryProto(asyncio.DatagramProtocol): + def __init__(self, fut): + self.fut = fut + + def datagram_received(self, data, _addr): + # Идеально точное время прямо в момент получения пакета ОС + t_recv = time.perf_counter() + if not self.fut.done(): + self.fut.set_result((data, t_recv)) + + def error_received(self, exc): + pass + + def connection_lost(self, exc): + err = exc or ConnectionError("Socket closed") + if not self.fut.done(): + self.fut.set_exception(err) + + async def _wait(domain: str) -> tuple[str, Optional[float]]: + async with udp_sem: + q = _build_dns_query(domain) + tx_id = q[:2] + fut = loop.create_future() + transport = None + t0 = time.perf_counter() + + try: + # Создаем уникальный сокет для каждого домена + transport, _ = await loop.create_datagram_endpoint( + lambda: _SingleQueryProto(fut), remote_addr=(addr, 53) + ) + transport.sendto(q) + + data, t_recv = await asyncio.wait_for(fut, timeout=timeout) + elapsed_ms = round((t_recv - t0) * 1000, 1) + parsed = _parse_dns_response(data, tx_id) + + return domain, elapsed_ms if isinstance(parsed, list) else None + except Exception: + return domain, None + finally: + if transport: + transport.close() + + pairs = await asyncio.gather(*[_wait(d) for d in domains]) + raw[key] = dict(pairs) + await _tick() + + # ── DoH JSON probe ──────────────────────────────────────────────────────── + # Один клиент на сервер — один TLS handshake. + # Прогревочный запрос исключает handshake из замера боевых запросов. + async def _probe_doh_json(addr: str, name: str) -> None: + key = ("doh_json", addr, name) + cli_timeout = httpx.Timeout(timeout, connect=timeout, pool=2.0) + doh_sem = asyncio.Semaphore(20) + + async def _one(domain: str, client: httpx.AsyncClient) -> tuple[str, Optional[float]]: + async with doh_sem: + t0 = time.perf_counter() + try: + resp = await client.get(addr, params={"name": domain, "type": "A"}) + elapsed_ms = round((time.perf_counter() - t0) * 1000, 2) + if resp.status_code != 200: + return domain, None + data = resp.json() + if data.get("Status") == 3: + return domain, None + ips = [a["data"] for a in data.get("Answer", []) if a.get("type") == 1] + return domain, elapsed_ms if ips else None + except Exception: + return domain, None + + try: + async with httpx.AsyncClient( + timeout=cli_timeout, verify=False, + headers={"Accept": "application/dns-json", "User-Agent": config.USER_AGENT}, + proxy=proxy_url, trust_env=False, + ) as client: + # Прогрев — устанавливаем TLS-соединение до боевых замеров + try: + warmup_q = domains[0] if domains else "google.com" + await client.get(addr, params={"name": warmup_q, "type": "A"}) + except Exception: + pass + pairs = await asyncio.gather(*[_one(d, client) for d in domains]) + except Exception: + pairs = [(d, None) for d in domains] + raw[key] = dict(pairs) + await _tick() + + # ── DoH Wire probe ──────────────────────────────────────────────────────── + # Один клиент на сервер. http2=True → ALPN. + # Прогрев исключает TLS handshake из замера. + # Fallback POST→GET сбрасывает таймер — замеряется только успешный метод. + # ── DoH Wire probe ──────────────────────────────────────────────────────── + async def _probe_doh_wire(addr: str, name: str) -> None: + key = ("doh_wire", addr, name) + cli_timeout = httpx.Timeout(timeout, connect=timeout, pool=2.0) + doh_sem = asyncio.Semaphore(20) + + # Выносим всю логику во внутреннюю функцию, чтобы обернуть её в жесткий таймаут + async def _do_probe() -> list: + async def _one(domain: str, client: httpx.AsyncClient) -> tuple[str, Optional[float]]: + async with doh_sem: + query = _build_dns_query(domain) + tx_id = query[:2] + try: + t0 = time.perf_counter() + resp = await client.post( + addr, content=query, + headers={ + "Content-Type": "application/dns-message", + "Accept": "application/dns-message", + "User-Agent": config.USER_AGENT, + }, + ) + if resp.status_code != 200: + dns_b64 = base64.urlsafe_b64encode(query).rstrip(b'=').decode() + t0 = time.perf_counter() + resp = await client.get( + addr, params={"dns": dns_b64}, + headers={"Accept": "application/dns-message", + "User-Agent": config.USER_AGENT}, + ) + elapsed_ms = round((time.perf_counter() - t0) * 1000, 1) + if resp.status_code != 200: + return domain, None + result = _parse_dns_response(resp.content, tx_id) + return domain, elapsed_ms if isinstance(result, list) else None + except Exception: + return domain, None + + try: + async with httpx.AsyncClient( + timeout=cli_timeout, verify=False, + proxy=proxy_url, trust_env=False, http2=True, + ) as client: + # 1. Прогрев с Fail-Fast (Защита от двойного таймаута) + try: + warmup = _build_dns_query(domains[0] if domains else "google.com") + await client.post( + addr, content=warmup, + headers={"Content-Type": "application/dns-message", + "Accept": "application/dns-message", + "User-Agent": config.USER_AGENT}, + ) + except (httpx.ConnectError, httpx.ConnectTimeout, httpx.TimeoutException): + # Сервер физически недоступен. Нет смысла спамить боевыми запросами. + return [(d, None) for d in domains] + except Exception: + pass # Сервер жив, но вернул 400/500 — продолжаем (м.б. GET сработает) + + # 2. Боевые запросы (запустятся только если сервер жив) + return await asyncio.gather(*[_one(d, client) for d in domains]) + except Exception: + return [(d, None) for d in domains] + + # 3. Жесткая гильотина: ограничиваем всё время проверки провайдера + try: + # Даем времени чуть больше, чтобы httpx успел закрыться сам штатно + pairs = await asyncio.wait_for(_do_probe(), timeout=timeout + 2.0) + raw[key] = dict(pairs) + except (asyncio.TimeoutError, Exception): + raw[key] = {d: None for d in domains} + finally: + await _tick() + + # ── Запускаем: только Wire + UDP ───────────────────────────────────────── + _redraw_progress() + if udp_servers: + await asyncio.gather(*[_probe_udp(a, n) for a, n in udp_servers]) + + if wire_servers: + await asyncio.gather(*[_probe_doh_wire(a, n) for a, n in wire_servers]) + + # Завершаем прогресс-строку — переходим на новую строку + import sys + sys.stderr.write(f"\r Проверено серверов: {done_count}/{total_probes} \n") + sys.stderr.flush() + + # ── Агрегируем по имени провайдера ──────────────────────────────────────── + # Возвращает (avg_ms, ok, total) или ("TIMEOUT", 0, total) если сервер есть + # но все провалились, или None если нет серверов этого типа вообще. + def _aggregate(name: str, kind: str) -> Optional[tuple]: + entries = [ + (k_addr, domain_map) + for (k_kind, k_addr, k_name), domain_map in raw.items() + if k_kind == kind and k_name == name + ] + if not entries: + return None # нет серверов этого типа — прочерк + + # Берём лучший (минимальный avg) среди рабочих + best = None + for _addr, domain_map in entries: + vals = [v for v in domain_map.values() if v is not None] + total = len(domain_map) + ok = len(vals) + if ok > 0: + avg = round(sum(vals) / ok, 1) + if best is None or avg < best[0]: + best = (avg, ok, total) + + if best is not None: + return best # (avg_ms, ok, total) + + # Серверы есть, но все провалились → TIMEOUT + total = len(next(iter(entries))[1]) + return ("TIMEOUT", 0, total) + + # ── Формируем ячейку таблицы ────────────────────────────────────────────── + def _cell(agg: Optional[tuple], has_server: bool) -> str: + if not has_server or agg is None: + return "[dim]—[/dim]" + if agg[0] == "TIMEOUT": + return "[red]TIMEOUT[/red]" + avg, ok, total = agg + ms_str = f"[green]{avg}мс[/green]" + ratio = f" [dim]{ok}/{total}[/dim]" if ok < total else "" + return ms_str + ratio + + # ── Таблица ─────────────────────────────────────────────────────────────── + from rich.table import Table + t = Table(show_header=True, header_style="bold magenta", border_style="dim") + t.add_column("Провайдер", style="cyan", no_wrap=True, min_width=16) + t.add_column("DoH avg", justify="right", no_wrap=True, min_width=12) + t.add_column("UDP avg", justify="right", no_wrap=True, min_width=12) + + doh_ok_names = udp_ok_names = 0 + doh_total_names = len({n for _, n in doh_servers}) + udp_total_names = len({n for _, n in udp_servers}) + + for name in all_names: + has_doh = name in doh_by_name + has_udp = name in udp_by_name + doh_agg = _aggregate(name, "doh_wire") + udp_agg = _aggregate(name, "udp") + if doh_agg and doh_agg[0] != "TIMEOUT": + doh_ok_names += 1 + if udp_agg and udp_agg[0] != "TIMEOUT": + udp_ok_names += 1 + t.add_row(name, _cell(doh_agg, has_doh), _cell(udp_agg, has_udp)) + + console.print(t) + console.print() + + return { + "doh_ok": doh_ok_names, + "doh_total": doh_total_names, + "udp_ok": udp_ok_names, + "udp_total": udp_total_names, + } \ No newline at end of file diff --git a/dpi_detector.py b/dpi_detector.py index b944a84..8633848 100644 --- a/dpi_detector.py +++ b/dpi_detector.py @@ -22,7 +22,11 @@ from utils import config from cli.console import console from cli.ui import ask_test_selection, print_legend from cli.runners import run_domains_test, run_tcp_test, run_whitelist_sni_test, run_telegram_test -from core.dns_scanner import check_dns_integrity, collect_stub_ips_silently +from core.dns_scanner import ( + check_dns_integrity, + check_dns_availability, + collect_stub_ips_silently, +) from utils.files import load_domains, load_tcp_targets, load_whitelist_sni, get_base_dir CURRENT_VERSION = "3.1.0" @@ -37,16 +41,15 @@ def parse_arguments(): description="DPI Detector — Анализатор блокировок трафика", formatter_class=argparse.RawTextHelpFormatter ) - - parser.add_argument("-t", "--tests", type=str, help="Список тестов для запуска (например: 123 или 24). Пропускает стартовое меню.") - parser.add_argument("-p", "--proxy", type=str, help="URL прокси (напр: socks5://127.0.0.1:1080) (PROXY_URL)") + parser.add_argument("-t", "--tests", type=str, help="Список тестов для запуска (например: 123 или 24). Пропускает стартовое меню.") + parser.add_argument("-p", "--proxy", type=str, help="URL прокси (напр: socks5://127.0.0.1:1080) (PROXY_URL)") parser.add_argument("-c", "--concurrency", type=int, help="Максимальное количество параллельных запросов (MAX_CONCURRENT)") - parser.add_argument("-d", "--domain", type=str, action="append", help="Проверить конкретный домен(ы), игнорируя domains.txt.\nМожно указывать несколько раз: -d vk.com -d ya.ru") - parser.add_argument("-o", "--output", type=str, help="Путь для автосохранения отчета (например: report.txt).") - parser.add_argument("--batch", action="store_true", help="Отключает паузы и вопросы") - + parser.add_argument("-d", "--domain", type=str, action="append", help="Проверить конкретный домен(ы), игнорируя domains.txt.\nМожно указывать несколько раз: -d vk.com -d ya.ru") + parser.add_argument("-o", "--output", type=str, help="Путь для автосохранения отчета (например: report.txt).") + parser.add_argument("--batch", action="store_true", help="Отключает паузы и вопросы") return parser.parse_args() + async def _fetch_latest_version() -> Optional[str]: """Запрашивает последний тег с GitHub API. Возвращает строку версии или None.""" url = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest" @@ -63,12 +66,11 @@ async def _fetch_latest_version() -> Optional[str]: def fast_exit_handler(sig, frame): - """Принудительный выход по первому Ctrl+C.""" - # Используем системный принт, т.к. rich может быть заблокирован sys.stdout.write("\n\033[91m\033[1mПрервано пользователем.\033[0m\n") sys.stdout.flush() os._exit(0) + async def _readline_cancelable() -> str: loop = asyncio.get_running_loop() try: @@ -78,13 +80,12 @@ async def _readline_cancelable() -> str: except asyncio.CancelledError: raise KeyboardInterrupt + def _flush_stdin() -> None: - """Сбрасывает накопившиеся данные в stdin чтобы буферные Enter не перезапускали тест.""" try: import termios termios.tcflush(sys.stdin, termios.TCIFLUSH) except Exception: - # Windows и другие окружения — лучшее что можно сделать без блокировки try: import msvcrt while msvcrt.kbhit(): @@ -94,43 +95,70 @@ def _flush_stdin() -> None: def _format_summary( - run_dns: bool, run_domains: bool, run_tcp: bool, run_telegram: bool, - dns_intercept: int, domain_stats, tcp_stats, + run_dns: bool, + run_dns_avail: bool, + run_domains: bool, + run_tcp: bool, + run_telegram: bool, + dns_intercept: int, + domain_stats, + tcp_stats, telegram_stats=None, - doh_unavailable=False, + doh_unavailable: bool = False, + dns_avail_stats=None, ) -> List[str]: lines = [] + # ── Тест 1: подмена DNS ─────────────────────────────────────────────────── if run_dns: total_dns = len(config.DNS_CHECK_DOMAINS) ok_dns = total_dns - dns_intercept if doh_unavailable: lines.append( - f"[bold]DNS[/bold] " + f"[bold]DNS подмена[/bold] " f"[red]× DoH заблокирован провайдером[/red]" ) elif dns_intercept == 0: lines.append( - f"[bold]DNS[/bold] " + f"[bold]DNS подмена[/bold] " f"[green]√ {ok_dns}/{total_dns} не подменяется[/green]" ) elif dns_intercept == total_dns: lines.append( - f"[bold]DNS[/bold] " + f"[bold]DNS подмена[/bold] " f"[red]× {dns_intercept}/{total_dns} подменяется провайдером[/red]" ) else: lines.append( - f"[bold]DNS[/bold] " + f"[bold]DNS подмена[/bold] " f"[green]√ {ok_dns}/{total_dns} OK[/green]" - f" [red]× {dns_intercept}/{total_dns} подменяется провайдером[/red]" + f" [red]× {dns_intercept}/{total_dns} подменяется[/red]" ) + # ── Тест 2: доступность DNS ─────────────────────────────────────────────── + if run_dns_avail: + if dns_avail_stats: + d = dns_avail_stats + doh_color = "green" if d["doh_ok"] == d["doh_total"] else ( + "red" if d["doh_ok"] == 0 else "yellow" + ) + udp_color = "green" if d["udp_ok"] == d["udp_total"] else ( + "red" if d["udp_ok"] == 0 else "yellow" + ) + lines.append( + f"[bold]DNS доступность[/bold] " + f"[{doh_color}]{d['doh_ok']}/{d['doh_total']} DoH[/{doh_color}]" + f" [{udp_color}]{d['udp_ok']}/{d['udp_total']} UDP[/{udp_color}]" + ) + else: + lines.append("[bold]DNS доступность[/bold] [dim]—[/dim]") + + # ── Тест 3: домены ──────────────────────────────────────────────────────── if domain_stats: d = domain_stats pct = int(d["ok"] / d["total"] * 100) if d["total"] else 0 line = ( - f"[bold]Домены[/bold] " + f"[bold]Домены[/bold] " f"[green]√ {d['ok']}/{d['total']} OK[/green]" + (f" [red]× {d['blocked']} блок.[/red]" if d['blocked'] else "") + (f" [yellow]⏱ {d['timeout']} таймаут[/yellow]" if d['timeout'] else "") @@ -138,11 +166,12 @@ def _format_summary( ) lines.append(line) + # ── Тест 4: TCP 16-20KB ─────────────────────────────────────────────────── if tcp_stats: t = tcp_stats pct = int(t["ok"] / t["total"] * 100) if t["total"] else 0 line = ( - f"[bold]TCP 16-20KB[/bold] " + f"[bold]TCP 16-20KB[/bold] " f"[green]√ {t['ok']}/{t['total']} OK[/green]" + (f" [red]× {t['blocked']} блок.[/red]" if t['blocked'] else "") + (f" [yellow]≈ {t['mixed']} смеш.[/yellow]" if t['mixed'] else "") @@ -150,18 +179,18 @@ def _format_summary( ) lines.append(line) + # ── Тест 6: Telegram ────────────────────────────────────────────────────── if run_telegram and telegram_stats: t = telegram_stats dl_data = t.get("download", {}) ul_data = t.get("upload", {}) dc_r, dc_t = t.get("dc_reachable", 0), t.get("dc_total", 0) - def format_tg_line(label, data, speed_key, size_key): - st = data.get("status") - avg = data.get(speed_key, 0) + def _fmt_tg(label, data, speed_key, size_key): + st = data.get("status") + avg = data.get(speed_key, 0) size = data.get(size_key, 0) drop = data.get("drop_at_sec") - if st == "ok": raw_st, color = "ОК", "green" elif st == "stalled": @@ -172,18 +201,13 @@ def _format_summary( raw_st, color = "НЕДОСТУПНО", "red" else: raw_st, color = "ОШИБКА", "red" - - status_text = f"[{color}]{raw_st:<16}[/{color}]" - metrics = f"ср. {_tg_fmt(avg)}, {_tg_size(size)}" if drop: metrics += f", обрыв на {drop}с" + return f"[bold]{label:<13}[/bold] [{color}]{raw_st:<16}[/{color}] {metrics}" - return f"[bold]{label:<13}[/bold] {status_text} {metrics}" - - lines.append(format_tg_line("TG Скачивание", dl_data, "avg_bps", "bytes_total")) - lines.append(format_tg_line("TG Загрузка", ul_data, "bps", "sent")) - + lines.append(_fmt_tg("TG Скачивание", dl_data, "avg_bps", "bytes_total")) + lines.append(_fmt_tg("TG Загрузка", ul_data, "bps", "sent")) dc_color = "green" if dc_r == dc_t else ("red" if dc_r == 0 else "yellow") lines.append(f"[bold]{'TG Датацентры':<13}[/bold] [{dc_color}]ОК {dc_r}/{dc_t}[/{dc_color}]") @@ -191,17 +215,14 @@ def _format_summary( def is_newer(latest: str, current: str) -> bool: - """Сравнивает версии. Возвращает True, если на GitHub версия выше текущей.""" try: def parse(v): return tuple(int(x) for x in v.replace('v', '').split('.') if x.isdigit()) - - l_parts = parse(latest) - c_parts = parse(current) - return l_parts > c_parts + return parse(latest) > parse(current) except Exception: return False + async def main(): args = parse_arguments() @@ -231,13 +252,16 @@ async def main(): else: selection = await ask_test_selection() - run_dns = "1" in selection - run_domains = "2" in selection - run_tcp = "3" in selection - run_wl_sni = "4" in selection - run_telegram = "5" in selection - run_legend = "6" in selection - only_legend = run_legend and not any([run_dns, run_domains, run_tcp, run_wl_sni, run_telegram]) + run_dns = "1" in selection # Тест 1: подмена DNS + run_dns_avail = "2" in selection # Тест 2: доступность DNS-серверов + run_domains = "3" in selection # Тест 3: доступность доменов (TLS/HTTP) + run_tcp = "4" in selection # Тест 4: TCP 16-20KB блокировка + run_wl_sni = "5" in selection # Тест 5: белые SNI для ASN + run_telegram = "6" in selection # Тест 6: Telegram + run_legend = "7" in selection # Тест 7: Легенда + only_legend = run_legend and not any([ + run_dns, run_dns_avail, run_domains, run_tcp, run_wl_sni, run_telegram + ]) if only_legend: print_legend() @@ -268,7 +292,7 @@ async def main(): semaphore = asyncio.Semaphore(config.MAX_CONCURRENT) while True: - # ── DNS ─────────────────────────────────────────────────────────────── + # ── Тест 1: подмена DNS ─────────────────────────────────────────────── stub_ips: set = set() dns_intercept_count = 0 doh_unavailable = False @@ -284,36 +308,47 @@ async def main(): except asyncio.TimeoutError: stub_ips = set() - # ── Домены ──────────────────────────────────────────────────────────── + # ── Тест 2: доступность DNS-серверов ───────────────────────────────── + dns_avail_stats = None + if run_dns_avail: + dns_avail_stats = await check_dns_availability() + + # ── Тест 3: домены ──────────────────────────────────────────────────── domain_stats = None if run_domains: domain_stats = await run_domains_test(semaphore, stub_ips, DOMAINS) - # ── TCP 16-20KB ─────────────────────────────────────────────────────── + # ── Тест 4: TCP 16-20KB ─────────────────────────────────────────────── tcp_stats = None if run_tcp: tcp_stats = await run_tcp_test(semaphore, TCP_16_20_ITEMS) - # ── Белые SNI ───────────────────────────────────────────────────────── + # ── Тест 5: белые SNI ───────────────────────────────────────────────── if run_wl_sni: if WHITELIST_SNI: await run_whitelist_sni_test(semaphore, TCP_16_20_ITEMS, WHITELIST_SNI) else: - console.print("[yellow]Файл whitelist_sni.txt пуст или не найден — тест 4 пропущен.[/yellow]") + console.print("[yellow]Файл whitelist_sni.txt пуст или не найден — тест 5 пропущен.[/yellow]") - - # ── Telegram ────────────────────────────────────────────────────────── + # ── Тест 6: Telegram ────────────────────────────────────────────────── telegram_stats = None if run_telegram: telegram_stats = await run_telegram_test(semaphore) + # ── Итоговая сводка ─────────────────────────────────────────────────── - active_tests = sum([run_dns, run_domains, run_tcp, run_wl_sni, run_telegram]) console.print() summary_lines = _format_summary( - run_dns, run_domains, run_tcp, run_telegram, - dns_intercept_count, domain_stats, tcp_stats, + run_dns=run_dns, + run_dns_avail=run_dns_avail, + run_domains=run_domains, + run_tcp=run_tcp, + run_telegram=run_telegram, + dns_intercept=dns_intercept_count, + domain_stats=domain_stats, + tcp_stats=tcp_stats, telegram_stats=telegram_stats, doh_unavailable=doh_unavailable, + dns_avail_stats=dns_avail_stats, ) console.print(Panel( "\n".join(summary_lines), @@ -323,10 +358,9 @@ async def main(): expand=False, )) - console.print("\n[bold green]Проверка завершена.[/bold green]") - # ── Уведомление о новой версии ──────────────────────── + # ── Уведомление о новой версии ──────────────────────────────────────── if not latest_version_notified: try: latest = await asyncio.wait_for(asyncio.shield(version_task), timeout=0.1) @@ -352,7 +386,7 @@ async def main(): "\nНажмите [bold green]Enter[/bold green] чтобы повторить проверку " "или [bold red]Ctrl+C[/bold red] для выхода" ) - _flush_stdin() # сбрасываем накопившиеся Enter чтобы не было авто-перезапуска + _flush_stdin() try: await _readline_cancelable() except KeyboardInterrupt: diff --git a/utils/error_classifier.py b/utils/error_classifier.py index 10138c6..ebf5004 100644 --- a/utils/error_classifier.py +++ b/utils/error_classifier.py @@ -134,7 +134,7 @@ def classify_ssl_error(error: ssl.SSLError, bytes_read: int) -> Tuple[str, str, return ("[bold red]TLS MITM[/bold red]", "Cipher mismatch", bytes_read) if "version" in msg or "protocol version" in msg: - return ("[bold red]BLOCK[/bold red]", "TLS version block", bytes_read) + return ("[bold red]NO TLS1.3[/bold red]", "Server has no TLS 1.3", bytes_read) if isinstance(error, ssl.SSLZeroReturnError): # Close notify в неожиданный момент — признак DPI или блокировки