feat(scanner): Enhance block and redirect detection logic

- Introduce `stub_ips` for early ISP page detection via DNS.
- Refactor redirect classification.
- Differentiate between same-domain/subdomain and external redirects.
- Remove usage of `config.BLOCK_MARKERS` and `config.BODY_BLOCK_MARKERS`.
- Update `cli/ui` legend for new ISP page and redirect statuses.
- Integrate `stub_ips` into TLS and HTTP scanning workers.
- Simplify HTTP/TLS status reporting for redirects and connections.
This commit is contained in:
Kirill Minovsky 2026-03-13 23:37:24 +03:00
parent 769413637a
commit fb5e5f7dbf
4 changed files with 60 additions and 105 deletions

View file

@ -66,12 +66,16 @@ async def _tls_worker(
client: httpx.AsyncClient,
tls_key: str,
semaphore: asyncio.Semaphore,
stub_ips: set = None,
) -> None:
"""Фаза TLS: пишет результат в entry in-place."""
if entry["dns_fake"] is not False:
return
try:
result = await check_domain_tls(entry["domain"], client, semaphore)
result = await check_domain_tls(
entry["domain"], client, semaphore,
stub_ips=stub_ips, resolved_ip=entry.get("resolved_ipv4")
)
except Exception:
result = ("[dim]ERR[/dim]", "Unknown error", 0.0)
entry[tls_key] = result
@ -81,13 +85,14 @@ async def _http_worker(
entry: dict,
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
stub_ips: set = None,
) -> None:
"""Фаза HTTP: пишет результат в entry in-place."""
if entry["dns_fake"] is not False:
return
async with semaphore:
try:
result = await check_http_injection(entry["domain"], client, semaphore)
result = await check_http_injection(entry["domain"], client, semaphore, stub_ips=stub_ips)
except Exception:
result = ("[dim]ERR[/dim]", "Unknown error")
entry["http_res"] = result
@ -164,15 +169,15 @@ async def run_domains_test(semaphore: asyncio.Semaphore, stub_ips: set, domains:
try:
await _run_phase_with_progress(
[_tls_worker(e, client_t13, "t13v4_res", semaphore) for e in entries],
[_tls_worker(e, client_t13, "t13v4_res", semaphore, stub_ips) for e in entries],
"Фаза 1/3: TLS 1.3..."
)
await _run_phase_with_progress(
[_tls_worker(e, client_t12, "t12_res", semaphore) for e in entries],
[_tls_worker(e, client_t12, "t12_res", semaphore, stub_ips) for e in entries],
"Фаза 2/3: TLS 1.2..."
)
await _run_phase_with_progress(
[_http_worker(e, client_http, semaphore) for e in entries],
[_http_worker(e, client_http, semaphore, stub_ips) for e in entries],
"Фаза 3/3: HTTP..."
)
finally:

View file

@ -79,11 +79,12 @@ def print_legend() -> None:
("TLS MITM", "Man-in-the-Middle: подмена/проблемы с сертификатом"),
("TLS BLOCK", "Блокировка версии TLS или протокола"),
("SSL ERR", "SSL/TLS ошибка (часто проблемы совместимости CDN/сервера)"),
("ISP PAGE", "Редирект на страницу провайдера или блок-страница"),
("ISP PAGE", "Запрос идёт на IP-заглушку провайдера (DNS подмена)"),
("BLOCKED", "HTTP 451 (Недоступно по юридическим причинам)"),
("REDIR", "[green]Зелёный[/green] = редирект на тот же домен/поддомен [red]Красный[/red] = редирект на чужой домен"),
("TIMEOUT", "Таймаут соединения или чтения"),
("DNS FAIL", "Не удалось разрешить доменное имя"),
("OK / REDIR", "Сайт доступен (может быть редирект)"),
("OK", "Сайт доступен"),
]
for term, desc in legend:
console.print(f"[dim]• [cyan]{term:<12}[/cyan] = {desc}[/dim]")

View file

