ruvector/scripts/training/run_sft.py
rUv 385eb17d08 feat(training): ADR-129 RuvLTRA training pipeline — calibration, SFT, benchmarks, HF publishing
* docs(adr): update ADR-129 — all phases executing, Phase 4 publishing complete

- Phase 1 Calibration: Complete (all 4 models, benchmarks uploaded to HF)
- Phase 2 SFT: Executing on L4 GPU (rank-16, 2 epochs)
- Phase 3 Benchmarks: Executing (release gates + L4 benchmark job)
- Phase 4 Publishing: Complete (TQ configs + benchmarks + README updates on HF)

Benchmark results (L4 GPU):
- ruvltra-small: 75.4 tok/s
- ruvltra-medium: 62.6 tok/s
- ruvltra-claude-code: 67.1 tok/s

Co-Authored-By: claude-flow <ruv@ruv.net>

* docs: add training pipeline and release gates to root README

Add Continuous Training & Optimization section (ADR-129) to the
capabilities table: nightly training, 7-gate release checks,
TurboQuant profiling, training corpus.

Co-Authored-By: claude-flow <ruv@ruv.net>

* fix(training): include training corpus in Docker build context

The SFT job failed because merged_corpus.jsonl was not in the Docker
image. Copy it to scripts/training/data/training/ so it's included
in the COPY . /app/ step.

Co-Authored-By: claude-flow <ruv@ruv.net>

* fix(training): handle raw text corpus format in SFT pipeline

The training corpus uses a flat 'text' field (brain memories, ADRs)
rather than chat messages or Alpaca instruction format. Add handler
that converts raw text to completion-style messages for SFT.

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-03-30 07:58:07 -04:00

410 lines
15 KiB
Python
Executable file

