dpi-detector/utils/error_classifier.py
Kirill Minovsky 8463fa078d feat(dns): Added DNS availability test and updated UI
- Implemented `check_dns_availability` to measure DNS server latency
- Added `DNS_AVAILABILITY_SERVERS` and `DNS_AVAILABILITY_DOMAINS` to config
- Updated `_format_summary` to include DNS availability statistics
- Integrated DNS availability results into the CLI menu (Test 2)
- Added DoH wire-format (RFC 8484) support for improved probe reliability
- Enhanced `_parse_dns_response` for better error handling and parsing
- Refactored `run_domains_test` to better classify ISP, local, and Fake-IP
2026-04-24 15:09:12 +03:00

255 lines
No EOL
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ssl
import math
import errno
import socket
from typing import Tuple, Optional
import httpx
from utils import config
# ── Утилиты для обхода цепочки исключений ────────────────────────────────────
def find_cause(exc: Exception, target_type: type, max_depth: int = 10) -> Optional[Exception]:
"""Возвращает первое исключение заданного типа из цепочки, или None."""
current = exc
for _ in range(max_depth):
if isinstance(current, target_type):
return current
nxt = current.__cause__ or current.__context__
if nxt is None:
break
current = nxt
return None
def get_errno_from_chain(exc: Exception, max_depth: int = 10) -> Optional[int]:
current = exc
for _ in range(max_depth):
if isinstance(current, OSError) and current.errno is not None:
return current.errno
nxt = current.__cause__ or current.__context__
if nxt is None:
break
current = nxt
return None
def collect_error_text(exc: Exception, max_depth: int = 10) -> str:
parts = []
current = exc
for _ in range(max_depth):
parts.append(str(current).lower())
nxt = current.__cause__ or current.__context__
if nxt is None:
break
current = nxt
return " | ".join(parts)
# ── Форматирование деталей ────────────────────────────────────────────────────
def clean_detail(detail: str) -> str:
import re
if not detail or detail in ("OK", "Error"):
return ""
detail = detail.replace("The operation did not complete", "TLS Aborted")
detail = re.sub(r"\s*\(_*\s*$", "", detail)
detail = re.sub(r"\s+", " ", detail).strip()
detail = detail.replace("Err None: ", "").replace("Conn failed: ", "")
if re.match(r"^HTTP [23]\d\d$", detail):
return ""
return detail.strip()
# ── Классификаторы ошибок ─────────────────────────────────────────────────────
def classify_ssl_error(error: ssl.SSLError, bytes_read: int) -> Tuple[str, str, int]:
msg = str(error).lower()
# DPI: обрыв handshake / передачи данных
dpi_interruption = [
"eof", "unexpected eof", "eof occurred in violation",
"operation did not complete", "bad record mac",
"decryption failed", "decrypt"
]
if any(m in msg for m in dpi_interruption):
detail = "Обрыв при передаче" if bytes_read > 0 else "Handshake aborted"
return ("[bold red]TLS DPI[/bold red]", detail, bytes_read)
# DPI: манипуляции с handshake
if any(x in msg for x in [
"illegal parameter", "decode error", "decoding error",
"record overflow", "oversized",
"record layer failure", "record_layer_failure",
"bad key share", "bad_key_share"
]):
if "bad key share" in msg or "bad_key_share" in msg:
return ("[yellow]SSL ERR[/yellow]", "Bad key share", bytes_read)
if "record layer failure" in msg or "record_layer_failure" in msg:
return ("[yellow]SSL ERR[/yellow]", "Record layer fail", bytes_read)
return ("[bold red]TLS DPI[/bold red]", "Подмена handshake", bytes_read)
if "unrecognized name" in msg or "unrecognized_name" in msg:
return ("[bold red]TLS DPI[/bold red]", "SNI блок", bytes_read)
if "alert handshake" in msg or "sslv3_alert_handshake" in msg:
return ("[bold red]TLS DPI[/bold red]", "Handshake alert", bytes_read)
if "handshake" in msg:
if "unexpected" in msg:
return ("[bold red]TLS DPI[/bold red]", "HS подмена", bytes_read)
elif "failure" in msg:
return ("[bold red]TLS DPI[/bold red]", "HS failure", bytes_read)
if "wrong version number" in msg:
return ("[bold red]TLS DPI[/bold red]", "Non-TLS ответ", bytes_read)
# MITM: проблемы с сертификатом
if isinstance(error, ssl.SSLCertVerificationError):
verify_code = getattr(error, 'verify_code', None)
if verify_code == 10 or "expired" in msg:
return ("[bold red]TLS MITM[/bold red]", "Cert expired", bytes_read)
elif verify_code in (18, 19) or "self-signed" in msg or "self signed" in msg:
return ("[bold red]TLS MITM[/bold red]", "Self-signed cert", bytes_read)
elif verify_code == 20 or "unknown ca" in msg:
return ("[bold red]TLS MITM[/bold red]", "Unknown CA", bytes_read)
elif verify_code == 62 or "hostname mismatch" in msg:
return ("[bold red]TLS MITM[/bold red]", "Hostname mismatch", bytes_read)
else:
return ("[bold red]TLS MITM[/bold red]", "Cert fail", bytes_read)
if "certificate" in msg:
if "verify failed" in msg or "unknown ca" in msg:
return ("[bold red]TLS MITM[/bold red]", "Unknown CA", bytes_read)
elif "hostname mismatch" in msg or "name mismatch" in msg:
return ("[bold red]TLS MITM[/bold red]", "Cert mismatch", bytes_read)
elif "expired" in msg:
return ("[bold red]TLS MITM[/bold red]", "Cert expired", bytes_read)
else:
return ("[red]SSL CERT[/red]", "Cert error", bytes_read)
if "cipher" in msg or "no shared cipher" in msg:
return ("[bold red]TLS MITM[/bold red]", "Cipher mismatch", bytes_read)
if "version" in msg or "protocol version" in msg:
return ("[bold red]NO TLS1.3[/bold red]", "Server has no TLS 1.3", bytes_read)
if isinstance(error, ssl.SSLZeroReturnError):
# Close notify в неожиданный момент — признак DPI или блокировки
return ("[bold red]TLS DPI[/bold red]", "Неожиданный close notify", bytes_read)
if "internal error" in msg:
return ("[red]SSL ERR[/red]", "Internal error", bytes_read)
if "handshake" in msg:
return ("[red]TLS ERR[/red]", "Handshake error", bytes_read)
return ("[red]SSL ERR[/red]", clean_detail(str(error)[:40]), bytes_read)
def classify_connect_error(error: Exception, bytes_read: int) -> Tuple[str, str, int]:
"""Единая классификация ошибок установки соединения (L3/L4/DNS)."""
full_text = collect_error_text(error)
err_errno = get_errno_from_chain(error)
if isinstance(error, httpx.PoolTimeout) or "pool timeout" in full_text:
return ("[magenta]POOL TIMEOUT[/magenta]", "Нехватка сокетов, снизьте параллелизм", bytes_read)
if isinstance(error, httpx.ConnectTimeout) or "connect timeout" in full_text:
return ("[red]TIMEOUT[/red]", "TCP timeout (SYN Drop)", bytes_read)
# DNS
gai = find_cause(error, socket.gaierror)
if gai is not None:
gai_errno = getattr(gai, 'errno', None)
if gai_errno in (socket.EAI_NONAME, 11001):
return ("[yellow]DNS FAIL[/yellow]", "Домен не найден", bytes_read)
elif gai_errno in (getattr(socket, 'EAI_AGAIN', -3), 11002):
if "connection" in full_text and any(x in full_text for x in ("reset", "refused", "closed")):
return ("[yellow]DNS FAIL[/yellow]", "DNS ошибка/дроп", bytes_read)
return ("[yellow]DNS FAIL[/yellow]", "DNS таймаут/недоступен", bytes_read)
else:
return ("[yellow]DNS FAIL[/yellow]", "Ошибка DNS", bytes_read)
if any(x in full_text for x in [
"getaddrinfo failed", "name resolution", "11001", "11002",
"name or service not known", "nodename nor servname"
]):
return ("[yellow]DNS FAIL[/yellow]", "Ошибка DNS", bytes_read)
# TLS ALERT внутри ConnectError (DPI)
if "sslv3_alert" in full_text or "ssl alert" in full_text or ("alert" in full_text and "handshake" in full_text):
if "handshake_failure" in full_text or "handshake failure" in full_text:
return ("[bold red]TLS DPI[/bold red]", "Handshake alert", bytes_read)
elif "unrecognized_name" in full_text:
return ("[bold red]TLS DPI[/bold red]", "SNI alert", bytes_read)
elif "protocol_version" in full_text or "alert_protocol_version" in full_text:
return ("[bold red]TLS BLOCK[/bold red]", "Version alert", bytes_read)
else:
return ("[bold red]TLS DPI[/bold red]", "TLS alert", bytes_read)
ssl_err = find_cause(error, ssl.SSLError)
if ssl_err is not None:
return classify_ssl_error(ssl_err, bytes_read)
# TCP ОШИБКИ (L4)
if find_cause(error, ConnectionRefusedError) is not None or err_errno in (errno.ECONNREFUSED, config.WSAECONNREFUSED) or "refused" in full_text:
return ("[bold red]REFUSED[/bold red]", "TCP соединение отклонено", bytes_read)
if find_cause(error, ConnectionResetError) is not None or err_errno in (errno.ECONNRESET, config.WSAECONNRESET) or "connection reset" in full_text:
return ("[bold red]RST[/bold red]", "TCP соединение сброшено", bytes_read)
if find_cause(error, ConnectionAbortedError) is not None or err_errno in (getattr(errno, 'ECONNABORTED', 103), config.WSAECONNABORTED) or "connection aborted" in full_text:
return ("[bold red]ABORT[/bold red]", "TCP соединение прервано", bytes_read)
if find_cause(error, TimeoutError) is not None or err_errno in (errno.ETIMEDOUT, config.WSAETIMEDOUT) or "timed out" in full_text:
return ("[red]TIMEOUT[/red]", "TCP timeout (SYN Drop)", bytes_read)
if err_errno in (errno.ENETUNREACH, config.WSAENETUNREACH) or "network is unreachable" in full_text:
return ("[red]NET UNREACH[/red]", "Нет маршрута (ICMP unreach)", bytes_read)
if err_errno in (errno.EHOSTUNREACH, config.WSAEHOSTUNREACH) or "no route to host" in full_text:
return ("[red]HOST UNREACH[/red]", "Нет маршрута до хоста", bytes_read)
if "all connection attempts failed" in full_text:
return ("[bold red]REFUSED[/bold red]", "TCP соединение отклонено", bytes_read)
return ("[red]CONN ERR[/red]", clean_detail(str(error)[:40]), bytes_read)
def classify_read_error(error: Exception, bytes_read: int) -> Tuple[str, str, int]:
full_text = collect_error_text(error)
err_errno = get_errno_from_chain(error)
if find_cause(error, ConnectionResetError) is not None \
or err_errno in (errno.ECONNRESET, config.WSAECONNRESET) \
or "connection reset" in full_text:
return ("[bold red]RST[/bold red]", "TCP соединение сброшено", bytes_read)
if find_cause(error, ConnectionAbortedError) is not None \
or err_errno in (getattr(errno, 'ECONNABORTED', 103), config.WSAECONNABORTED) \
or "connection aborted" in full_text:
return ("[bold red]ABORT[/bold red]", "TCP соединение прервано", bytes_read)
if find_cause(error, BrokenPipeError) is not None \
or err_errno == errno.EPIPE \
or "broken pipe" in full_text:
return ("[bold red]RST[/bold red]", "Broken pipe", bytes_read)
if isinstance(error, httpx.RemoteProtocolError) or "remoteprotocolerror" in full_text:
if "peer closed" in full_text or "connection closed" in full_text:
return ("[bold red]ABORT[/bold red]", "Closed early", bytes_read)
elif "incomplete" in full_text:
return ("[bold red]ABORT[/bold red]", "Incomplete response", bytes_read)
else:
return ("[red]PROTO ERR[/red]", "Protocol error", bytes_read)
if isinstance(error, httpx.ReadError):
ssl_err = find_cause(error, ssl.SSLError)
if ssl_err is not None:
return classify_ssl_error(ssl_err, bytes_read)
return ("[red]READ ERR[/red]", "Read error", bytes_read)
return ("[red]READ ERR[/red]", f"{type(error).__name__}", bytes_read)