unsloth/scripts/notebook_to_python.py
Daniel Han bfb5c2872c CI(notebooks): cross-repo validator for unslothai/notebooks
New PR-time + scheduled workflow that walks every nb/, kaggle/, and
original_template/ notebook in unslothai/notebooks and statically
validates the install cells and user-facing code against:

  - googlecolab/backend-info pip-freeze.gpu.txt (Colab oracle, refreshed
    on every run; fallback snapshot committed under scripts/data/).
  - PyPI metadata for transitive constraint resolution.
  - Hardcoded torch/torchcodec ABI table.
  - Hardcoded peft/torchao floor table.
  - The live unsloth + trl API surface, introspected under
    tests/_zoo_aggressive_cuda_spoof.py so the api job runs on a
    GPU-less ubuntu-latest runner.

Catches the bug classes from notebooks#258 / #260 / #261 / #264 / #221
and commit 51b1462 mechanically:

  R-INST-001  forbid git+ HEAD installs (notebooks#221)
  R-INST-002  --no-deps + transitive constraint violation
  R-INST-003  peft 0.19+ requires torchao 0.16.0+ (notebooks#258)
  R-INST-004  torch <-> torchcodec ABI mismatch (notebooks#261a)
  R-INST-005  --no-deps transformers + Colab tokenizers drift
              (notebooks#261b / #264)
  R-INST-006  forbid !!pip
  R-API-003   adamw_torch_fused -> adamw_8bit hint (warning)
  R-API-004   notebook references symbols outside live unsloth surface
  R-EXC-001   DONT_UPDATE_EXCEPTIONS notebooks must satisfy the same
              policy clauses as generated notebooks (notebooks#260)
  R-DRIFT-001 update_all_notebooks.py emits no diff (commit 51b1462)
  R-CONV-001  notebook_to_python.py converts every .ipynb cleanly

Files:
  .github/workflows/notebooks-ci.yml          PR-time + cron + dispatch
  scripts/notebook_validator.py               1148 LOC, single-file
  scripts/notebook_to_python.py               battle-tested converter
  scripts/data/colab_pip_freeze.gpu.txt       fallback snapshot
  scripts/data/colab_to_cpu_pin.json          cu128 -> CPU wheel map
  tests/notebooks/test_validator_fixtures.py  21 golden tests, all green

CPU-only by design. The api-introspect job follows the existing
consolidated-tests-ci spoof pattern (lines 309/417/536/626/826/1081/
1586/1998 of consolidated-tests-ci.yml). The smoke-install job is
opt-in via workflow_dispatch and stubs torchcodec since no CPU wheel
exists.

Validated on the live unslothai/notebooks@7af0ac0f tree: every fixture
test passes, exceptions check is silent, lint surfaces 27 errors + 6
warnings on real notebooks (mix of #258-class regressions in 6 nb/
notebooks the previous template fixes did not reach, plus 14
git+-HEAD installs in hand-tuned exception notebooks).
2026-05-07 11:42:57 +00:00

292 lines
9 KiB
Python

#!/usr/bin/env python
# coding: utf-8
"""
Convert Jupyter notebooks (.ipynb) to executable Python scripts (.py).
Converts IPython magics to plain Python:
!command -> subprocess.run('command', shell=True)
%cd path -> os.chdir('path')
%env VAR=value -> os.environ['VAR'] = 'value'
%%file filename -> with open('filename', 'w') as f: f.write(...)
%%capture -> (skipped)
/content/... -> _WORKING_DIR + /...
"""
import nbformat
import re
import sys
import os
import urllib.request
import urllib.parse
from pathlib import Path
def needs_fstring(cmd: str) -> bool:
"""Check if command has Python variable interpolation like {var_name}."""
pattern = r"(?<!\$)\{([a-zA-Z_][a-zA-Z0-9_]*)\}"
return bool(re.search(pattern, cmd))
def github_blob_to_raw(url: str) -> str:
"""Convert GitHub blob URL to raw URL."""
# https://github.com/user/repo/blob/branch/path -> https://raw.githubusercontent.com/user/repo/branch/path
if "github.com" in url and "/blob/" in url:
url = url.replace("github.com", "raw.githubusercontent.com")
url = url.replace("/blob/", "/")
return url
def download_notebook(url: str) -> tuple[str, str]:
"""Download notebook from URL. Returns (content, filename)."""
# Convert blob URL to raw if needed
raw_url = github_blob_to_raw(url)
# Extract filename from URL
parsed = urllib.parse.urlparse(raw_url)
filename = os.path.basename(urllib.parse.unquote(parsed.path))
# Download
print(f"Downloading {url}...")
with urllib.request.urlopen(raw_url, timeout = 60) as response:
content = response.read().decode("utf-8")
return content, filename
def is_url(path: str) -> bool:
"""Check if path is a URL."""
return path.startswith("http://") or path.startswith("https://")
def replace_colab_paths(source: str) -> str:
"""Replace Colab-specific /content/ paths with current working directory."""
# Replace /content/ with f-string using _WORKING_DIR
source = source.replace('"/content/', 'f"{_WORKING_DIR}/')
source = source.replace("'/content/", "f'{_WORKING_DIR}/")
return source
def convert_cell_to_python(source: str) -> str:
"""Convert a cell's IPython magics to plain Python."""
lines = source.split("\n")
result = []
i = 0
while i < len(lines):
line = lines[i]
stripped = line.strip()
indent = line[: len(line) - len(line.lstrip())]
# Skip %%capture
if stripped.startswith("%%capture"):
i += 1
continue
# Handle %%file magic
if stripped.startswith("%%file "):
filename = stripped[7:].strip()
file_lines = []
i += 1
while i < len(lines):
file_lines.append(lines[i])
i += 1
file_content = "\n".join(file_lines)
file_content = file_content.replace('"""', r"\"\"\"")
result.append(f'{indent}with open({filename!r}, "w") as _f:')
result.append(f'{indent} _f.write("""{file_content}""")')
continue
# Handle ! shell commands
if stripped.startswith("!"):
cmd_lines = [stripped[1:]]
while cmd_lines[-1].rstrip().endswith("\\") and i + 1 < len(lines):
i += 1
cmd_lines.append(lines[i].strip())
full_cmd = "\n".join(cmd_lines)
f_prefix = "f" if needs_fstring(full_cmd) else ""
if "\n" in full_cmd:
escaped_cmd = full_cmd.replace('"""', r"\"\"\"")
if escaped_cmd.rstrip().endswith('"'):
escaped_cmd = escaped_cmd.rstrip() + " "
result.append(
f'{indent}subprocess.run({f_prefix}"""{escaped_cmd}""", shell=True)'
)
else:
result.append(
f"{indent}subprocess.run({f_prefix}{full_cmd!r}, shell=True)"
)
# %cd path -> os.chdir(path)
elif stripped.startswith("%cd "):
path = stripped[4:].strip()
result.append(f"{indent}os.chdir({path!r})")
# %env VAR=value
elif stripped.startswith("%env ") and "=" in stripped:
match = re.match(r"%env\s+(\w+)=(.+)", stripped)
if match:
var, val = match.groups()
result.append(f"{indent}os.environ[{var!r}] = {val!r}")
# %env VAR
elif stripped.startswith("%env "):
var = stripped[5:].strip()
result.append(f"{indent}os.environ.get({var!r})")
# %pwd
elif stripped == "%pwd":
result.append(f"{indent}os.getcwd()")
else:
result.append(line)
i += 1
return "\n".join(result)
def convert_notebook(notebook_content: str, source_name: str = "notebook") -> str:
"""Convert notebook JSON content to Python script."""
# Parse notebook
if isinstance(notebook_content, str):
notebook = nbformat.reads(notebook_content, as_version = 4)
else:
notebook = notebook_content
lines = [
"#!/usr/bin/env python",
"# coding: utf-8",
f"# Converted from: {source_name}",
"",
"import subprocess",
"import os",
"import sys",
"import re",
"",
"# Capture original packages before any installs",
"_original_packages = subprocess.run(",
" [sys.executable, '-m', 'pip', 'freeze'],",
" capture_output=True, text=True",
").stdout",
"",
"# Working directory (replaces Colab's /content/)",
"_WORKING_DIR = os.getcwd()",
"",
]
for cell in notebook.cells:
source = cell.source.strip()
if not source:
continue
if cell.cell_type == "code":
converted = convert_cell_to_python(source)
converted = replace_colab_paths(converted)
lines.append(converted)
lines.append("")
elif cell.cell_type == "markdown":
for line in source.split("\n"):
lines.append(f"# {line}")
lines.append("")
# Add package restoration at the end
lines.extend(
[
"",
"# Restore original packages (install one by one, skip failures)",
"for _pkg in _original_packages.strip().split('\\n'):",
" if _pkg:",
" subprocess.run([sys.executable, '-m', 'pip', 'install', _pkg, '-q'],",
" stderr=subprocess.DEVNULL)",
"",
]
)
return "\n".join(lines)
def convert_notebook_to_script(source: str, output_dir: str | None = None):
"""
Convert a notebook to Python script.
Args:
source: Local file path or URL to notebook
output_dir: Output directory (optional, defaults to current directory)
"""
if is_url(source):
content, filename = download_notebook(source)
source_name = source
else:
filename = os.path.basename(source)
with open(source, "r", encoding = "utf-8") as f:
content = f.read()
source_name = source
# Generate output filename
output_filename = filename.replace(".ipynb", ".py")
# Clean up filename
output_filename = (
output_filename.replace("(", "").replace(")", "").replace("-", "_")
)
# Add output directory if specified
if output_dir:
output_path = os.path.join(output_dir, output_filename)
else:
output_path = output_filename
# Convert
script = convert_notebook(content, source_name)
# Write output
with open(output_path, "w", encoding = "utf-8") as f:
f.write(script)
print(f"Converted {source} -> {output_path}")
return output_path
def main():
import argparse
class Formatter(
argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter
):
pass
parser = argparse.ArgumentParser(
description = __doc__,
formatter_class = Formatter,
epilog = """
Examples:
python notebook_to_python.py notebook.ipynb
python notebook_to_python.py -o scripts/ notebook1.ipynb notebook2.ipynb
python notebook_to_python.py --output ./converted https://github.com/user/repo/blob/main/notebook.ipynb
python notebook_to_python.py https://github.com/unslothai/notebooks/blob/main/nb/Oute_TTS_(1B).ipynb
""",
)
parser.add_argument(
"notebooks", nargs = "+", help = "Notebook files or URLs to convert."
)
parser.add_argument(
"-o", "--output", dest = "output_dir", default = ".", help = "Output directory."
)
args = parser.parse_args()
# Create output directory if needed
os.makedirs(args.output_dir, exist_ok = True)
for source in args.notebooks:
try:
convert_notebook_to_script(
source, output_dir = args.output_dir if args.output_dir != "." else None
)
except Exception as e:
print(f"ERROR converting {source}: {e}")
if __name__ == "__main__":
main()