@ -29,37 +29,8 @@ FAT_DEFAULT_SNI: "example.com"
FAT_CONNECT_TIMEOUT: 8.0
FAT_READ_TIMEOUT: 12.0
# === Отображение и анализ ===
BODY_INSPECT_LIMIT: 8192
USER_AGENT: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
# === Маркеры блокировок HTTP (в URL и заголовках) ===
BLOCK_MARKERS:
- "lawfilter"
- "warning.rt.ru"
- "blocked"
- "access-denied"
- "eais"
- "zapret-info"
- "rkn.gov.ru"
- "mvd.ru"
# === Маркеры блокировок в теле страницы (HTML) ===
BODY_BLOCK_MARKERS:
- "blocked"
- "заблокирован"
- "запрещён"
- "запрещен"
- "ограничен"
- "единый реестр"
- "роскомнадзор"
- "rkn.gov.ru"
- "nap.gov.ru"
- "eais.rkn.gov.ru"
- "warning.rt.ru"
- "blocklist"
- "решению суда"
# === Windows-специфичные errno коды ===
WSAECONNRESET: 10054
WSAECONNREFUSED: 10061

View file

@ -67,15 +67,24 @@ async def _check_tls_single(
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
resolved_ip: str = None,
stub_ips: set = None,
) -> Tuple[str, str, int, float]:
"""
Одна попытка TLS-проверки. Клиент передаётся снаружи и переиспользуется.
resolved_ip: если передан, подключаемся к нему напрямую (нужно для IPv6 системный
резолвер может вернуть IPv4 даже если домен имеет AAAA запись).
stub_ips: если передан, редирект на IP из этого набора помечается как ISP PAGE.
resolved_ip: если передан, подключаемся к нему напрямую.
Логика редиректов:
- Редирект на тот же домен или поддомен — зелёный REDIR (ОК)
- Редирект на чужой домен — красный REDIR (подозрительно)
- Если resolved_ip входит в stub_ips — ISP PAGE
"""
bytes_read = 0
url = f"https://{domain}"
if stub_ips and resolved_ip and resolved_ip in stub_ips:
return ("[bold red]ISP PAGE[/bold red]", f"DNS заглушка {resolved_ip}", 0, 0.0)
async with semaphore:
start = time.time()
@ -97,66 +106,31 @@ async def _check_tls_single(
await response.aclose()
return ("[bold red]BLOCKED[/bold red]", "HTTP 451", bytes_read, time.time() - start)
if location:
location_lower = location.lower()
if any(m in location_lower for m in config.BLOCK_MARKERS):
await response.aclose()
return ("[bold red]ISP PAGE[/bold red]", "Редирект на блок-страницу", bytes_read, time.time() - start)
if location and 300 <= status_code < 400:
await response.aclose()
elapsed = time.time() - start
try:
parsed_loc = urlparse(
location if location.startswith('http') else f'https://{location}'
)
loc_domain = parsed_loc.netloc.lower()
clean_domain = domain.lower().replace('www.', '')
clean_loc = loc_domain.replace('www.', '')
loc_domain = parsed_loc.netloc.lower().split(':')[0]
clean_domain = domain.lower()
norm_loc = loc_domain.removeprefix('www.')
norm_dom = clean_domain.removeprefix('www.')
if loc_domain and clean_loc != clean_domain \
and not clean_loc.endswith('.' + clean_domain):
cdn_patterns = [
'cloudflare', 'akamai', 'fastly', 'cdn', 'cloudfront',
'auth', 'login', 'accounts', 'id.', 'sso.',
]
if not any(p in clean_loc for p in cdn_patterns):
await response.aclose()
return (
"[bold red]ISP PAGE[/bold red]",
f"{loc_domain[:20]}",
bytes_read,
time.time() - start,
)
if norm_loc == norm_dom or norm_loc.endswith('.' + norm_dom):
return ("[green]REDIR[/green]", f"{loc_domain[:30]}", bytes_read, elapsed)
else:
return ("[bold red]REDIR[/bold red]", f"{loc_domain[:30]}", bytes_read, elapsed)
except Exception:
pass
return ("[bold red]REDIR[/bold red]", f"{location[:30]}", bytes_read, elapsed)
if 300 <= status_code < 400:
await response.aclose()
return ("[green]OK[/green]", "", bytes_read, time.time() - start)
elapsed = time.time() - start
if status_code == 200:
content_length = response.headers.get("content-length", "")
try:
content_len = int(content_length) if content_length else 0
except Exception:
content_len = 0
if 0 < content_len < config.BODY_INSPECT_LIMIT:
body = b""
try:
async for chunk in response.aiter_bytes(chunk_size=128):
body += chunk
if len(body) >= config.BODY_INSPECT_LIMIT:
break
except Exception:
pass
body_text = body.decode("utf-8", errors="ignore").lower()
if any(m in body_text for m in config.BODY_BLOCK_MARKERS):
await response.aclose()
return ("[bold red]ISP PAGE[/bold red]", "Блок-страница в теле", len(body), elapsed)
return ("[green]REDIR[/green]", "", bytes_read, time.time() - start)
await response.aclose()
elapsed = time.time() - start
if 200 <= status_code < 500:
return ("[green]OK[/green]", "", bytes_read, elapsed)
@ -204,10 +178,13 @@ async def check_domain_tls(
domain: str,
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
stub_ips: set = None,
resolved_ip: str = None,
) -> Tuple[str, str, float]:
"""Одна TLS-проверка. Возвращает (status, detail, elapsed)."""
status, detail, _, elapsed = await _check_tls_single(domain, client, semaphore)
status, detail, _, elapsed = await _check_tls_single(
domain, client, semaphore, resolved_ip=resolved_ip, stub_ips=stub_ips
)
return (status, detail, elapsed)
@ -215,6 +192,7 @@ async def check_http_injection(
domain: str,
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
stub_ips: set = None,
) -> Tuple[str, str]:
"""Проверяет HTTP-инжекцию (plain HTTP). Клиент передаётся снаружи."""
clean_domain = domain.replace("https://", "").replace("http://", "")
@ -237,31 +215,31 @@ async def check_http_injection(
await response.aclose()
return ("[bold red]BLOCKED[/bold red]", "HTTP 451")
if any(m in location.lower() for m in config.BLOCK_MARKERS):
if location and 300 <= status_code < 400:
await response.aclose()
return ("[bold red]ISP PAGE[/bold red]", "Блок-страница")
if 200 <= status_code < 300:
body = b""
try:
async for chunk in response.aiter_bytes(chunk_size=128):
body += chunk
if len(body) >= config.BODY_INSPECT_LIMIT:
break
parsed_loc = urlparse(
location if location.startswith('http') else f'https://{location}'
)
loc_domain = parsed_loc.netloc.lower().split(':')[0]
norm_loc = loc_domain.removeprefix('www.')
norm_dom = clean_domain.lower().removeprefix('www.')
if norm_loc == norm_dom or norm_loc.endswith('.' + norm_dom):
return ("[green]REDIR[/green]", f"{status_code}")
else:
return ("[bold red]REDIR[/bold red]", f"{loc_domain[:30]}")
except Exception:
pass
await response.aclose()
body_text = body.decode("utf-8", errors="ignore").lower()
if any(m in body_text for m in config.BODY_BLOCK_MARKERS):
return ("[bold red]ISP PAGE[/bold red]", "Блок-страница (HTTP)")
return ("[green]OK[/green]", f"{status_code}")
return ("[bold red]REDIR[/bold red]", f"{location[:30]}")
if 300 <= status_code < 400:
await response.aclose()
return ("[green]REDIR[/green]", f"{status_code}")
await response.aclose()
if 200 <= status_code < 300:
return ("[green]OK[/green]", f"{status_code}")
return ("[green]OK[/green]", f"{status_code}")
except (httpx.ConnectTimeout, httpx.ConnectError) as e: