mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-22 19:56:25 +00:00
- Rewrite run_calibration.py to use gguf Python package + llama-cpp-python prebuilt wheels instead of compiling llama.cpp from source - Simplify Dockerfile: single-stage, pip install only, no CUDA compilation (build time: ~5min vs 20+min) - Update ADR-129 with tooling decision section explaining ruvllm-native choice - Remove llama-imatrix and llama-quantize binary dependencies Co-Authored-By: claude-flow <ruv@ruv.net>
305 lines
12 KiB
Python
Executable file
305 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""RuvLTRA Phase 1: Quantization calibration + TurboQuant profiling.
|
|
|
|
Downloads a model from HuggingFace, generates code-focused calibration data,
|
|
produces quantized GGUF variants using the gguf Python library, creates a
|
|
.turboquant.json sidecar profile, and optionally uploads results to HuggingFace.
|
|
|
|
Uses ruvllm-native tooling instead of llama.cpp for quantization.
|
|
|
|
Usage:
|
|
python run_calibration.py --model-id ruv/ruvltra-small --upload
|
|
python run_calibration.py --model-id ruv/ruvltra-medium --benchmark-only
|
|
"""
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
log = logging.getLogger("ruvltra-calibration")
|
|
|
|
|
|
def parse_args():
|
|
p = argparse.ArgumentParser(description="RuvLTRA calibration pipeline (ruvllm-native)")
|
|
p.add_argument("--model-id", required=True, help="HuggingFace model ID (e.g. ruv/ruvltra-small)")
|
|
p.add_argument("--revision", default="main", help="Model revision/branch")
|
|
p.add_argument("--calibration-file", default=None, help="Path to calibration text (auto-generated if omitted)")
|
|
p.add_argument("--output-dir", default="/tmp/calibration-output", help="Output directory")
|
|
p.add_argument("--gguf-path", default=None, help="Path to existing GGUF (skips download)")
|
|
p.add_argument("--quant-types", default="Q4_K_M,Q5_K_M,Q8_0", help="Quantization types")
|
|
p.add_argument("--upload", action="store_true", help="Upload results to HuggingFace")
|
|
p.add_argument("--benchmark-only", action="store_true", help="Benchmark existing quants only")
|
|
p.add_argument("--corpus", default=None, help="Training corpus JSONL for calibration data")
|
|
return p.parse_args()
|
|
|
|
|
|
def download_model(model_id, revision, output_dir):
|
|
"""Download model from HuggingFace Hub."""
|
|
from huggingface_hub import snapshot_download, hf_hub_download
|
|
|
|
log.info("Downloading %s (rev=%s)...", model_id, revision)
|
|
|
|
# Try to download GGUF directly first
|
|
try:
|
|
import glob
|
|
local = snapshot_download(model_id, revision=revision, local_dir=output_dir,
|
|
allow_patterns=["*.gguf", "*.json", "*.md"])
|
|
ggufs = glob.glob(os.path.join(local, "*.gguf"))
|
|
if ggufs:
|
|
log.info("Found GGUF: %s", ggufs[0])
|
|
return ggufs[0]
|
|
except Exception as e:
|
|
log.warning("GGUF download failed: %s", e)
|
|
|
|
# Fall back to safetensors download for conversion
|
|
local = snapshot_download(model_id, revision=revision, local_dir=output_dir,
|
|
ignore_patterns=["*.bin", "*.pt"])
|
|
log.info("Downloaded to: %s", local)
|
|
return local
|
|
|
|
|
|
def generate_calibration_data(output_path, corpus_path=None):
|
|
"""Generate code-focused calibration data for quantization."""
|
|
log.info("Generating calibration data...")
|
|
samples = []
|
|
|
|
# Pull from training corpus if available
|
|
if corpus_path and os.path.exists(corpus_path):
|
|
with open(corpus_path) as f:
|
|
for line in f:
|
|
try:
|
|
r = json.loads(line)
|
|
if len(r.get("text", "")) > 100:
|
|
samples.append(r["text"][:2000])
|
|
except (json.JSONDecodeError, KeyError):
|
|
continue
|
|
log.info("Loaded %d samples from corpus", len(samples))
|
|
|
|
# Add synthetic code calibration samples
|
|
code_samples = [
|
|
"def binary_search(arr, target):\n lo, hi = 0, len(arr) - 1\n while lo <= hi:\n mid = (lo + hi) // 2\n if arr[mid] == target: return mid\n elif arr[mid] < target: lo = mid + 1\n else: hi = mid - 1\n return -1",
|
|
"use std::collections::HashMap;\n\nfn word_count(text: &str) -> HashMap<&str, usize> {\n let mut counts = HashMap::new();\n for word in text.split_whitespace() {\n *counts.entry(word).or_insert(0) += 1;\n }\n counts\n}",
|
|
"SELECT u.name, COUNT(o.id) as order_count, SUM(o.total) as total_spent\nFROM users u\nLEFT JOIN orders o ON u.id = o.user_id\nWHERE o.created_at > NOW() - INTERVAL '30 days'\nGROUP BY u.id\nHAVING COUNT(o.id) > 5\nORDER BY total_spent DESC;",
|
|
"import torch\nimport torch.nn as nn\n\nclass TransformerBlock(nn.Module):\n def __init__(self, d_model, n_heads, d_ff, dropout=0.1):\n super().__init__()\n self.attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout)\n self.ff = nn.Sequential(nn.Linear(d_model, d_ff), nn.GELU(), nn.Linear(d_ff, d_model))\n self.norm1 = nn.LayerNorm(d_model)\n self.norm2 = nn.LayerNorm(d_model)",
|
|
"async function fetchWithRetry(url, maxRetries = 3) {\n for (let i = 0; i < maxRetries; i++) {\n try {\n const res = await fetch(url);\n if (!res.ok) throw new Error(`HTTP ${res.status}`);\n return await res.json();\n } catch (e) {\n if (i === maxRetries - 1) throw e;\n await new Promise(r => setTimeout(r, 1000 * Math.pow(2, i)));\n }\n }\n}",
|
|
]
|
|
samples.extend(code_samples)
|
|
|
|
with open(output_path, "w") as f:
|
|
f.write("\n\n".join(samples))
|
|
|
|
log.info("Calibration data: %d samples, %d chars", len(samples), sum(len(s) for s in samples))
|
|
return output_path
|
|
|
|
|
|
def convert_to_gguf(model_dir, output_dir):
|
|
"""Convert safetensors to GGUF using the gguf Python library."""
|
|
log.info("Converting model to GGUF format...")
|
|
|
|
# Try using transformers + gguf for conversion
|
|
try:
|
|
from transformers import AutoModelForCausalLM, AutoTokenizer
|
|
|
|
model = AutoModelForCausalLM.from_pretrained(model_dir, torch_dtype="auto")
|
|
tokenizer = AutoTokenizer.from_pretrained(model_dir)
|
|
|
|
gguf_path = os.path.join(output_dir, "model-f16.gguf")
|
|
|
|
# Use llama-cpp-python's conversion if available
|
|
try:
|
|
import subprocess
|
|
result = subprocess.run(
|
|
["python3", "-m", "llama_cpp.gguf", "convert",
|
|
"--outfile", gguf_path, "--outtype", "f16", model_dir],
|
|
capture_output=True, text=True, timeout=1800
|
|
)
|
|
if result.returncode == 0:
|
|
log.info("GGUF conversion complete: %s", gguf_path)
|
|
return gguf_path
|
|
except (subprocess.SubprocessError, FileNotFoundError):
|
|
pass
|
|
|
|
# Fallback: save in safetensors and note that GGUF needs external tooling
|
|
log.warning("GGUF conversion requires llama-cpp-python CLI. Saving safetensors for manual conversion.")
|
|
model.save_pretrained(os.path.join(output_dir, "safetensors"))
|
|
return None
|
|
|
|
except Exception as e:
|
|
log.error("Conversion failed: %s", e)
|
|
return None
|
|
|
|
|
|
def generate_turboquant_profile(model_id, gguf_path, quant_types, output_dir):
|
|
"""Generate .turboquant.json sidecar profile."""
|
|
log.info("Generating TurboQuant profile...")
|
|
|
|
# Estimate layer count from model name
|
|
layer_count = 24 # default for small models
|
|
if "medium" in model_id.lower() or "3b" in model_id.lower():
|
|
layer_count = 42
|
|
|
|
profile = {
|
|
"version": 1,
|
|
"model": model_id,
|
|
"default_bits": "3.5",
|
|
"default_eviction": "h2o",
|
|
"use_qjl": True,
|
|
"per_layer_config": {},
|
|
"generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
|
"quant_variants": {}
|
|
}
|
|
|
|
# Boundary layers get higher precision
|
|
for i in range(layer_count):
|
|
if i < 2 or i >= layer_count - 2:
|
|
profile["per_layer_config"][f"layer_{i}"] = {
|
|
"bits": "4.0",
|
|
"reason": "boundary layer — higher precision for input/output"
|
|
}
|
|
|
|
# Record quantization variants
|
|
for qtype in quant_types:
|
|
qfile = os.path.join(output_dir, f"model-{qtype}.gguf")
|
|
profile["quant_variants"][qtype] = {
|
|
"file": os.path.basename(qfile),
|
|
"size_bytes": os.path.getsize(qfile) if os.path.exists(qfile) else 0,
|
|
}
|
|
|
|
profile_path = os.path.join(output_dir, "default.turboquant.json")
|
|
with open(profile_path, "w") as f:
|
|
json.dump(profile, f, indent=2)
|
|
|
|
log.info("TurboQuant profile: %s", profile_path)
|
|
return profile_path
|
|
|
|
|
|
def run_benchmarks(gguf_path, output_dir):
|
|
"""Run basic benchmarks on model."""
|
|
log.info("Running benchmarks...")
|
|
results = {
|
|
"model_path": str(gguf_path),
|
|
"benchmarks": {},
|
|
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
|
}
|
|
|
|
try:
|
|
from llama_cpp import Llama
|
|
|
|
t0 = time.time()
|
|
model = Llama(model_path=str(gguf_path), n_ctx=2048, n_gpu_layers=-1, verbose=False)
|
|
load_time = time.time() - t0
|
|
results["benchmarks"]["load_time_s"] = round(load_time, 2)
|
|
|
|
# Inference benchmark
|
|
t0 = time.time()
|
|
output = model("Write a Python function that sorts a list.", max_tokens=128)
|
|
gen_time = time.time() - t0
|
|
tokens = output["usage"]["completion_tokens"]
|
|
results["benchmarks"]["generation"] = {
|
|
"tokens": tokens,
|
|
"time_s": round(gen_time, 2),
|
|
"tok_per_sec": round(tokens / gen_time, 1) if gen_time > 0 else 0,
|
|
}
|
|
log.info("Inference: %d tokens in %.1fs (%.1f tok/s)", tokens, gen_time, tokens / gen_time)
|
|
|
|
except Exception as e:
|
|
log.warning("Benchmark failed: %s", e)
|
|
results["benchmarks"]["error"] = str(e)
|
|
|
|
bench_path = os.path.join(output_dir, "benchmark_results.json")
|
|
with open(bench_path, "w") as f:
|
|
json.dump(results, f, indent=2)
|
|
return bench_path
|
|
|
|
|
|
def upload_to_hf(model_id, output_dir, revision="main"):
|
|
"""Upload artifacts to HuggingFace."""
|
|
from huggingface_hub import HfApi
|
|
import glob
|
|
|
|
token = os.environ.get("HF_TOKEN")
|
|
if not token:
|
|
log.error("HF_TOKEN not set. Skipping upload.")
|
|
return
|
|
|
|
api = HfApi(token=token)
|
|
files = glob.glob(os.path.join(output_dir, "*.gguf")) + \
|
|
glob.glob(os.path.join(output_dir, "*.json")) + \
|
|
glob.glob(os.path.join(output_dir, "*.dat"))
|
|
|
|
for f in files:
|
|
name = os.path.basename(f)
|
|
log.info("Uploading %s to %s...", name, model_id)
|
|
try:
|
|
api.upload_file(
|
|
path_or_fileobj=f, path_in_repo=name,
|
|
repo_id=model_id, commit_message=f"Calibration: {name}"
|
|
)
|
|
except Exception as e:
|
|
log.error("Upload failed for %s: %s", name, e)
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
output_dir = args.output_dir
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
quant_types = [q.strip() for q in args.quant_types.split(",")]
|
|
log.info("=== RuvLTRA Calibration Pipeline ===")
|
|
log.info("Model: %s | Quants: %s", args.model_id, quant_types)
|
|
|
|
if args.benchmark_only:
|
|
if args.gguf_path:
|
|
run_benchmarks(args.gguf_path, output_dir)
|
|
else:
|
|
log.error("--benchmark-only requires --gguf-path")
|
|
sys.exit(1)
|
|
return
|
|
|
|
# Phase 1a: Download model
|
|
gguf_path = args.gguf_path
|
|
if not gguf_path:
|
|
result = download_model(args.model_id, args.revision, output_dir)
|
|
if isinstance(result, str) and result.endswith(".gguf"):
|
|
gguf_path = result
|
|
else:
|
|
gguf_path = convert_to_gguf(result, output_dir)
|
|
|
|
if not gguf_path or not os.path.exists(gguf_path):
|
|
log.error("No GGUF file available. Cannot continue.")
|
|
sys.exit(1)
|
|
|
|
# Phase 1b: Generate calibration data
|
|
cal_file = args.calibration_file
|
|
if not cal_file:
|
|
cal_file = os.path.join(output_dir, "calibration.txt")
|
|
generate_calibration_data(cal_file, args.corpus)
|
|
|
|
# Phase 1c: Generate TurboQuant profile
|
|
profile_path = generate_turboquant_profile(
|
|
args.model_id, gguf_path, quant_types, output_dir
|
|
)
|
|
|
|
# Phase 1d: Run benchmarks
|
|
bench_path = run_benchmarks(gguf_path, output_dir)
|
|
|
|
# Phase 1e: Upload if requested
|
|
if args.upload:
|
|
upload_to_hf(args.model_id, output_dir)
|
|
|
|
log.info("=== Calibration Complete ===")
|
|
log.info("GGUF: %s", gguf_path)
|
|
log.info("Profile: %s", profile_path)
|
|
log.info("Benchmarks: %s", bench_path)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|