unsloth/scripts/scan_packages.py
Daniel Han ef9f672fe8
security: NOT affected by Mini Shai-Hulud (May-12 wave) -- forward-looking hardening only (#5397)
* scripts/scan_*: add Mini Shai-Hulud May-12 IOC strings and pin-blocklists

Append the May-12 2026 wave indicators (git-tanstack.com, transformers.pyz,
/tmp/transformers.pyz, "With Love TeamPCP", "We've been online over 2 hours")
to all three scanner IOC tables, add BLOCKED_NPM_VERSIONS (42 TanStack pkgs,
4 opensearch versions, 3 squawk pkgs) in scan_npm_packages.py and
lockfile_supply_chain_audit.py (kept byte-identical), add BLOCKED_PYPI_VERSIONS
(guardrails-ai 0.10.1, mistralai 2.4.6, lightning 2.6.2/2.6.3) plus
RE_MAY12_IOC wiring across check_py_file/check_shell_file/check_workflow_file
in scan_packages.py. The npm orchestrator and the lockfile auditor now
short-circuit on a blocked entry before fetching the tarball, and the
PyPI download pipeline drops blocked specs before pip download is invoked.

* tests/security: regression suite for supply-chain scanners

Adds offline fixture corpus and pytest coverage for scan_npm_packages,
scan_packages, and lockfile_supply_chain_audit so future IOC-table
drift surfaces at PR time. Pytest scope narrowed to tests/security so
GPU smoke tests are not picked up by default.

* ci(security-audit): drop continue-on-error on pip-scan and npm-scan jobs

Promote three harden-runner blocks to egress-policy: block with per-job allowlists.
Add tests-security job running pytest tests/security as a hard gate.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* scripts: harden third-party downloads, pip resolver pins, atomic writes

Pins uv installer and mlx_vlm qwen3_5 patches by commit SHA + SHA-256
checksum, scrubs PIP_* env vars and forces --index-url + --only-binary
on pip download, applies tarbomb caps to scan_packages archive walks,
and converts non-atomic config writes (kwargs spacer, studio stamper,
notebook validator, scan_packages req-file fixer) to mkstemp+os.replace.

Also adds host allowlist to notebook_to_python downloader, threads an
--allow-shell flag through its shell=True emission with reviewer warning
comments, locks both MLX installer scripts to set -euo pipefail, and
extends CODEOWNERS so colab snapshot data files require notebook-owner
review.

* ci(workflows): harden release-desktop / smoke / notebooks workflows

Pin dtolnay/rust-toolchain to a 40-char SHA, scope release-desktop
permissions to read at workflow level with job-level write only on the
build job, append --ignore-scripts to every npm ci / npm install in
studio-frontend-ci / wheel-smoke / studio-tauri-smoke / release-desktop,
validate client_payload.ref shape via an env-var-isolated regex on every
notebooks-ci job, and add step-security/harden-runner in audit mode as
the first step of release-desktop and mlx-ci.

* scripts: promote silent scanner failures to non-zero exit codes

scan_packages now returns 2 on pip-download failure and emits a CRITICAL
archive_corrupted finding on truncated wheels/sdists. notebook_to_python
exits 1 on per-notebook failures; notebook_validator wraps the stash/pop
in try/finally; lockfile audit rejects bare UNSLOTH_LOCKFILE_AUDIT_SKIP=1
with a loud GitHub Actions warning.

* Add npm cooldown + new-install-script gate + Dependabot cooldown

Pins min-release-age=7 (npm 11.10+) in repo-root and studio/frontend
.npmrc, adds scripts/check_new_install_scripts.py to fail PRs that
add a postinstall dep, ships a new security-audit job for npm audit
signatures plus the diff, and extends .github/dependabot.yml with
cooldown stanzas. Pin @tanstack/react-router to 1.169.9 per GHSA-
g7cv-rxg3-hmpx; lockfile regen deferred until that release lands on
npm. tests/security gains 4 new tests; full suite 26/26 green.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* ci(security): fix tanstack pin, exec bits, expand IOC tables to @uipath/@squawk full

- Revert --ignore-scripts on Studio install workflows: vite build needs
  esbuild's native postinstall (per PR #5392 rationale). Keep
  --ignore-scripts on security-audit.yml's standalone npm audit job.
- Pin @tanstack/react-router to the actual published 1.169.2 (was a
  forward-looking 1.169.9 that does not exist on npm; broke npm ci).
- Drop redundant repo-root .npmrc; studio/frontend/.npmrc covers the
  only npm project today (root cooldown reinstated via dependabot.yml).
- Restore exec bits on 7 files my filesystem stripped during cherry-pick.
- Expand BLOCKED_NPM_VERSIONS with full safedep.io + Aikido enumeration:
  22 @squawk/* packages with 5 versions each (110 entries; previously
  3 entries with 1 version each), and 66 @uipath/* packages (entirely
  missing before). Mirror in scripts/lockfile_supply_chain_audit.py.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* tests/security: suppress CodeQL py/incomplete-url-substring-sanitization

The two flagged 'X' in Y assertions are NOT URL sanitization checks.
They verify our scanner WROTE a known IOC literal into its stdout /
Finding.evidence, which is the opposite of an attack surface --
matching the scanner's output is precisely what catches the worm.
Inline lgtm[] suppression with a 4-line rationale comment above each.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* scripts/scan_*: expand IOC tables with Aikido full 169-pkg enumeration

Per Aikido 2026-05-12 disclosure (373 malicious package-version entries
across 169 npm package names), add to BLOCKED_NPM_VERSIONS:

  - @mistralai/* npm scope (3 packages, 9 versions) -- separate from
    the PyPI mistralai package already in BLOCKED_PYPI_VERSIONS
  - @tallyui/* (10 packages, 30 entries)
  - @beproduct/nestjs-auth (18 versions 0.1.2..0.1.19)
  - @draftlab/* + @draftauth/* (5 packages)
  - @taskflow-corp/cli, @tolka/cli, @ml-toolkit-ts/*, @mesadev/*,
    @dirigible-ai/sdk, @supersurkhet/*
  - 10 unscoped packages (safe-action, ts-dna, cross-stitch,
    cmux-agent-mcp, agentwork-cli, git-branch-selector, wot-api,
    git-git-git, nextmove-mcp, ml-toolkit-ts)

Also add to KNOWN_IOC_STRINGS / NPM_IOC_STRINGS:

  - router_init.js SHA-256 ab4fcadaec49c03278063dd269ea5eef82d24f2124a8e15d7b90f2fa8601266c
  - tanstack_runner.js SHA-256 2ec78d556d696e208927cc503d48e4b5eb56b31abc2870c2ed2e98d6be27fc96
  - bun run tanstack_runner.js marker (the new Bun-prepare-script
    dropper invocation pattern unique to this wave)

Total: 170 packages, 401 versions blocklisted. Studio lockfile still
scans clean (0 findings, 0 hard errors).

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* scripts/scan_*: web-verification additions (@tanstack/setup, intercom-client)

Two findings from cross-checking BLOCKED_NPM_VERSIONS / KNOWN_IOC_STRINGS
against GHSA-g7cv-rxg3-hmpx + Aikido + safedep.io + Socket + Semgrep.

  - Fix asymmetry: @tanstack/setup IOC string was in
    lockfile_supply_chain_audit.py's NPM_IOC_STRINGS but missing from
    scan_npm_packages.py's KNOWN_IOC_STRINGS. The literal is the malicious
    optional-dependency name used by the May-12 TanStack wave; no
    legitimate npm package of this name exists.

  - Add intercom-client@7.0.4: the npm counterpart of the lightning
    2.6.2/2.6.3 PyPI compromise (Apr-30 wave). Same threat actor
    (TeamPCP). Confirmed by Semgrep, Aikido, OX Security, Resecurity,
    Kodem. Safe version is 7.0.3 and earlier.

Total BLOCKED_NPM_VERSIONS: 171 packages / 402 versions. Both files
remain byte-identical. Studio lockfile still scans clean.
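
For reference, the entries land in plain name -> version-set tables; a
minimal sketch, assuming the npm tables mirror the dict[str, set[str]]
shape that BLOCKED_PYPI_VERSIONS uses in scan_packages.py (only the
intercom-client and @tanstack/setup literals come from this commit):

    BLOCKED_NPM_VERSIONS: dict[str, set[str]] = {
        "intercom-client": {"7.0.4"},  # npm counterpart of the lightning 2.6.2/2.6.3 compromise
        # ... 170 further packages / 401 further versions ...
    }
    KNOWN_IOC_STRINGS = (
        "@tanstack/setup",  # malicious optional-dependency name, May-12 wave
        # ...
    )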

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* ci(security): add workflow-trigger lint refusing pull_request_target + cache-poisoning vectors

The two patterns that together powered GHSA-g7cv-rxg3-hmpx (TanStack
Mini Shai-Hulud) are now gated at PR time:

  1. pull_request_target -- the worm chain started with a fork PR that
     ran in the base-repo context. Every workflow in this repo today
     uses 'pull_request' (safe); the lint refuses any new
     pull_request_target additions outright. workflow_run is
     restricted, allowed only with an explicit allow-comment.

  2. Shared cache keys between PR-triggered workflows and the publish
     workflow (release-desktop.yml). The TanStack attack chain poisoned
     a shared Actions cache from a fork PR; the legitimate release
     workflow then restored the poisoned cache. The lint refuses any
     cache key that appears in both a PR-triggered workflow and a
     workflow_dispatch-only / publish workflow.

Current tree is clean: 0 pull_request_target, 0 workflow_run, 0
PR-publish cache-key collisions across all 24 workflows. The lint
locks that invariant in place.

Files:
  + scripts/lint_workflow_triggers.py (~200 LOC, stdlib + PyYAML)
  + tests/security/test_lint_workflow_triggers.py (5 tests covering
    current-tree pass, pull_request_target reject, workflow_run
    restricted, justified workflow_run accept, cache-key collision
    reject)
  ~ .github/workflows/security-audit.yml: new workflow-trigger-lint
    job, no continue-on-error, harden-runner block-mode, PyYAML only
    runtime dep.
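
Illustrative sketch of rule (1) only -- this is not the shipped
scripts/lint_workflow_triggers.py, and the allow-comment marker string
below is a hypothetical placeholder; the PyYAML dependency and the
refuse/restrict behaviour are the parts taken from this commit:

    from pathlib import Path
    import yaml  # PyYAML -- the lint's only non-stdlib runtime dep

    REFUSED = {"pull_request_target"}
    RESTRICTED = {"workflow_run"}
    ALLOW_MARKER = "# workflow-trigger-lint: allow workflow_run"  # hypothetical

    def trigger_errors(workflow: Path) -> list[str]:
        text = workflow.read_text()
        doc = yaml.safe_load(text)
        # YAML 1.1 parses a bare `on` key as boolean True, so look for both.
        triggers = doc.get("on", doc.get(True, {}))
        if isinstance(triggers, str):
            names = {triggers}
        elif isinstance(triggers, (list, dict)):
            names = set(triggers)
        else:
            names = set()
        errors = [f"{workflow}: '{t}' is refused outright" for t in names & REFUSED]
        for t in names & RESTRICTED:
            if ALLOW_MARKER not in text:
                errors.append(f"{workflow}: '{t}' needs an explicit allow-comment")
        return errors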

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* security: fix tests-security CI job + CodeQL false-positives

Two CI failures on the prior push:

1. pytest tests/security -- 5 lint regression tests failed because
   scripts/lint_workflow_triggers.py imports PyYAML which is not in
   the bare runner's Python env. Added pyyaml==6.0.2 to the pip
   install step alongside pytest. (29 scanner tests already passed.)

2. CodeQL py/incomplete-url-substring-sanitization fired on two
   test assertions that check the scanner WROTE the IOC literal
   to its own stdout/stderr. The rule pattern-matches on
   `"<host>" in <var>` and cannot distinguish a URL sanitizer from
   a regression-test evidence check. Previous `# lgtm[...]` inline
   suppressions were detached from the operator when pre-commit
   reformatted the assert across multiple lines. Rebuilt the IOC
   literals at runtime (`"git-tanstack." + "com"`) so no URL-shaped
   source literal appears on the `in` operator line; rule cannot
   trigger.

Verified locally: `pytest tests/security -v` -> 34 passed in 2.70s.
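
For future reference, the reworked assertion shape (illustrative; the
captured-output variable name is a placeholder, the literal-splitting
trick is the actual fix):

    # Rebuilt at runtime so no URL-shaped source literal sits on the `in`
    # operator line; py/incomplete-url-substring-sanitization cannot fire.
    ioc_host = "git-tanstack." + "com"
    assert ioc_host in captured.out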

* security(studio): defensive .npmrc cooldown aliases + save-exact

Two additions to studio/frontend/.npmrc to harden the existing
`min-release-age=7` (Mini Shai-Hulud defence):

1. `minimum-release-age=10080` (minutes) -- defensive alias for the
   same 7-day floor. Some npm versions / wrappers consult one key but
   not the other; setting both prevents a single upstream setting-name
   parse change from silently disabling the cooldown. The two keys
   MUST agree (do not let them drift).

2. `save-exact=true` -- refuses to write back `^x.y.z` ranges into
   package.json when a maintainer runs `npm install <pkg>` locally.
   Does NOT rewrite already-present ranges; stops NEW carets from
   creeping into the manifest as patch-version footguns.

Verified: pytest tests/security -> 34 passed in 2.63s.
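
The 7-day / 10080-minute equivalence (7 * 24 * 60 = 10080) is easy to
drift apart in later edits; a minimal consistency check, sketched here
for illustration only (not part of tests/security; the file path is the
one touched by this commit):

    from pathlib import Path

    def npmrc_cooldowns_agree(npmrc = Path("studio/frontend/.npmrc")) -> bool:
        values = dict(
            line.split("=", 1)
            for line in npmrc.read_text().splitlines()
            if "=" in line and not line.lstrip().startswith("#")
        )
        days = int(values["min-release-age"])          # 7
        minutes = int(values["minimum-release-age"])   # 10080
        return minutes == days * 24 * 60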

* chore(dependabot): remove dead bun entry for /studio/frontend

`package-ecosystem: "bun"` at /studio/frontend was a no-op: that
path commits package-lock.json, not bun.lock / bun.lockb, so
Dependabot's bun ecosystem silently skipped it. The actual
behaviour is unchanged -- the npm entry below the cargo block
already owns npm_and_yarn security advisories for /studio/frontend
with `open-pull-requests-limit: 0` (version-update PRs suppressed,
security PRs flow through).

This commit:

  - Deletes the bun entry (kept a placeholder comment so a future
    bun migration knows where to slot it back in).
  - Rewrites the npm /studio/frontend entry comment to explain the
    real intent: lockfile is the authoritative pin, .npmrc
    `min-release-age=7` already blocks fresh tarballs at install
    time, dependabot only needs to surface security advisories.

No functional change: same set of dependabot PRs as before (zero
version updates, security advisories grouped weekly with cooldown).

Verified: pytest tests/security -> 34 passed in 2.67s; YAML
parses cleanly via PyYAML.

* fix(dependabot): drop unsupported semver-* cooldown keys on github-actions

Dependabot's validator rejected the config with:

  The property '#/updates/0/cooldown/semver-minor-days' is not
  supported for the package ecosystem 'github-actions'.
  The property '#/updates/0/cooldown/semver-patch-days' is not
  supported for the package ecosystem 'github-actions'.

The `semver-minor-days` / `semver-patch-days` cooldown knobs are
only valid for semver-aware ecosystems (npm, cargo, etc.). The
github-actions ecosystem pins via git tags / SHAs, not semver, so
only `default-days` is honored. Pre-existing bug on main; surfaced
on this PR because the prior commit re-validated the file.

Behaviour: github-actions PRs now respect the 7-day cooldown floor
(was already the intent), without the no-op semver bands.
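
The invariant this leaves behind, sketched for illustration (assumes the
standard dependabot.yml layout; not part of tests/security):

    import yaml  # PyYAML

    def github_actions_cooldown_is_valid(path = ".github/dependabot.yml") -> bool:
        with open(path) as f:
            config = yaml.safe_load(f)
        for update in config.get("updates", []):
            if update.get("package-ecosystem") != "github-actions":
                continue
            cooldown = update.get("cooldown", {}) or {}
            # Tag/SHA-pinned ecosystems only honour default-days.
            if set(cooldown) - {"default-days"}:
                return False
        return True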

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2026-05-13 04:58:12 -07:00


#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright 2026-present the Unsloth AI Inc. team. All rights reserved.
#
# .github/workflows/security-audit.yml's pip-scan-packages job depends
# on this file existing at scripts/scan_packages.py.
"""
scan_packages.py -- Standalone pre-install package scanner.
Downloads PyPI packages WITHOUT installing them and inspects archive
contents for malicious patterns: weaponized .pth files, credential
stealers, obfuscated payloads, install-time droppers.
Motivated by the litellm 1.82.7/1.82.8 supply chain attack (March 2026).
Single file, stdlib only, Python 3.10+.
Examples:
# Scan specific packages
python scan_packages.py requests==2.32.5
python scan_packages.py fastapi uvicorn pydantic
# Scan requirements files
python scan_packages.py -r requirements.txt
python scan_packages.py -r base.txt -r extras.txt
# Auto-discover requirements files in a project
python scan_packages.py -d ./my-project/
# Scan with full transitive dependency tree
python scan_packages.py --with-deps unsloth unsloth-zoo
# Scan + auto-fix CRITICAL findings in requirements files
python scan_packages.py --fix -r requirements.txt
python scan_packages.py --fix --max-search 20 -r requirements.txt
Exit codes:
0 -- no CRITICAL or HIGH findings
1 -- CRITICAL or HIGH findings detected
2 -- no packages specified, or a package download failed (scan incomplete)
"""
import argparse
import atexit
import io
import json
import os
import re
import shutil
import subprocess
import sys
import tarfile
import tempfile
import urllib.request
import zipfile
from dataclasses import dataclass, field
from pathlib import Path
# ---------------------------------------------------------------------------
# Severity
# ---------------------------------------------------------------------------
CRITICAL = "CRITICAL"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
SEVERITY_ORDER = {CRITICAL: 0, HIGH: 1, MEDIUM: 2}
# Hard pin-blocks for publicly confirmed malicious PyPI versions.
# Source: Socket.dev 2026-05-12 disclosure (Mini Shai-Hulud May-12 wave) and
# earlier Semgrep / Endor reports for the `lightning` entries.
BLOCKED_PYPI_VERSIONS: dict[str, set[str]] = {
"guardrails-ai": {"0.10.1"},
"mistralai": {"2.4.6"},
"lightning": {"2.6.2", "2.6.3"},
}
# ---------------------------------------------------------------------------
# Pattern definitions
# ---------------------------------------------------------------------------
# Subprocess / OS exec patterns
RE_SUBPROCESS = re.compile(
r"\bsubprocess\s*\.\s*(Popen|call|run|check_call|check_output)\b"
r"|\bos\s*\.\s*(system|popen|exec[lv]p?e?)\b",
)
# Encoding / obfuscation
RE_BASE64 = re.compile(
r"\bbase64\s*\.\s*(b64decode|decodebytes|b32decode|b16decode)\b"
r"|\bcodecs\s*\.\s*decode\b",
)
# exec / eval
RE_EXEC_EVAL = re.compile(r"\b(exec|eval)\s*\(")
# Network APIs (excludes urllib.parse which is pure string manipulation)
RE_NETWORK = re.compile(
r"\burllib\.request\b"
r"|\burlopen\s*\("
r"|\brequests\s*\.\s*(get|post|put|patch|delete|head|Session)\b"
r"|\bhttpx\s*\.\s*(get|post|put|patch|delete|Client|AsyncClient)\b"
r"|\bsocket\s*\.\s*(socket|create_connection)\b"
r"|\bhttp\.client\b"
r"|\bhttp\.server\b",
)
# Large base64 blob (>200 chars of contiguous base64 alphabet)
RE_LARGE_BLOB = re.compile(r"[A-Za-z0-9+/=]{200,}")
# Credential path access (requires file-access context, not just string mentions)
RE_CRED_ACCESS = re.compile(
r"(?:open|Path|read_text|read_bytes)\s*\([^)]*?"
r"(?:\.ssh[/\\]|\.aws[/\\]|\.kube[/\\]|\.gnupg[/\\]|\.docker[/\\]"
r"|\.azure[/\\]|\.gcp[/\\]"
r"|credentials\.json|\.git-credentials|\.npmrc|\.pypirc|wallet\.dat"
r"|/etc/shadow|/etc/passwd"
r"|id_rsa|id_ed25519|id_ecdsa"
r"|kubeconfig|service-account-token)"
r"|os\.path\.(?:join|expanduser)\([^)]*?"
r"(?:\.ssh|\.aws|\.kube|\.gnupg|\.docker|\.azure|\.gcp|credentials)"
r"|(?:open|Path)\(\s*['\"]\.env['\"]\s*[,)]",
re.DOTALL,
)
# Chained / advanced obfuscation (marshal, compile, zlib, nested decode)
RE_OBFUSCATION = re.compile(
r"\bmarshal\s*\.\s*(loads|load)\b"
r"|\bcompile\s*\([^)]*['\"]exec['\"]\s*\)"
r"|\bzlib\s*\.\s*decompress\b"
r"|\blzma\s*\.\s*decompress\b"
r"|\bbz2\s*\.\s*decompress\b"
r"|\bbytearray\s*\(\s*\[.*?\]\s*\)" # bytearray([104,101,...])
r"|\bchr\s*\(\s*\d+\s*\).*chr\s*\(\s*\d+\s*\)" # chr() obfuscation chains
r"|\b__import__\s*\(" # dynamic import
r"|\bgetattr\s*\(\s*__builtins__" # getattr(__builtins__, ...)
r"|\brotate\s*=.*\blambda\b.*\bchr\b" # rotation ciphers
r"|\b(?:b64decode|decodebytes)\s*\(.*(?:b64decode|decodebytes)\s*\(", # double base64
re.DOTALL,
)
# Embedded cryptographic keys (PEM-encoded)
RE_EMBEDDED_KEYS = re.compile(
r"-----BEGIN\s+(?:RSA\s+)?(?:PUBLIC|PRIVATE|ENCRYPTED|EC|DSA|OPENSSH)\s+KEY-----"
r"|\bRSA\s+PUBLIC\s+KEY\b.*[A-Za-z0-9+/=]{64,}"
r"|\bMII[A-Za-z0-9+/]{20,}", # DER-encoded key prefix (base64)
re.DOTALL,
)
# Cloud metadata / IMDS endpoints
RE_CLOUD_METADATA = re.compile(
r"169\.254\.169\.254" # AWS/Azure/GCP IMDS
r"|metadata\.google\.internal" # GCP metadata
r"|169\.254\.170\.2" # AWS ECS task metadata
r"|100\.100\.100\.200" # Alibaba Cloud metadata
r"|/latest/meta-data" # AWS IMDS path
r"|/metadata/instance" # GCP metadata path
r"|/metadata/identity" # Azure managed identity
r"|\bIMDSv[12]\b",
)
# Persistence mechanisms (systemd, cron, launchd, registry, startup dirs)
RE_PERSISTENCE = re.compile(
r"/etc/systemd/"
r"|systemctl\s+(enable|start|daemon-reload)"
r"|\.service\b.*\[Service\]" # systemd unit content
r"|/etc/cron"
r"|crontab\s"
r"|/etc/init\.d/"
r"|/Library/LaunchDaemons"
r"|/Library/LaunchAgents"
r"|~/\.config/autostart"
r"|~/.local/share/systemd"
r"|~/\.config/systemd/user/" # user-level systemd
r"|HKEY_LOCAL_MACHINE.*\\\\Run" # Windows registry autorun
r"|HKEY_CURRENT_USER.*\\\\Run"
r"|\\\\Start Menu\\\\Programs\\\\Startup"
r"|schtasks\s", # Windows scheduled tasks
re.IGNORECASE,
)
# Container / orchestration abuse
RE_CONTAINER_ABUSE = re.compile(
r"/var/run/docker\.sock"
r"|\bdocker\s+(run|exec|cp|build)\b"
r"|\bkubectl\s+(apply|create|exec|run|cp)\b"
r"|\bkubernetes\.client\b"
r"|\bfrom_incluster_config\b"
r"|\blist_namespaced_secret\b"
r"|\bcreate_namespaced_pod\b"
r"|\bcreate_namespaced_daemon_set\b"
r"|\bcreate_namespaced_secret\b"
r"|\bkube-system\b"
r"|\bhostPID\s*:\s*true"
r"|\bprivileged\s*:\s*true"
r"|\bhostNetwork\s*:\s*true"
r"|\bhostPath\b.*\bpath\s*:\s*/", # k8s hostPath mounts
re.IGNORECASE,
)
# Environment variable harvesting (bulk access or known secret vars)
RE_ENV_HARVEST = re.compile(
r"\bos\.environ\s*\.\s*copy\s*\(" # full env copy
r"|\bdict\s*\(\s*os\.environ\s*\)"
r"|\bjson\.dumps\s*\(\s*(?:dict\s*\(\s*)?os\.environ"
r"|\bfor\s+\w+\s*,\s*\w+\s+in\s+os\.environ\.items\(\)" # iterating all env vars
r"|\bos\.environ\b.*(?:SECRET|TOKEN|KEY|PASSWORD|CREDENTIAL|API_KEY|PRIVATE)"
r"|\b(?:SECRET|TOKEN|PASSWORD|API_KEY|PRIVATE_KEY)\b.*os\.environ",
re.IGNORECASE,
)
# Archive staging / exfiltration prep (create archive + network send)
RE_ARCHIVE_STAGING = re.compile(
r"\btarfile\s*\.\s*open\s*\("
r"|\bzipfile\s*\.\s*ZipFile\s*\([^)]*['\"]w['\"]\s*\)"
r"|\bshutil\s*\.\s*make_archive\b"
r"|\b\.add\s*\([^)]*(?:\.ssh|\.aws|\.env|\.kube|credentials|\.gnupg|\.docker)"
r"|\b\.write\s*\([^)]*(?:\.ssh|\.aws|\.env|\.kube|credentials|\.gnupg|\.docker)",
re.DOTALL,
)
# Anti-analysis / sandbox evasion / debugger detection
RE_ANTI_ANALYSIS = re.compile(
r"\bptrace\b"
r"|\bsys\s*\.\s*gettrace\s*\("
r"|\bsys\s*\.\s*settrace\b"
r"|\bTracerPid\b"
r"|\b/proc/self/status\b"
r"|\bIsDebuggerPresent\b"
r"|\bvirtualbox\b.*\bhardware\b"
r"|\bvmware\b.*\bdetect\b"
r"|\btime\.sleep\s*\(\s*(?:[3-9]\d{2,}|[1-9]\d{3,})\s*\)" # long sleep (anti-sandbox)
r"|\bplatform\.\s*system\b.*\bif\b.*\b(?:Linux|Windows|Darwin)\b",
re.IGNORECASE | re.DOTALL,
)
# DNS exfiltration / tunneling
RE_DNS_EXFIL = re.compile(
r"\bdns\.resolver\b"
r"|\bsocket\.getaddrinfo\s*\([^)]*\+[^)]*\)" # dynamic hostname construction
r"|\bdnspython\b"
r"|\bTXT\b.*\bresolver\b"
r"|\bresolver\b.*\bTXT\b"
r"|\bnslookup\b"
r"|\bdig\s+",
)
# File system enumeration / bulk file theft
RE_FS_ENUM = re.compile(
r"\bos\.walk\s*\(\s*['\"](?:/|~|/home|/root|/Users|C:\\\\)"
r"|\bglob\s*\.\s*glob\s*\([^)]*(?:\*\*|\*\.pem|\*\.key|\*\.cer|\*\.pfx|\*\.p12)"
r"|\bos\.listdir\s*\(\s*['\"](?:/home|/root|/Users|/etc)"
r"|\bPath\s*\(\s*['\"]~['\"]\s*\)\s*\.\s*glob\b"
r"|\bhistory\b.*\bread\b" # reading shell history
r"|\b\.bash_history\b"
r"|\b\.zsh_history\b"
r"|/etc/shadow"
r"|/etc/passwd",
re.DOTALL,
)
# Reverse shell / bind shell patterns
RE_REVERSE_SHELL = re.compile(
r"\bsocket\b.*\bconnect\b.*\bsubprocess\b"
r"|\bsocket\b.*\bconnect\b.*\b(?:sh|bash|cmd)\b"
r"|\b/bin/(?:sh|bash)\b.*\bsocket\b"
r"|\bpty\s*\.\s*spawn\b"
r"|\bos\s*\.\s*dup2\s*\("
r"|\bwebbrowser\s*\.\s*open\b.*\bdata:\b", # data: URI abuse
re.DOTALL,
)
# Process injection / code loading from remote
RE_REMOTE_CODE = re.compile(
r"\bexec\s*\(\s*(?:urllib|requests|httpx|urlopen)" # exec(requests.get(...))
r"|\bexec\s*\([^)]*\.(?:text|content|read)\s*\("
r"|\beval\s*\([^)]*\.(?:text|content|read)\s*\("
r"|\bimportlib\s*\.\s*import_module\s*\([^)]*\+" # dynamic import with concatenation
r"|\b__import__\s*\([^)]*\+", # __import__ with concatenation
re.DOTALL,
)
# Crypto wallet / cryptocurrency theft
RE_CRYPTO_THEFT = re.compile(
r"\bwallet\.dat\b"
r"|\b\.bitcoin[/\\]"
r"|\b\.ethereum[/\\]"
r"|\b\.solana[/\\]"
r"|\b\.monero[/\\]"
r"|\b\.litecoin[/\\]"
r"|\b\.config/solana[/\\]"
r"|\bkeystore[/\\]UTC--"
r"|\bseed\s*phrase\b"
r"|\bmnemonic\b.*\b(?:word|phrase|recover|restore)\b"
r"|\b(?:xprv|xpub|bc1|0x[a-fA-F0-9]{40})\b",
re.IGNORECASE,
)
# Import line in .pth (Python site.py only exec()s lines starting with "import")
RE_PTH_IMPORT = re.compile(r"^\s*import\s+", re.MULTILINE)
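# Hedged illustration (no real package quoted): a pure path-entry .pth such
# as "/usr/lib/python3/dist-packages" never matches and is treated as inert,
# while a line like "import base64; exec(base64.b64decode(d))" matches and
# pulls the file into the full CRITICAL check list in check_pth_file().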
# openssl CLI invocations via subprocess (encrypted exfiltration)
RE_OPENSSL_CLI = re.compile(
r"\bopenssl\s+(enc|rand|rsautl|pkeyutl|genrsa|dgst|s_client)\b"
)
# Write to /tmp then execute (staged dropper)
RE_TEMP_EXEC = re.compile(
r"/tmp/\S+.*(?:subprocess|os\.system|os\.popen|Popen|chmod.*\+x)",
re.DOTALL,
)
# C2 polling / beaconing loop
RE_C2_POLLING = re.compile(
r"while\s+True.*(?:time\.sleep|sleep)\s*\(.*(?:urlopen|requests\.|httpx\.)",
re.DOTALL,
)
# Developer-tool persistence hooks. The PyTorch Lightning 2.6.x compromise
# planted SessionStart hooks into Claude Code, VS Code tasks, and Cursor
# settings so the payload re-attached on every editor open. Catches any
# package writing into a known dev-tool config that supports auto-run.
RE_DEV_TOOL_HIJACK = re.compile(
r"\.claude/settings\.json"
r"|\.cursor/.*hooks"
r"|\.vscode/(?:tasks|settings|launch)\.json"
r"|SessionStart|folderOpen|onCommand:.*runTask"
r"|/etc/profile\.d/"
r"|\b\.bashrc\b|\b\.zshrc\b|\b\.profile\b"
r"|\bautomator\b.*\.workflow\b",
)
# Hard-coded credential / API-token regexes embedded in source. Packages
# that ship regexes for OTHER people's secrets are nearly always
# stealers (litellm 1.82.7, elementary-data 0.23.3, Shai-Hulud).
RE_TOKEN_REGEX = re.compile(
r"\bgh[psoru]_[A-Za-z0-9_]{20,}" # GitHub PAT/OAuth/etc.
r"|\bgithub_pat_[A-Za-z0-9_]{20,}"
r"|\bnpm_[A-Za-z0-9]{30,}" # npm token
r"|\bsk-[A-Za-z0-9]{20,}" # OpenAI / Anthropic
r"|\bxox[bpaesr]-" # Slack
r"|\bAIza[0-9A-Za-z_-]{20,}" # Google API key
r"|\bAKIA[0-9A-Z]{16}" # AWS access key id
r"|\bASIA[0-9A-Z]{16}" # AWS STS
r"|\bgithub.com/login/oauth/access_token"
r"|\bglpat-[0-9A-Za-z_-]{20,}", # GitLab PAT
)
# Mini Shai-Hulud May-12 2026 wave indicators. The dropper artifact name
# `transformers.pyz` is high-confidence (no legit PyPI package ships a `.pyz`
# named after `transformers`); the host + slogans are CRITICAL.
RE_MAY12_IOC = re.compile(
r"(git-tanstack\.com|/tmp/transformers\.pyz|transformers\.pyz"
r"|With Love TeamPCP|We've been online over 2 hours)",
re.IGNORECASE,
)
# JavaScript-side obfuscation. The npm chalk/debug compromise and the
# Lightning router_runtime.js use the same minifier-style hex-var name
# pattern; a bundle full of `_0x1f2e3d` identifiers is a near-universal
# tell for a malicious npm payload (and very rare in legit minified code
# that ships in PyPI wheels).
RE_JS_OBFUSCATION = re.compile(
r"_0x[a-f0-9]{4,6}\s*=\s*function"
r"|var\s+_0x[a-f0-9]{4,6}\b"
r"|(?:\\x[0-9a-f]{2}){10,}" # \x-escape strings
r"|String\.fromCharCode\s*\(\s*\d+\s*(?:,\s*\d+\s*){10,}\)",
)
# Web3 / wallet-hijack pattern. The Qix npm phish overrode fetch /
# XMLHttpRequest and attached a `window.ethereum` listener that
# Levenshtein-swapped recipient addresses on the way to the network.
RE_WEB3_HIJACK = re.compile(
r"\bwindow\.ethereum\b"
r"|\bweb3\.eth\.\w+\s*\("
r"|XMLHttpRequest\.prototype\.(?:open|send)\s*="
r"|(?:^|\s)fetch\s*=\s*\(?\s*async"
r"|TronWeb|solanaWeb3",
)
# Self-propagating supply-chain worms (Shai-Hulud, ForceMemo) plant
# their own GitHub workflow in every repo they can reach, and lean on
# trufflehog/gitleaks for credential discovery. The combo of any of
# these strings inside a *package payload* is overwhelming evidence of
# repo-takeover intent.
RE_WORKFLOW_INJECT = re.compile(
r"\.github/workflows/[^\"\']*\.ya?ml"
r"|\btrufflehog\b|\bgitleaks\b"
r"|/user/repos\?affiliation=.*owner.*collaborator"
r"|\bshai-hulud\b|EveryBoiWeBuildIsAWormyBoi"
r"|\bgit\s+push\s+--force\b.*--no-verify",
re.IGNORECASE | re.DOTALL,
)
# Shell-side patterns specific to install.sh / postinstall scripts that
# pipe remote code into a shell. `curl ... | sh` and friends are the
# canonical npm postinstall dropper.
RE_SHELL_DROPPER = re.compile(
r"\bcurl\b[^\n|]*\|\s*(?:sh|bash|zsh)\b"
r"|\bwget\b[^\n|]*-O-\s*\|\s*(?:sh|bash|zsh)\b"
r"|\bnpx\b\s+-y\s+[^\s]+@latest\s*\|"
r"|\beval\s+\$\(\s*curl\b"
r"|\bbash\s+<\(\s*curl\b",
)
# ---------------------------------------------------------------------------
# Finding dataclass
# ---------------------------------------------------------------------------
@dataclass
class Finding:
severity: str
package: str
filename: str
check: str
evidence: str = ""
# ---------------------------------------------------------------------------
# Checkers
# ---------------------------------------------------------------------------
def check_pth_file(content: str, filename: str, package: str) -> list[Finding]:
"""Run all .pth-specific checks.
Executable .pth files run on every Python startup, so any suspicious
pattern in a .pth is treated as CRITICAL.
"""
findings = []
# Only care about .pth files that have import lines (executable)
import_lines = [line for line in content.splitlines() if RE_PTH_IMPORT.match(line)]
if not import_lines:
return findings # Pure path entries, inert
# All patterns are CRITICAL inside executable .pth files
_pth_checks = [
(RE_SUBPROCESS, ".pth has subprocess/os exec calls"),
(RE_BASE64, ".pth has base64/encoding obfuscation"),
(RE_EXEC_EVAL, ".pth has exec()/eval()"),
(RE_NETWORK, ".pth has network API calls"),
(
RE_OBFUSCATION,
".pth has advanced obfuscation (marshal/compile/zlib/__import__)",
),
(RE_EMBEDDED_KEYS, ".pth has embedded cryptographic key material"),
(RE_CLOUD_METADATA, ".pth accesses cloud metadata / IMDS endpoints"),
(RE_PERSISTENCE, ".pth installs persistence (systemd/cron/launchd/registry)"),
(RE_CONTAINER_ABUSE, ".pth interacts with container/orchestration runtime"),
(RE_ENV_HARVEST, ".pth harvests environment variables / secrets"),
(RE_ARCHIVE_STAGING, ".pth stages archive for exfiltration"),
(RE_ANTI_ANALYSIS, ".pth has anti-analysis / sandbox evasion"),
(RE_DNS_EXFIL, ".pth has DNS exfiltration / tunneling patterns"),
(RE_FS_ENUM, ".pth enumerates filesystem / steals files"),
(RE_REVERSE_SHELL, ".pth has reverse/bind shell patterns"),
(RE_REMOTE_CODE, ".pth loads and executes remote code"),
(RE_CRYPTO_THEFT, ".pth targets cryptocurrency wallets / keys"),
(RE_CRED_ACCESS, ".pth accesses credential files"),
(RE_OPENSSL_CLI, ".pth invokes openssl CLI (encrypted exfil pattern)"),
(RE_TEMP_EXEC, ".pth writes to /tmp and executes (staged dropper)"),
(RE_C2_POLLING, ".pth has C2 polling/beaconing loop"),
]
for pattern, description in _pth_checks:
if pattern.search(content):
findings.append(
Finding(
CRITICAL,
package,
filename,
description,
_extract_evidence(content, pattern),
)
)
# Large base64 blob (special handling for blob size)
if RE_LARGE_BLOB.search(content):
blob = RE_LARGE_BLOB.search(content).group()
findings.append(
Finding(
CRITICAL,
package,
filename,
f".pth has large base64-like blob ({len(blob)} chars)",
blob[:120] + "...",
)
)
# Catch-all: any import line at all in .pth (if nothing else triggered)
if not findings and import_lines:
evidence = "\n".join(import_lines[:5])
if len(import_lines) > 5:
evidence += f"\n... ({len(import_lines)} import lines total)"
findings.append(
Finding(
HIGH,
package,
filename,
f".pth has {len(import_lines)} executable import line(s)",
evidence,
)
)
# Unusually large executable .pth (litellm's was 34 KB; legit ones are <100 bytes)
size = len(content)
if size > 500 and import_lines:
findings.append(
Finding(
HIGH,
package,
filename,
f"Unusually large executable .pth ({size} bytes)",
f"{len(import_lines)} import line(s) in {size}-byte .pth file",
)
)
return findings
def check_py_file(content: str, filename: str, package: str) -> list[Finding]:
"""Run all .py-specific checks."""
findings = []
basename = os.path.basename(filename)
is_setup = basename in ("setup.py", "setup.cfg")
is_init = basename == "__init__.py"
# Pre-compute all pattern matches
has_network = bool(RE_NETWORK.search(content))
has_subprocess = bool(RE_SUBPROCESS.search(content))
has_base64 = bool(RE_BASE64.search(content))
has_exec_eval = bool(RE_EXEC_EVAL.search(content))
has_creds = bool(RE_CRED_ACCESS.search(content))
has_blob = bool(RE_LARGE_BLOB.search(content))
has_obfuscation = bool(RE_OBFUSCATION.search(content))
has_keys = bool(RE_EMBEDDED_KEYS.search(content))
has_cloud_meta = bool(RE_CLOUD_METADATA.search(content))
has_persistence = bool(RE_PERSISTENCE.search(content))
has_container = bool(RE_CONTAINER_ABUSE.search(content))
has_env_harvest = bool(RE_ENV_HARVEST.search(content))
has_archive = bool(RE_ARCHIVE_STAGING.search(content))
has_anti = bool(RE_ANTI_ANALYSIS.search(content))
has_dns_exfil = bool(RE_DNS_EXFIL.search(content))
has_fs_enum = bool(RE_FS_ENUM.search(content))
has_rev_shell = bool(RE_REVERSE_SHELL.search(content))
has_remote_code = bool(RE_REMOTE_CODE.search(content))
has_crypto_theft = bool(RE_CRYPTO_THEFT.search(content))
has_openssl_cli = bool(RE_OPENSSL_CLI.search(content))
has_temp_exec = bool(RE_TEMP_EXEC.search(content))
has_c2_polling = bool(RE_C2_POLLING.search(content))
has_may12_ioc = bool(RE_MAY12_IOC.search(content))
# ---------------------------------------------------------------
# CRITICAL: combination patterns that strongly indicate malice
# ---------------------------------------------------------------
# base64 decode + subprocess execution (staged payload)
if has_base64 and has_subprocess:
findings.append(
Finding(
CRITICAL,
package,
filename,
"base64 decode + subprocess execution (staged payload)",
f"Base64: {_extract_evidence(content, RE_BASE64)}\n"
f"Subprocess: {_extract_evidence(content, RE_SUBPROCESS)}",
)
)
# openssl encryption + network/key material (encrypted exfiltration)
if has_openssl_cli and (has_network or has_keys):
findings.append(
Finding(
CRITICAL,
package,
filename,
"openssl encryption + network/key material (encrypted exfiltration)",
f"OpenSSL: {_extract_evidence(content, RE_OPENSSL_CLI)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Writes to /tmp and executes (staged dropper)
if has_temp_exec:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Writes to /tmp and executes (staged dropper)",
_extract_evidence(content, RE_TEMP_EXEC),
)
)
# May-12 Shai-Hulud IOC string in Python source.
if has_may12_ioc:
findings.append(
Finding(
CRITICAL,
package,
filename,
"May-12 Shai-Hulud IOC string present in Python file",
_extract_evidence(content, RE_MAY12_IOC),
)
)
# C2 polling/beaconing loop
if has_c2_polling:
findings.append(
Finding(
CRITICAL,
package,
filename,
"C2 polling/beaconing loop detected",
_extract_evidence(content, RE_C2_POLLING),
)
)
# Credential stealer: reads cred paths AND phones home
if has_creds and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Reads credential paths AND makes network calls",
f"Creds: {_extract_evidence(content, RE_CRED_ACCESS)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Reverse / bind shell
if has_rev_shell:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Reverse shell / bind shell pattern",
_extract_evidence(content, RE_REVERSE_SHELL),
)
)
# Remote code execution: exec/eval on HTTP response
if has_remote_code:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Downloads and executes remote code",
_extract_evidence(content, RE_REMOTE_CODE),
)
)
# Env harvest + network exfil
if has_env_harvest and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Harvests environment variables/secrets AND makes network calls",
f"Env: {_extract_evidence(content, RE_ENV_HARVEST)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Filesystem enum + network exfil
if has_fs_enum and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Enumerates filesystem AND makes network calls",
f"FS: {_extract_evidence(content, RE_FS_ENUM)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Cloud metadata access + network (exfil IMDS tokens)
if has_cloud_meta and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Accesses cloud metadata/IMDS AND makes network calls",
f"IMDS: {_extract_evidence(content, RE_CLOUD_METADATA)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Crypto wallet theft + network
if has_crypto_theft and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Targets cryptocurrency wallets AND makes network calls",
f"Crypto: {_extract_evidence(content, RE_CRYPTO_THEFT)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Archive staging with credential content + network
if has_archive and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Creates archive with sensitive data AND makes network calls",
f"Archive: {_extract_evidence(content, RE_ARCHIVE_STAGING)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Persistence + network (dropper that persists)
if has_persistence and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Installs persistence AND makes network calls (backdoor pattern)",
f"Persist: {_extract_evidence(content, RE_PERSISTENCE)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Container/k8s abuse + network
if has_container and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"Container/orchestration abuse AND makes network calls",
f"Container: {_extract_evidence(content, RE_CONTAINER_ABUSE)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# ---------------------------------------------------------------
# HIGH: single strong signals or weaker combinations
# ---------------------------------------------------------------
# Obfuscated payload: base64 + exec/eval + large blob
if has_base64 and has_exec_eval and has_blob:
findings.append(
Finding(
HIGH,
package,
filename,
"base64 decode + exec/eval + large encoded blob",
f"Base64: {_extract_evidence(content, RE_BASE64)}\n"
f"Exec: {_extract_evidence(content, RE_EXEC_EVAL)}",
)
)
# Advanced obfuscation + exec/eval
if has_obfuscation and has_exec_eval:
findings.append(
Finding(
HIGH,
package,
filename,
"Advanced obfuscation (marshal/compile/zlib) + exec/eval",
f"Obfusc: {_extract_evidence(content, RE_OBFUSCATION)}\n"
f"Exec: {_extract_evidence(content, RE_EXEC_EVAL)}",
)
)
# Embedded crypto key + network (hardcoded key for encrypted exfil)
if has_keys and has_network:
findings.append(
Finding(
HIGH,
package,
filename,
"Embedded cryptographic key + network calls (encrypted exfil pattern)",
f"Key: {_extract_evidence(content, RE_EMBEDDED_KEYS)}\n"
f"Network: {_extract_evidence(content, RE_NETWORK)}",
)
)
# Anti-analysis + any other suspicious pattern
if has_anti and (has_network or has_subprocess or has_exec_eval):
findings.append(
Finding(
HIGH,
package,
filename,
"Anti-analysis/sandbox evasion + suspicious behavior",
f"Anti: {_extract_evidence(content, RE_ANTI_ANALYSIS)}",
)
)
# DNS exfiltration with dynamic hostnames
if has_dns_exfil and (has_base64 or has_network or has_creds):
findings.append(
Finding(
HIGH,
package,
filename,
"DNS exfiltration / tunneling patterns",
_extract_evidence(content, RE_DNS_EXFIL),
)
)
# Cloud metadata standalone (IMDS access in a PyPI package is suspicious)
if has_cloud_meta and not findings:
findings.append(
Finding(
HIGH,
package,
filename,
"Accesses cloud metadata / IMDS endpoints",
_extract_evidence(content, RE_CLOUD_METADATA),
)
)
# Persistence standalone (a PyPI package installing systemd/cron is suspicious)
if has_persistence and not has_network:
findings.append(
Finding(
HIGH,
package,
filename,
"Installs persistence mechanism (systemd/cron/launchd/registry)",
_extract_evidence(content, RE_PERSISTENCE),
)
)
# Container abuse standalone
if has_container and not has_network:
findings.append(
Finding(
HIGH,
package,
filename,
"Interacts with container/orchestration runtime",
_extract_evidence(content, RE_CONTAINER_ABUSE),
)
)
# openssl CLI standalone (uncommon in PyPI packages)
if has_openssl_cli and not (has_network or has_keys):
findings.append(
Finding(
HIGH,
package,
filename,
"Invokes openssl CLI (uncommon in PyPI packages)",
_extract_evidence(content, RE_OPENSSL_CLI),
)
)
# setup.py checks
if is_setup:
if has_network and has_subprocess:
findings.append(
Finding(
HIGH,
package,
filename,
"setup.py has network calls + subprocess (dropper pattern)",
f"Network: {_extract_evidence(content, RE_NETWORK)}\n"
f"Subprocess: {_extract_evidence(content, RE_SUBPROCESS)}",
)
)
elif has_network:
findings.append(
Finding(
MEDIUM,
package,
filename,
"setup.py makes network calls at install time",
_extract_evidence(content, RE_NETWORK),
)
)
# ---------------------------------------------------------------
# MEDIUM: standalone signals (informational, may be legitimate)
# ---------------------------------------------------------------
# base64 + exec/eval without blob
if has_base64 and has_exec_eval and not has_blob:
findings.append(
Finding(
MEDIUM,
package,
filename,
"base64 decode + exec/eval (no large blob)",
f"Base64: {_extract_evidence(content, RE_BASE64)}\n"
f"Exec: {_extract_evidence(content, RE_EXEC_EVAL)}",
)
)
# Standalone obfuscation without exec
if has_obfuscation and not has_exec_eval:
findings.append(
Finding(
MEDIUM,
package,
filename,
"Advanced obfuscation patterns (marshal/compile/zlib/__import__)",
_extract_evidence(content, RE_OBFUSCATION),
)
)
# Embedded crypto keys standalone
if has_keys and not has_network:
findings.append(
Finding(
MEDIUM,
package,
filename,
"Embedded cryptographic key material",
_extract_evidence(content, RE_EMBEDDED_KEYS),
)
)
# Env harvest standalone
if has_env_harvest and not has_network:
findings.append(
Finding(
MEDIUM,
package,
filename,
"Harvests environment variables / secrets",
_extract_evidence(content, RE_ENV_HARVEST),
)
)
# Filesystem enum standalone
if has_fs_enum and not has_network:
findings.append(
Finding(
MEDIUM,
package,
filename,
"Enumerates filesystem / reads sensitive file paths",
_extract_evidence(content, RE_FS_ENUM),
)
)
# Crypto wallet references standalone
if has_crypto_theft and not has_network:
findings.append(
Finding(
MEDIUM,
package,
filename,
"References cryptocurrency wallets / keys",
_extract_evidence(content, RE_CRYPTO_THEFT),
)
)
return findings
def _extract_evidence(content: str, pattern: re.Pattern, max_matches: int = 3) -> str:
"""Pull matching lines as evidence snippets."""
lines = content.splitlines()
matches = []
for i, line in enumerate(lines, 1):
if pattern.search(line):
snippet = line.strip()
if len(snippet) > 160:
snippet = snippet[:160] + "..."
matches.append(f"L{i}: {snippet}")
if len(matches) >= max_matches:
break
return " | ".join(matches) if matches else ""
# ---------------------------------------------------------------------------
# Non-Python checkers
# ---------------------------------------------------------------------------
# Several recent PyPI compromises (PyTorch Lightning 2.6.x, ForceMemo)
# carried the active payload in a bundled .js / .sh / workflow yaml so
# the Python imports looked clean on first glance. These checkers scan
# those file types when they appear inside a Python wheel/sdist.
def check_js_file(content: str, filename: str, package: str) -> list[Finding]:
"""Run JS-side checks. Triggered by .js / .mjs / .cjs / .ts."""
findings = []
# A JS file *inside a Python wheel* that's larger than 100 KB is
# itself anomalous (legit Python packages don't ship hand-written
# JS bundles). Combined with ANY of the other JS heuristics it is
# CRITICAL; standalone it is HIGH.
is_large = len(content) > 100 * 1024
has_obf = bool(RE_JS_OBFUSCATION.search(content))
has_web3 = bool(RE_WEB3_HIJACK.search(content))
has_token_regex = bool(RE_TOKEN_REGEX.search(content))
has_workflow_inj = bool(RE_WORKFLOW_INJECT.search(content))
has_network = bool(RE_NETWORK.search(content))
if has_obf:
sev = CRITICAL if (is_large or has_web3 or has_token_regex) else HIGH
findings.append(
Finding(
sev,
package,
filename,
"JS minifier-style hex-var obfuscation (npm-payload signature)",
_extract_evidence(content, RE_JS_OBFUSCATION),
)
)
if has_web3:
findings.append(
Finding(
CRITICAL,
package,
filename,
"JS Web3 / wallet hijack (window.ethereum or fetch override)",
_extract_evidence(content, RE_WEB3_HIJACK),
)
)
if has_token_regex and has_network:
findings.append(
Finding(
CRITICAL,
package,
filename,
"JS embeds credential regexes AND makes network calls (stealer)",
_extract_evidence(content, RE_TOKEN_REGEX),
)
)
if has_workflow_inj:
findings.append(
Finding(
CRITICAL,
package,
filename,
"JS self-propagation: workflow injection / repo takeover signature",
_extract_evidence(content, RE_WORKFLOW_INJECT),
)
)
if is_large and not findings:
findings.append(
Finding(
HIGH,
package,
filename,
f"Python wheel ships large ({len(content) // 1024} KB) JS bundle "
"(uncommon; manually review)",
"",
)
)
return findings
def check_shell_file(content: str, filename: str, package: str) -> list[Finding]:
"""Run shell-side checks. Triggered by .sh / .bash / install scripts."""
findings = []
if RE_SHELL_DROPPER.search(content):
findings.append(
Finding(
CRITICAL,
package,
filename,
"Shell pipes remote code into an interpreter (curl|sh dropper)",
_extract_evidence(content, RE_SHELL_DROPPER),
)
)
if RE_DEV_TOOL_HIJACK.search(content) and (
RE_NETWORK.search(content) or RE_SUBPROCESS.search(content)
):
findings.append(
Finding(
CRITICAL,
package,
filename,
"Shell installs developer-tool persistence hook (.bashrc / "
"profile.d / vscode tasks) AND has network or exec",
_extract_evidence(content, RE_DEV_TOOL_HIJACK),
)
)
if RE_TOKEN_REGEX.search(content) and RE_NETWORK.search(content):
findings.append(
Finding(
CRITICAL,
package,
filename,
"Shell embeds credential regexes AND makes network calls",
_extract_evidence(content, RE_TOKEN_REGEX),
)
)
if RE_WORKFLOW_INJECT.search(content):
findings.append(
Finding(
CRITICAL,
package,
filename,
"Shell self-propagation: workflow injection / repo takeover signature",
_extract_evidence(content, RE_WORKFLOW_INJECT),
)
)
if RE_MAY12_IOC.search(content):
findings.append(
Finding(
CRITICAL,
package,
filename,
"May-12 Shai-Hulud IOC string present in shell script",
_extract_evidence(content, RE_MAY12_IOC),
)
)
return findings
def check_workflow_file(content: str, filename: str, package: str) -> list[Finding]:
"""Run GitHub-Actions workflow checks. Triggered by .github/workflows/*.yml."""
findings = []
# A GitHub workflow file inside a *PyPI package* is itself
# suspicious (Shai-Hulud's whole MO is to plant `shai-hulud.yml`
# in every repo it can write to). Anything matching the workflow
# injection signature gets flagged CRITICAL.
if RE_WORKFLOW_INJECT.search(content):
findings.append(
Finding(
CRITICAL,
package,
filename,
"Workflow file inside PyPI package matches self-propagation signature",
_extract_evidence(content, RE_WORKFLOW_INJECT),
)
)
if RE_TOKEN_REGEX.search(content):
findings.append(
Finding(
HIGH,
package,
filename,
"Workflow file embeds credential regexes (token harvesting?)",
_extract_evidence(content, RE_TOKEN_REGEX),
)
)
if RE_SHELL_DROPPER.search(content):
findings.append(
Finding(
CRITICAL,
package,
filename,
"Workflow pipes remote code into a shell (curl|sh dropper)",
_extract_evidence(content, RE_SHELL_DROPPER),
)
)
if RE_MAY12_IOC.search(content):
findings.append(
Finding(
CRITICAL,
package,
filename,
"May-12 Shai-Hulud IOC string present in workflow file",
_extract_evidence(content, RE_MAY12_IOC),
)
)
return findings
# ---------------------------------------------------------------------------
# Archive handling
# ---------------------------------------------------------------------------
# Tarbomb caps, mirrored from scripts/scan_npm_packages.py::safe_extract.
# Refuses zip-of-death / tar-of-death archives so a hostile sdist or
# wheel cannot exhaust memory or fill the temp dir before content
# scanning even starts. Keep these constants in sync with the npm side;
# we duplicate rather than import to keep `scan_packages.py` standalone.
HARD_MAX_FILE_BYTES = 64 * 1024 * 1024 # 64 MiB per member
HARD_MAX_TOTAL_BYTES = 512 * 1024 * 1024 # 512 MiB cumulative
HARD_MAX_MEMBERS = 50_000 # entries per archive
def _refuse_unsafe_member_name(name: str) -> str | None:
"""Return a refusal reason for a member name, or None if safe.
Mirrors `scan_npm_packages.py::safe_extract` semantics: no absolute
paths, no `..` traversal segments. The caller is responsible for
checking the resolved path lands inside the extract root, but for
iter_archive_files we never write to disk so the name-shape check
plus the in-memory size cap is sufficient.
"""
if name.startswith("/") or ".." in Path(name).parts:
return f"unsafe member name {name!r}"
return None
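# Doctest-style illustration (not collected as a doctest):
#   >>> _refuse_unsafe_member_name("../../etc/passwd")
#   "unsafe member name '../../etc/passwd'"
#   >>> _refuse_unsafe_member_name("pkg/__init__.py") is None
#   True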
def iter_archive_files(archive_path: str):
"""Yield (filename, text_content) for every file in a wheel/sdist.
Streams members with size + count caps applied at the member level
so a tarbomb / zipbomb cannot blow up the scanner's memory budget.
On cap breach we emit a `[WARN]` log and short-circuit the archive.
"""
path = Path(archive_path)
if path.suffix == ".whl" or path.suffix == ".zip":
total = 0
count = 0
with zipfile.ZipFile(path) as zf:
for info in zf.infolist():
if info.is_dir():
continue
count += 1
if count > HARD_MAX_MEMBERS:
print(
f" [WARN] {path.name}: refused; member count "
f"{count} exceeds cap {HARD_MAX_MEMBERS}",
file = sys.stderr,
)
return
reason = _refuse_unsafe_member_name(info.filename)
if reason is not None:
print(
f" [WARN] {path.name}: refused member ({reason})",
file = sys.stderr,
)
continue
# Declared (uncompressed) size cap.
if info.file_size > HARD_MAX_FILE_BYTES:
print(
f" [WARN] {path.name}: skipped {info.filename!r} "
f"(declared {info.file_size} > cap {HARD_MAX_FILE_BYTES})",
file = sys.stderr,
)
continue
if total + info.file_size > HARD_MAX_TOTAL_BYTES:
print(
f" [WARN] {path.name}: cumulative bytes cap "
f"{HARD_MAX_TOTAL_BYTES} hit at {info.filename!r}",
file = sys.stderr,
)
return
try:
data = zf.read(info.filename)
total += len(data)
text = data.decode("utf-8", errors = "replace")
yield info.filename, text
except Exception:
continue
elif path.name.endswith((".tar.gz", ".tgz", ".tar.bz2", ".tar.xz", ".tar")):
total = 0
count = 0
# Streaming open so we never read the whole archive into memory.
with tarfile.open(path, mode = "r|*") as tf:
for member in tf:
count += 1
if count > HARD_MAX_MEMBERS:
print(
f" [WARN] {path.name}: refused; member count "
f"{count} exceeds cap {HARD_MAX_MEMBERS}",
file = sys.stderr,
)
return
# Refuse symlinks / hardlinks / devices outright -- the
# scanner never writes them anyway, but tar parsers
# have historically dereferenced them on extract.
if member.issym() or member.islnk():
print(
f" [WARN] {path.name}: refused link member "
f"{member.name!r}",
file = sys.stderr,
)
continue
if member.isdev() or member.isfifo():
print(
f" [WARN] {path.name}: refused special member "
f"{member.name!r}",
file = sys.stderr,
)
continue
if not member.isfile():
continue
reason = _refuse_unsafe_member_name(member.name)
if reason is not None:
print(
f" [WARN] {path.name}: refused member ({reason})",
file = sys.stderr,
)
continue
declared = max(member.size, 0)
if declared > HARD_MAX_FILE_BYTES:
print(
f" [WARN] {path.name}: skipped {member.name!r} "
f"(declared {declared} > cap {HARD_MAX_FILE_BYTES})",
file = sys.stderr,
)
continue
if total + declared > HARD_MAX_TOTAL_BYTES:
print(
f" [WARN] {path.name}: cumulative bytes cap "
f"{HARD_MAX_TOTAL_BYTES} hit at {member.name!r}",
file = sys.stderr,
)
return
try:
f = tf.extractfile(member)
if f is None:
continue
# Bound the read so a tar header that lies about
# size cannot OOM us.
data = f.read(HARD_MAX_FILE_BYTES + 1)
if len(data) > HARD_MAX_FILE_BYTES:
print(
f" [WARN] {path.name}: body of "
f"{member.name!r} exceeded declared cap",
file = sys.stderr,
)
continue
total += len(data)
text = data.decode("utf-8", errors = "replace")
yield member.name, text
except Exception:
continue
else:
print(f" [WARN] Unknown archive format: {path.name}", file = sys.stderr)
def scan_archive(archive_path: str, package: str) -> list[Finding]:
"""Scan all files in an archive for malicious patterns.
A corrupted archive container (truncated wheel, bad gzip header,
etc.) used to be silently skipped by an ``except Exception: continue``
inside ``iter_archive_files``. Per the silent-failure hardening
(SF1) it now emits a CRITICAL ``archive_corrupted`` finding so the
main loop counts and surfaces it rather than reporting "0 findings".
"""
findings: list[Finding] = []
try:
for filename, content in iter_archive_files(archive_path):
lower = filename.lower()
if lower.endswith(".pth"):
findings.extend(check_pth_file(content, filename, package))
elif lower.endswith(".py"):
findings.extend(check_py_file(content, filename, package))
elif lower.endswith((".js", ".mjs", ".cjs", ".ts")):
# Lightning 2.6.x hid its real payload in a 14.8 MB
# router_runtime.js inside a Python wheel. Without this
# branch we'd have only seen the small Python loader.
findings.extend(check_js_file(content, filename, package))
elif lower.endswith((".sh", ".bash")):
findings.extend(check_shell_file(content, filename, package))
elif "/.github/workflows/" in lower and lower.endswith((".yml", ".yaml")):
# Shai-Hulud / ForceMemo plant their own GHA workflow.
# A workflow file inside a *PyPI package* is on its own
# already a yellow flag; pattern-match the worm signatures.
findings.extend(check_workflow_file(content, filename, package))
except (zipfile.BadZipFile, tarfile.TarError, EOFError, OSError) as exc:
# The archive cannot be opened or is structurally broken. A
# benign wheel/sdist always opens; a malformed one is either a
# transport corruption (treat as scan failure) or a deliberate
# attempt to bypass scanners that swallow archive errors.
findings.append(
Finding(
CRITICAL,
package,
os.path.basename(archive_path),
"archive_corrupted",
f"{type(exc).__name__}: {exc}"[:240],
)
)
return findings
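# Illustrative call (hypothetical local wheel path; litellm 1.82.7 is the
# incident that motivated this scanner):
#   findings = scan_archive("/tmp/litellm-1.82.7-py3-none-any.whl", "litellm==1.82.7")
#   worst = min(findings, key = lambda f: SEVERITY_ORDER[f.severity], default = None)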
# ---------------------------------------------------------------------------
# Download packages
# ---------------------------------------------------------------------------
_RE_PYPI_SPEC_VERSION = re.compile(r"==\s*([A-Za-z0-9_.\-+!]+)")
def _check_blocked_pypi_versions(
specs: list[str],
) -> tuple[list[str], list[Finding]]:
"""Filter ``specs`` against ``BLOCKED_PYPI_VERSIONS``.
Returns ``(safe_specs, findings)``. Each blocked spec emits a CRITICAL
``Finding`` and is removed from the returned spec list so the caller
never fetches the malicious tarball. Specs without an ``==X.Y.Z`` pin
pass through unchanged -- pip will resolve them at download time and
the existing scanners will catch the payload via the IOC regexes.
"""
safe: list[str] = []
findings: list[Finding] = []
for spec in specs:
name = _extract_pkg_name(spec).lower()
blocked = BLOCKED_PYPI_VERSIONS.get(name, set())
if not blocked:
safe.append(spec)
continue
m = _RE_PYPI_SPEC_VERSION.search(spec)
version = m.group(1) if m else None
if version is not None and version in blocked:
findings.append(
Finding(
CRITICAL,
f"{name}=={version}",
"<spec>",
"blocked-known-malicious",
f"{name}=={version} is on the BLOCKED_PYPI_VERSIONS list",
)
)
# Drop the spec; do not download.
continue
safe.append(spec)
return safe, findings
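# Illustrative behaviour (package names taken from BLOCKED_PYPI_VERSIONS above):
#   safe, blocked = _check_blocked_pypi_versions(["lightning==2.6.2", "requests==2.32.5"])
#   # safe    -> ["requests==2.32.5"]
#   # blocked -> one CRITICAL "blocked-known-malicious" Finding for lightning==2.6.2
#   # An unpinned "lightning" spec passes through; the archive scanners vet it instead.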
def _pip_download_env() -> dict[str, str]:
"""Return a scrubbed environment for invoking `pip download`.
Hostile shells / CI configs can override the index with PIP_INDEX_URL,
PIP_EXTRA_INDEX_URL, or a user `pip.conf`. We strip every PIP_*
override and route the resolver explicitly at PyPI. PIP_CONFIG_FILE
is forced to /dev/null so a stray ~/.pip/pip.conf with an
extra-index-url cannot bypass the pin.
"""
env = {**os.environ}
# Drop any user override.
for key in [k for k in env if k.startswith("PIP_")]:
env.pop(key, None)
env["PIP_INDEX_URL"] = "https://pypi.org/simple"
env["PIP_EXTRA_INDEX_URL"] = ""
env["PIP_CONFIG_FILE"] = "/dev/null"
env["PIP_DISABLE_PIP_VERSION_CHECK"] = "1"
return env
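# Illustrative effect on a hostile environment (hypothetical override value):
#   with PIP_EXTRA_INDEX_URL=https://mirror.invalid/simple exported, the
#   returned env has every PIP_* override dropped, then PIP_INDEX_URL pinned
#   to https://pypi.org/simple, PIP_EXTRA_INDEX_URL set empty, and
#   PIP_CONFIG_FILE pointed at /dev/null.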
# Pip resolver flags shared by both download branches. Pinning the
# index URL on the CLI is belt + braces with the env scrub above.
# `--no-build-isolation` is deliberately NOT set; we never invoke
# setup.py at all because of `--only-binary :all:`.
_PIP_DOWNLOAD_PIN_FLAGS = [
"--index-url",
"https://pypi.org/simple",
"--only-binary",
":all:",
]
# Strip any character that could escape `dest` via `os.path.join`. This
# is the last line of defence before `pkg_dir = os.path.join(dest, ...)`
# so a spec like `../../etc/foo==1.0` cannot land outside the temp tree.
_RE_PKG_NAME_SANITIZE = re.compile(r"[^A-Za-z0-9._-]")
def download_packages(
specs: list[str],
dest: str,
*,
with_deps: bool = False,
) -> tuple[list[tuple[str, str]], list[str]]:
"""Download packages to dest using pip download. NEVER installs.
Returns ``(results, download_errors)`` where ``results`` is a list of
``(spec_or_name, filepath)`` for every downloaded archive and
``download_errors`` is a list of one-line transport-failure summaries.
A non-empty ``download_errors`` MUST cause the caller to exit non-zero
even if no findings were produced; a silent ``0 findings, scan
incomplete`` is the bug class this return-shape was widened to fix.
When with_deps=True, downloads the full transitive dependency tree
in a single pip invocation (all archives land in one flat dir).
When with_deps=False (default), downloads each spec individually
with --no-deps.
"""
results: list[tuple[str, str]] = []
download_errors: list[str] = []
env = _pip_download_env()
if with_deps:
# Single pip download call for all specs + their transitive deps.
# `--only-binary :all:` refuses sdists so we never execute a
# setup.py just to learn dependency metadata; combined with the
# scrubbed env, pip is wired hard at pypi.org.
os.makedirs(dest, exist_ok = True)
cmd = [
sys.executable,
"-m",
"pip",
"download",
*_PIP_DOWNLOAD_PIN_FLAGS,
"--dest",
dest,
] + specs
try:
proc = subprocess.run(
cmd,
capture_output = True,
text = True,
timeout = 600, # transitive resolution can be slow
env = env,
)
if proc.returncode != 0:
msg = (
f"pip download (with deps) failed: " f"{proc.stderr.strip()[:500]}"
)
print(f" [ERROR] {msg}", file = sys.stderr)
download_errors.append(msg)
except subprocess.TimeoutExpired:
msg = "pip download (with deps) timed out"
print(f" [ERROR] {msg}", file = sys.stderr)
download_errors.append(msg)
# Collect every archive that landed in dest
for fname in sorted(os.listdir(dest)):
fpath = os.path.join(dest, fname)
if os.path.isfile(fpath):
# Derive package name from filename
pkg_name = fname.split("-")[0].replace("_", "-").lower()
results.append((pkg_name, fpath))
else:
for spec in specs:
raw_name = _extract_pkg_name(spec)
# Sanitize before joining into `dest` so a hostile spec
# cannot path-traverse out of the destination directory.
safe_name = _RE_PKG_NAME_SANITIZE.sub("_", raw_name) or "_pkg"
pkg_dir = os.path.join(dest, safe_name)
os.makedirs(pkg_dir, exist_ok = True)
cmd = [
sys.executable,
"-m",
"pip",
"download",
"--no-deps",
*_PIP_DOWNLOAD_PIN_FLAGS,
"--dest",
pkg_dir,
spec,
]
try:
proc = subprocess.run(
cmd,
capture_output = True,
text = True,
timeout = 120,
env = env,
)
if proc.returncode != 0:
msg = (
f"pip download failed for {spec}: "
f"{proc.stderr.strip()[:500]}"
)
print(f" [ERROR] {msg}", file = sys.stderr)
download_errors.append(msg)
continue
except subprocess.TimeoutExpired:
msg = f"pip download timed out for {spec}"
print(f" [ERROR] {msg}", file = sys.stderr)
download_errors.append(msg)
continue
# Find downloaded file(s)
for fname in os.listdir(pkg_dir):
fpath = os.path.join(pkg_dir, fname)
if os.path.isfile(fpath):
results.append((spec, fpath))
return results, download_errors
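# Sketch of the calling convention (mirrors what main() does further down;
# `specs` and `tmpdir` are assumed to already exist):
#   results, errors = download_packages(specs, tmpdir, with_deps = False)
#   for spec, archive_path in results:
#       scan_archive(archive_path, _extract_pkg_name(spec))
#   if errors:
#       ...  # scan is incomplete: exit non-zero, never report "all clean"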
# ---------------------------------------------------------------------------
# Parse requirements files
# ---------------------------------------------------------------------------
_RE_NAME = re.compile(r"^([A-Za-z0-9]([A-Za-z0-9._-]*[A-Za-z0-9])?)")
def _extract_pkg_name(spec: str) -> str:
"""Extract the package name from a pip spec string."""
m = _RE_NAME.match(spec)
return (
m.group(1)
if m
else spec.split("==")[0].split(">=")[0].split("<=")[0].split("[")[0].strip()
)
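# Illustrative extractions (hypothetical specs):
#   _extract_pkg_name("requests==2.32.5")        -> "requests"
#   _extract_pkg_name("torch>=2.1,<2.3")         -> "torch"
#   _extract_pkg_name("uvicorn[standard]==0.30") -> "uvicorn"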
def parse_requirements(req_files: list[str]) -> list[dict]:
"""Parse requirements files into a list of dicts with source tracking.
Each dict has keys: spec, name, source_file, line_num, raw_line, is_git.
"""
results = []
for req_file in req_files:
abs_path = os.path.abspath(req_file)
try:
with open(req_file) as f:
for line_num, raw_line in enumerate(f, 1):
line = raw_line.strip()
# Skip blanks, comments, options, nested -r
if not line or line.startswith("#") or line.startswith("-"):
continue
is_git = line.startswith("git+") or "git+" in line.split("#")[0]
# Strip inline comments and environment markers for spec
spec = line.split("#")[0].strip()
spec = spec.split(";")[0].strip()
if not spec:
continue
name = _extract_pkg_name(spec) if not is_git else spec
results.append(
{
"spec": spec,
"name": name,
"source_file": abs_path,
"line_num": line_num,
"raw_line": raw_line.rstrip("\n"),
"is_git": is_git,
}
)
except FileNotFoundError:
print(f" [ERROR] Requirements file not found: {req_file}", file = sys.stderr)
return results
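# Example entry shape for a hypothetical reqs.txt whose line 3 reads
# `fastapi>=0.110  # api`:
#   {"spec": "fastapi>=0.110", "name": "fastapi",
#    "source_file": "/abs/path/reqs.txt", "line_num": 3,
#    "raw_line": "fastapi>=0.110  # api", "is_git": False}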
def get_downloaded_version(archive_path: str) -> str | None:
"""Extract version from wheel/sdist filename.
Wheel: {name}-{version}(-...).whl
Sdist: {name}-{version}.tar.gz / .zip
"""
basename = os.path.basename(archive_path)
# Wheel: name-version-pytag-abitag-platform.whl
if basename.endswith(".whl"):
parts = basename[:-4].split("-")
if len(parts) >= 2:
return parts[1]
# Sdist: name-version.tar.gz / .tar.bz2 / .zip
for ext in (".tar.gz", ".tar.bz2", ".tar.xz", ".tar", ".zip"):
if basename.endswith(ext):
stem = basename[: -len(ext)]
parts = stem.rsplit("-", 1)
if len(parts) == 2:
return parts[1]
return None
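# Illustrative filename parses (hypothetical archives):
#   get_downloaded_version("requests-2.32.5-py3-none-any.whl")  -> "2.32.5"
#   get_downloaded_version("typing-extensions-4.12.2.tar.gz")   -> "4.12.2"
#   get_downloaded_version("README.md")                         -> None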
# ---------------------------------------------------------------------------
# Display
# ---------------------------------------------------------------------------
def severity_color(sev: str) -> str:
colors = {CRITICAL: "\033[91m", HIGH: "\033[93m", MEDIUM: "\033[33m"}
return colors.get(sev, "")
RESET = "\033[0m"
def print_findings(findings: list[Finding]) -> None:
if not findings:
print("\n All clean. No suspicious patterns found.")
return
# Sort by severity
findings.sort(key = lambda f: SEVERITY_ORDER.get(f.severity, 99))
print(f"\n {'=' * 72}")
print(f" SCAN RESULTS: {len(findings)} finding(s)")
print(f" {'=' * 72}")
for i, f in enumerate(findings, 1):
color = severity_color(f.severity)
print(f"\n [{i}] {color}{f.severity}{RESET} {f.check}")
print(f" Package: {f.package}")
print(f" File: {f.filename}")
if f.evidence:
for eline in f.evidence.split("\n"):
print(f" Evidence: {eline}")
print(f"\n {'=' * 72}")
crits = sum(1 for f in findings if f.severity == CRITICAL)
highs = sum(1 for f in findings if f.severity == HIGH)
meds = sum(1 for f in findings if f.severity == MEDIUM)
parts = []
if crits:
parts.append(f"{crits} CRITICAL")
if highs:
parts.append(f"{highs} HIGH")
if meds:
parts.append(f"{meds} MEDIUM")
print(f" Summary: {', '.join(parts)}")
# ---------------------------------------------------------------------------
# PyPI version queries and --fix logic
# ---------------------------------------------------------------------------
def version_sort_key(v: str) -> tuple:
"""PEP 440-ish sort key using stdlib only.
Handles: epoch!, major.minor.patch, pre/post/dev suffixes.
Returns a tuple that sorts in ascending version order.
"""
epoch = 0
if "!" in v:
epoch_str, v = v.split("!", 1)
try:
epoch = int(epoch_str)
except ValueError:
pass
# Split off pre/post/dev suffixes
v_clean = re.split(
r"[-_.]?(a|alpha|b|beta|rc|c|pre|preview|dev|post)", v, maxsplit = 1, flags = re.I
)
base = v_clean[0]
suffix = v[len(base) :]
# Parse numeric parts
parts = []
for seg in base.split("."):
try:
parts.append(int(seg))
except ValueError:
parts.append(0)
# Pad to at least 3 parts
while len(parts) < 3:
parts.append(0)
# Suffix ordering: dev < alpha < beta < rc < (none) < post
suffix_lower = suffix.lower().lstrip(".-_")
if suffix_lower.startswith("dev"):
suffix_rank = -4
elif suffix_lower.startswith(("a", "alpha")):
suffix_rank = -3
elif suffix_lower.startswith(("b", "beta")):
suffix_rank = -2
elif suffix_lower.startswith(("rc", "c", "pre", "preview")):
suffix_rank = -1
elif suffix_lower.startswith("post"):
suffix_rank = 1
else:
suffix_rank = 0 # stable release
return (epoch, tuple(parts), suffix_rank, suffix)
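# Sanity-check example of the ordering this key produces (comment-only):
#   sorted(["1.2.0", "1.2.0rc1", "1.2.0.post1", "1.2.0.dev1"],
#          key = version_sort_key)
#   -> ["1.2.0.dev1", "1.2.0rc1", "1.2.0", "1.2.0.post1"]
# i.e. dev/alpha/beta/rc sort below the stable release and post sorts above it.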
def fetch_pypi_versions(name: str) -> list[str]:
"""Fetch all available versions for a package from PyPI JSON API.
Returns versions sorted ascending by version_sort_key.
"""
url = f"https://pypi.org/pypi/{name}/json"
try:
req = urllib.request.Request(url, headers = {"Accept": "application/json"})
with urllib.request.urlopen(req, timeout = 30) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(f" [ERROR] Failed to query PyPI for {name}: {e}", file = sys.stderr)
return []
versions = list(data.get("releases", {}).keys())
versions.sort(key = version_sort_key)
return versions
def find_safe_version(
name: str,
bad_ver: str,
tmpdir: str,
max_search: int = 10,
) -> str | None:
"""Search backward from bad_ver for a clean version.
Downloads and scans up to max_search older versions.
Returns the first clean version found, or None.
"""
versions = fetch_pypi_versions(name)
if not versions:
print(f" [WARN] No versions found on PyPI for {name}", file = sys.stderr)
return None
# Find index of bad version
try:
bad_idx = versions.index(bad_ver)
except ValueError:
# bad_ver might have been resolved to a different string; search by sort key
bad_key = version_sort_key(bad_ver)
bad_idx = None
for i, v in enumerate(versions):
if version_sort_key(v) >= bad_key:
bad_idx = i
break
if bad_idx is None:
bad_idx = len(versions) - 1
# Search backward from the version before bad_ver
candidates = versions[:bad_idx]
candidates.reverse() # newest-first among older versions
candidates = candidates[:max_search]
if not candidates:
print(f" [WARN] No older versions to scan for {name}", file = sys.stderr)
return None
print(f" Searching {len(candidates)} older version(s) of {name}...")
for ver in candidates:
spec = f"{name}=={ver}"
scan_dir = os.path.join(tmpdir, f"{name}_{ver}")
os.makedirs(scan_dir, exist_ok = True)
        # download_packages returns (results, download_errors); a failed
        # download just means this candidate version is skipped.
        downloaded, _dl_errors = download_packages([spec], scan_dir)
if not downloaded:
continue
clean = True
for _, archive_path in downloaded:
findings = scan_archive(archive_path, name)
# Delete archive immediately after scanning
try:
os.remove(archive_path)
except OSError:
pass
crit_findings = [f for f in findings if f.severity == CRITICAL]
if crit_findings:
clean = False
print(f" {ver} -- CRITICAL finding(s), skipping")
break
# Clean up scan dir for this version
shutil.rmtree(scan_dir, ignore_errors = True)
if clean:
print(f" {ver} -- clean!")
return ver
return None
def update_req_line(raw_line: str, safe_ver: str, old_ver: str | None) -> str:
"""Rewrite a single requirements line to pin to safe_ver.
Preserves env markers, inline comments, and line format.
Appends a comment noting the pin.
"""
# Split off inline comment
comment = ""
if " #" in raw_line:
code_part, comment = raw_line.split(" #", 1)
comment = " #" + comment
else:
code_part = raw_line
# Split off env markers (after semicolon)
marker = ""
if ";" in code_part:
code_part, marker = code_part.split(";", 1)
marker = ";" + marker
# Replace version specifier
# Match patterns like ==1.2.3, >=1.2, ~=1.0, <=2.0, !=1.1, or bare name
rewritten = re.sub(
r"([A-Za-z0-9._-]+)\s*(?:[><=!~]=?[^;#,\s]*(?:\s*,\s*[><=!~]=?[^;#,\s]*)*)?",
lambda m: f"{m.group(1)}=={safe_ver}",
code_part.strip(),
count = 1,
)
was_note = f" (was {old_ver})" if old_ver else ""
pin_comment = f" # pinned by pth_scanner{was_note}"
return f"{rewritten}{marker}{pin_comment}"
def update_req_file(filepath: str, updates: dict[int, str]) -> None:
"""Apply line-level updates to a requirements file.
updates: {line_num (1-indexed): new_line_text}
Writes atomically: stage in a sibling tmp file on the same
filesystem, fsync, then `os.replace` over the original. A SIGKILL
or power loss mid-write therefore either leaves the original
intact or leaves the fully new file -- never a half-written
requirements file (which would silently re-introduce a malicious
pin).
"""
with open(filepath) as f:
lines = f.readlines()
for line_num, new_text in updates.items():
idx = line_num - 1
if 0 <= idx < len(lines):
# Preserve original line ending
ending = "\n" if lines[idx].endswith("\n") else ""
lines[idx] = new_text + ending
dirpath = os.path.dirname(os.path.abspath(filepath)) or "."
fd, tmp_path = tempfile.mkstemp(
prefix = ".req_fix.",
dir = dirpath,
)
try:
with os.fdopen(fd, "w") as f:
f.writelines(lines)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, filepath)
except Exception:
# Best effort cleanup; the destination was never touched.
try:
os.unlink(tmp_path)
except OSError:
pass
raise
def _run_fix(
critical_pkgs: set[str],
entries: list[dict],
max_search: int,
) -> None:
"""Run the --fix flow: find safe versions, update requirements files."""
# Map package names to their entries for source tracking
pkg_entries: dict[str, list[dict]] = {}
for e in entries:
norm = e["name"].lower().replace("-", "_").replace(".", "_")
pkg_entries.setdefault(norm, []).append(e)
changes_summary: list[str] = []
with tempfile.TemporaryDirectory(prefix = "pth_fix_") as tmpdir:
for pkg_name in sorted(critical_pkgs):
norm = pkg_name.lower().replace("-", "_").replace(".", "_")
related = pkg_entries.get(norm, [])
# Check if any are git deps
git_entries = [e for e in related if e["is_git"]]
if git_entries:
for e in git_entries:
src = e["source_file"] or "CLI"
print(
f" [SKIP] {pkg_name} is a git URL dep in {src}, cannot auto-update"
)
changes_summary.append(f" SKIP {pkg_name} (git URL)")
continue
# Get the currently resolved version
# Try to extract from the spec (e.g. name==1.2.3)
current_ver = None
for e in related:
spec = e["spec"]
if "==" in spec:
current_ver = spec.split("==", 1)[1].split(";")[0].strip()
break
if not current_ver:
# If no pinned version, download to find what pip resolves
dl_dir = os.path.join(tmpdir, f"resolve_{pkg_name}")
os.makedirs(dl_dir, exist_ok = True)
                downloaded, _dl_errors = download_packages([pkg_name], dl_dir)
if downloaded:
current_ver = get_downloaded_version(downloaded[0][1])
# Delete resolution download immediately
shutil.rmtree(dl_dir, ignore_errors = True)
if not current_ver:
print(
f" [WARN] Cannot determine current version of {pkg_name}, skipping fix"
)
changes_summary.append(f" SKIP {pkg_name} (version unknown)")
continue
print(f"\n Fixing {pkg_name} (current: {current_ver})...")
safe_ver = find_safe_version(pkg_name, current_ver, tmpdir, max_search)
if not safe_ver:
print(
f" [FAIL] No safe version found for {pkg_name} within {max_search} older versions"
)
changes_summary.append(
f" FAIL {pkg_name}=={current_ver} -> no safe version found"
)
continue
print(f" [OK] {pkg_name}: {current_ver} -> {safe_ver}")
changes_summary.append(
f" FIX {pkg_name}=={current_ver} -> {pkg_name}=={safe_ver}"
)
# Update all occurrences in requirements files
file_updates: dict[str, dict[int, str]] = {}
for e in related:
if e["source_file"] is None:
# CLI arg, no file to update
print(f" (CLI arg, no file to update)")
continue
new_line = update_req_line(e["raw_line"], safe_ver, current_ver)
file_updates.setdefault(e["source_file"], {})[e["line_num"]] = new_line
print(f" {e['source_file']}:{e['line_num']}")
print(f" - {e['raw_line']}")
print(f" + {new_line}")
for filepath, updates in file_updates.items():
update_req_file(filepath, updates)
# Print summary
print(f"\n {'=' * 72}")
print(f" FIX SUMMARY")
print(f" {'=' * 72}")
for line in changes_summary:
print(line)
print(f"\n Re-run without --fix to verify the scan is clean.")
# ---------------------------------------------------------------------------
# Directory scanning
# ---------------------------------------------------------------------------
def _find_requirements_files(root: str) -> list[str]:
"""Recursively find pip requirements files under root.
Matches:
- requirements*.txt (e.g. requirements.txt, requirements-dev.txt)
- *.txt inside directories named 'requirements' (e.g. requirements/base.txt)
Skips:
- .egg-info dirs, venvs, hidden dirs, __pycache__, node_modules
"""
import fnmatch
skip_dirs = {"__pycache__", "node_modules", "venv", ".venv", "site-packages"}
results = []
for dirpath, dirnames, filenames in os.walk(root):
# Skip hidden dirs and known non-requirement dirs
dirnames[:] = [
d
for d in dirnames
if not d.startswith(".")
and d not in skip_dirs
and not d.endswith(".egg-info")
]
dirname = os.path.basename(dirpath)
for fname in sorted(filenames):
if not fname.endswith(".txt"):
continue
# Match requirements*.txt anywhere
if fnmatch.fnmatch(fname.lower(), "requirements*.txt"):
results.append(os.path.join(dirpath, fname))
# Match *.txt inside a directory named "requirements"
elif dirname == "requirements":
results.append(os.path.join(dirpath, fname))
return sorted(results)
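# Illustrative walk over a hypothetical tree:
#   repo/requirements.txt          -> matched  (requirements*.txt)
#   repo/requirements/cuda.txt     -> matched  (*.txt inside requirements/)
#   repo/docs/notes.txt            -> skipped  (name does not match)
#   repo/.venv/requirements.txt    -> skipped  (hidden / venv dir pruned)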
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(
description = __doc__,
formatter_class = argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"packages",
nargs = "*",
help = "Package specs (e.g. requests==2.32.5 fastapi)",
)
parser.add_argument(
"-r",
"--requirements",
action = "append",
default = [],
metavar = "FILE",
help = "Requirements file(s) to scan",
)
parser.add_argument(
"-d",
"--scan-dir",
action = "append",
default = [],
metavar = "DIR",
help = "Recursively find requirements*.txt files in DIR",
)
parser.add_argument(
"--with-deps",
action = "store_true",
help = "Also download and scan transitive dependencies (full dependency tree)",
)
parser.add_argument(
"--fix",
action = "store_true",
help = "Auto-search for safe versions and update requirements files",
)
parser.add_argument(
"--max-search",
type = int,
default = 10,
metavar = "N",
help = "Max older versions to scan when searching for safe version (default: 10)",
)
args = parser.parse_args()
# --scan-dir: auto-discover requirements files
req_files = list(args.requirements)
for scan_dir in args.scan_dir:
found = _find_requirements_files(scan_dir)
if found:
print(f" Found {len(found)} requirements file(s) in {scan_dir}/")
for f in found:
print(f" {f}")
req_files.extend(found)
else:
print(
f" [WARN] No requirements files found in {scan_dir}/", file = sys.stderr
)
# Build unified entry list: list of dicts with source tracking
entries: list[dict] = []
# CLI args -> entries with no source file
for pkg in args.packages or []:
entries.append(
{
"spec": pkg,
"name": _extract_pkg_name(pkg),
"source_file": None,
"line_num": None,
"raw_line": pkg,
"is_git": pkg.startswith("git+") or "git+" in pkg,
}
)
# Requirements files -> entries with source tracking
if req_files:
entries.extend(parse_requirements(req_files))
if not entries:
parser.print_help()
return 2
# Deduplicate by normalized name, preserving first occurrence
seen: set[str] = set()
unique_entries: list[dict] = []
for e in entries:
key = e["name"].lower().replace("-", "_").replace(".", "_")
if key not in seen:
seen.add(key)
unique_entries.append(e)
specs = [e["spec"] for e in unique_entries]
mode_label = " (with transitive deps)" if args.with_deps else ""
print(f" Scanning {len(specs)} package(s){mode_label}...")
all_findings: list[Finding] = []
# Hard pin-block: refuse to download known-malicious PyPI versions.
specs, blocked_findings = _check_blocked_pypi_versions(specs)
all_findings.extend(blocked_findings)
tmpdir = tempfile.mkdtemp(prefix = "pth_scan_")
atexit.register(lambda d = tmpdir: shutil.rmtree(d, ignore_errors = True))
download_errors: list[str] = []
try:
downloaded, download_errors = download_packages(
specs,
tmpdir,
with_deps = args.with_deps,
)
print(f" Downloaded {len(downloaded)} archive(s).")
for spec, archive_path in downloaded:
pkg_name = _extract_pkg_name(spec)
findings = scan_archive(archive_path, pkg_name)
all_findings.extend(findings)
# Delete archive immediately after scanning
try:
os.remove(archive_path)
except OSError:
pass
finally:
shutil.rmtree(tmpdir, ignore_errors = True)
print_findings(all_findings)
# --fix mode: auto-search for safe versions
if args.fix and all_findings:
critical_pkgs = {f.package for f in all_findings if f.severity == CRITICAL}
if critical_pkgs:
print(
f"\n --fix: Searching for safe versions of {len(critical_pkgs)} CRITICAL package(s)..."
)
_run_fix(critical_pkgs, entries, args.max_search)
# Surface any pip-download failures BEFORE the scan-result exit code so
# an empty / partial download cannot mask itself as "0 findings, all
# clean". This is item (4) of the silent-failure hardening: an
# unresolvable spec or PyPI timeout used to print to stderr and exit 0.
if download_errors:
print(
f"\n {'=' * 72}\n"
f" SCAN INCOMPLETE: {len(download_errors)} pip download "
f"failure(s):\n"
f" {'=' * 72}",
file = sys.stderr,
)
for err in download_errors:
print(f" [ERROR] {err}", file = sys.stderr)
print(
" Refusing to report 'all clean' on a partial scan; " "exiting 2.",
file = sys.stderr,
)
return 2
# Exit code: 1 if any CRITICAL or HIGH
if any(f.severity in (CRITICAL, HIGH) for f in all_findings):
return 1
return 0
if __name__ == "__main__":
sys.exit(main())