#!/usr/bin/env python3
"""RuvLTRA Phase 2: LoRA SFT fine-tuning pipeline.
Loads training corpus, runs LoRA SFT with peft + transformers,
merges adapter weights, converts to GGUF, and runs release gate checks.
Usage:
python run_sft.py --model-id ruvnet/ruvLTRA-7b --corpus data/training/corpus.jsonl
python run_sft.py --model-id ruvnet/ruvLTRA-7b --corpus corpus.jsonl --upload
"""
import argparse
import json
import logging
import os
import sys
import time
from pathlib import Path
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("ruvltra-sft")
def parse_args():
p = argparse.ArgumentParser(description="RuvLTRA LoRA SFT training pipeline")
p.add_argument("--model-id", required=True, help="HuggingFace model ID")
p.add_argument("--corpus", required=True, help="Path to training corpus (JSONL)")
p.add_argument("--output-dir", default="/tmp/sft-output", help="Output directory")
p.add_argument("--revision", default="main", help="Model revision/branch")
# LoRA config
p.add_argument("--lora-r", type=int, default=16, help="LoRA rank")
p.add_argument("--lora-alpha", type=int, default=32, help="LoRA alpha")
p.add_argument("--lora-dropout", type=float, default=0.05, help="LoRA dropout")
p.add_argument("--target-modules", default="q_proj,k_proj,v_proj,o_proj,gate_proj,up_proj,down_proj",
help="Comma-separated target modules for LoRA")
# Training config
p.add_argument("--epochs", type=int, default=3, help="Number of training epochs")
p.add_argument("--batch-size", type=int, default=4, help="Per-device batch size")
p.add_argument("--grad-accum", type=int, default=4, help="Gradient accumulation steps")
p.add_argument("--lr", type=float, default=2e-4, help="Learning rate")
p.add_argument("--max-seq-len", type=int, default=2048, help="Maximum sequence length")
p.add_argument("--warmup-ratio", type=float, default=0.03, help="Warmup ratio")
# Output controls
p.add_argument("--upload", action="store_true", help="Upload merged model to HuggingFace")
p.add_argument("--convert-gguf", action="store_true", default=True, help="Convert to GGUF after merge")
p.add_argument("--quant-type", default="Q4_K_M", help="GGUF quantization type for release")
p.add_argument("--skip-gate", action="store_true", help="Skip release gate checks")
return p.parse_args()
def load_corpus(corpus_path: str) -> list[dict]:
"""Load JSONL training corpus. Expected format: {instruction, input, output} or {messages}."""
if not os.path.exists(corpus_path):
raise FileNotFoundError(f"Corpus not found: {corpus_path}")
records = []
with open(corpus_path) as f:
for i, line in enumerate(f):
line = line.strip()
if not line:
continue
try:
records.append(json.loads(line))
except json.JSONDecodeError as e:
log.warning("Skipping malformed line %d: %s", i + 1, e)
if not records:
raise ValueError(f"No valid records found in {corpus_path}")
log.info("Loaded %d training examples from %s", len(records), corpus_path)
return records
def format_dataset(records: list[dict]):
"""Convert corpus records into a HuggingFace Dataset."""
from datasets import Dataset
formatted = []
for rec in records:
if "messages" in rec:
# Chat format: [{role, content}, ...]
formatted.append({"messages": rec["messages"]})
elif "instruction" in rec:
# Alpaca format
messages = [
{"role": "system", "content": "You are a helpful coding assistant."},
{"role": "user", "content": rec["instruction"]},
]
if rec.get("input"):
messages[-1]["content"] += f"\n\n{rec['input']}"
messages.append({"role": "assistant", "content": rec["output"]})
formatted.append({"messages": messages})
elif "text" in rec and len(rec["text"]) > 100:
# Raw text format (brain memories, ADRs) — convert to completion format
text = rec["text"]
title = rec.get("title", text[:60].split("\n")[0])
messages = [
{"role": "system", "content": "You are a knowledgeable software architect and Rust developer."},
{"role": "user", "content": f"Explain: {title}"},
{"role": "assistant", "content": text},
]
formatted.append({"messages": messages})
else:
log.warning("Skipping record with unknown format: %s", list(rec.keys()))
return Dataset.from_list(formatted)
def train_lora(model_id: str, dataset, args) -> str:
"""Run LoRA SFT training and return path to adapter directory."""
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from trl import SFTTrainer, SFTConfig
adapter_dir = os.path.join(args.output_dir, "lora-adapter")
os.makedirs(adapter_dir, exist_ok=True)
# Load tokenizer
log.info("Loading tokenizer for %s...", model_id)
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# Load model in 4-bit for memory efficiency
log.info("Loading model in 4-bit quantization...")
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16,
bnb_4bit_use_double_quant=True,
)
model = AutoModelForCausalLM.from_pretrained(
model_id,
quantization_config=bnb_config,
device_map="auto",
trust_remote_code=True,
attn_implementation="flash_attention_2" if torch.cuda.is_available() else "eager",
)
model = prepare_model_for_kbit_training(model)
# Configure LoRA
target_modules = [m.strip() for m in args.target_modules.split(",")]
lora_config = LoraConfig(
r=args.lora_r,
lora_alpha=args.lora_alpha,
lora_dropout=args.lora_dropout,
target_modules=target_modules,
bias="none",
task_type="CAUSAL_LM",
)
model = get_peft_model(model, lora_config)
trainable, total = model.get_nb_trainable_parameters()
log.info("Trainable parameters: %d / %d (%.2f%%)", trainable, total, 100 * trainable / total)
# Training config
training_config = SFTConfig(
output_dir=adapter_dir,
num_train_epochs=args.epochs,
per_device_train_batch_size=args.batch_size,
gradient_accumulation_steps=args.grad_accum,
learning_rate=args.lr,
max_seq_length=args.max_seq_len,
warmup_ratio=args.warmup_ratio,
logging_steps=10,
save_steps=100,
save_total_limit=2,
bf16=torch.cuda.is_available(),
gradient_checkpointing=True,
optim="paged_adamw_8bit",
lr_scheduler_type="cosine",
report_to="none",
seed=42,
)
# Train
log.info("Starting LoRA SFT training (%d epochs)...", args.epochs)
start = time.time()
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
processing_class=tokenizer,
args=training_config,
)
trainer.train()
elapsed = time.time() - start
log.info("Training completed in %.1f minutes", elapsed / 60)
# Save adapter
trainer.save_model(adapter_dir)
tokenizer.save_pretrained(adapter_dir)
log.info("LoRA adapter saved to %s", adapter_dir)
return adapter_dir
def merge_adapter(model_id: str, adapter_dir: str, output_dir: str) -> str:
"""Merge LoRA adapter back into base model."""
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
merged_dir = os.path.join(output_dir, "merged-model")
os.makedirs(merged_dir, exist_ok=True)
log.info("Loading base model for merge...")
base_model = AutoModelForCausalLM.from_pretrained(
model_id,
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True,
)
log.info("Loading and merging LoRA adapter...")
model = PeftModel.from_pretrained(base_model, adapter_dir)
model = model.merge_and_unload()
log.info("Saving merged model...")
model.save_pretrained(merged_dir, safe_serialization=True)
tokenizer = AutoTokenizer.from_pretrained(adapter_dir)
tokenizer.save_pretrained(merged_dir)
log.info("Merged model saved to %s", merged_dir)
return merged_dir
def convert_to_gguf(merged_dir: str, output_dir: str, quant_type: str) -> str:
"""Convert merged model to quantized GGUF."""
import subprocess
import shutil
gguf_f16 = os.path.join(output_dir, "model-f16.gguf")
gguf_quant = os.path.join(output_dir, f"model-{quant_type}.gguf")
convert_script = "/opt/llama.cpp/convert_hf_to_gguf.py"
if not os.path.exists(convert_script):
log.warning("llama.cpp convert script not found, skipping GGUF conversion")
return ""
# Convert to f16
log.info("Converting to GGUF (f16)...")
subprocess.run(
[sys.executable, convert_script, merged_dir, "--outfile", gguf_f16, "--outtype", "f16"],
check=True,
)
# Quantize
quantize_bin = shutil.which("llama-quantize")
if quantize_bin:
log.info("Quantizing to %s...", quant_type)
subprocess.run([quantize_bin, gguf_f16, gguf_quant, quant_type], check=True)
file_size = os.path.getsize(gguf_quant)
log.info("Quantized GGUF: %s (%.2f GB)", gguf_quant, file_size / (1024**3))
return gguf_quant
else:
log.warning("llama-quantize not found, returning f16 GGUF")
return gguf_f16
def release_gate_check(output_dir: str, quant_type: str) -> bool:
"""Run release gate checks on the final model.
Gate criteria:
- Quantized GGUF exists and is non-empty
- File size is within expected bounds (> 1GB for 7B model)
- Training loss log shows convergence
"""
log.info("=== Release Gate Check ===")
passed = True
# Check GGUF exists
gguf_path = os.path.join(output_dir, f"model-{quant_type}.gguf")
if not os.path.exists(gguf_path):
gguf_path = os.path.join(output_dir, "model-f16.gguf")
if os.path.exists(gguf_path):
size_gb = os.path.getsize(gguf_path) / (1024**3)
log.info(" GGUF size: %.2f GB", size_gb)
if size_gb < 0.5:
log.error(" FAIL: GGUF file suspiciously small (< 0.5 GB)")
passed = False
else:
log.info(" PASS: GGUF file size OK")
else:
log.error(" FAIL: No GGUF file found")
passed = False
# Check adapter was saved
adapter_dir = os.path.join(output_dir, "lora-adapter")
adapter_config = os.path.join(adapter_dir, "adapter_config.json")
if os.path.exists(adapter_config):
log.info(" PASS: LoRA adapter config present")
else:
log.error(" FAIL: LoRA adapter config missing")
passed = False
# Check training logs for convergence
trainer_state = os.path.join(adapter_dir, "trainer_state.json")
if os.path.exists(trainer_state):
with open(trainer_state) as f:
state = json.load(f)
log_history = state.get("log_history", [])
losses = [entry["loss"] for entry in log_history if "loss" in entry]
if len(losses) >= 2:
initial_loss = losses[0]
final_loss = losses[-1]
if final_loss < initial_loss:
log.info(" PASS: Loss decreased %.4f -> %.4f", initial_loss, final_loss)
else:
log.warning(" WARN: Loss did not decrease %.4f -> %.4f", initial_loss, final_loss)
else:
log.warning(" WARN: Not enough loss entries to check convergence")
else:
log.warning(" WARN: No trainer state found, cannot check convergence")
verdict = "PASSED" if passed else "FAILED"
log.info("=== Release Gate: %s ===", verdict)
return passed
def upload_to_hf(model_id: str, output_dir: str, revision: str):
"""Upload merged model and artifacts to HuggingFace."""
from huggingface_hub import HfApi
api = HfApi()
merged_dir = os.path.join(output_dir, "merged-model")
if os.path.isdir(merged_dir):
log.info("Uploading merged model to %s...", model_id)
api.upload_folder(
folder_path=merged_dir,
repo_id=model_id,
revision=revision,
)
# Upload GGUF files separately
for f in os.listdir(output_dir):
if f.endswith(".gguf"):
fpath = os.path.join(output_dir, f)
log.info("Uploading %s...", f)
api.upload_file(
path_or_fileobj=fpath,
path_in_repo=f,
repo_id=model_id,
revision=revision,
)
log.info("Upload complete")
def main():
args = parse_args()
os.makedirs(args.output_dir, exist_ok=True)
log.info("=== RuvLTRA SFT Training Pipeline ===")
log.info("Model: %s", args.model_id)
log.info("Corpus: %s", args.corpus)
log.info("LoRA: r=%d, alpha=%d, dropout=%.2f", args.lora_r, args.lora_alpha, args.lora_dropout)
log.info("Training: epochs=%d, batch=%d, lr=%.0e", args.epochs, args.batch_size, args.lr)
# Phase 2a: Load and format corpus
records = load_corpus(args.corpus)
dataset = format_dataset(records)
log.info("Dataset prepared: %d examples", len(dataset))
# Phase 2b: LoRA SFT training
adapter_dir = train_lora(args.model_id, dataset, args)
# Phase 2c: Merge adapter weights
merged_dir = merge_adapter(args.model_id, adapter_dir, args.output_dir)
# Phase 2d: Convert to GGUF
gguf_path = ""
if args.convert_gguf:
gguf_path = convert_to_gguf(merged_dir, args.output_dir, args.quant_type)
# Phase 2e: Release gate check
if not args.skip_gate:
gate_passed = release_gate_check(args.output_dir, args.quant_type)
if not gate_passed:
log.error("Release gate FAILED — review output before publishing")
sys.exit(2)
# Phase 2f: Upload if requested
if args.upload:
upload_to_hf(args.model_id, args.output_dir, args.revision)
log.info("=== SFT Pipeline complete ===")
log.info("Adapter: %s", adapter_dir)
log.info("Merged: %s", merged_dir)
if gguf_path:
log.info("GGUF: %s", gguf_path)
if __name__ == "__main__":
try:
main()
except Exception as e:
log.error("Pipeline failed: %s", e, exc_info=True)
sys.exit(1)