mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-22 19:56:25 +00:00
* docs(adr): update ADR-129 — all phases executing, Phase 4 publishing complete - Phase 1 Calibration: Complete (all 4 models, benchmarks uploaded to HF) - Phase 2 SFT: Executing on L4 GPU (rank-16, 2 epochs) - Phase 3 Benchmarks: Executing (release gates + L4 benchmark job) - Phase 4 Publishing: Complete (TQ configs + benchmarks + README updates on HF) Benchmark results (L4 GPU): - ruvltra-small: 75.4 tok/s - ruvltra-medium: 62.6 tok/s - ruvltra-claude-code: 67.1 tok/s Co-Authored-By: claude-flow <ruv@ruv.net> * docs: add training pipeline and release gates to root README Add Continuous Training & Optimization section (ADR-129) to the capabilities table: nightly training, 7-gate release checks, TurboQuant profiling, training corpus. Co-Authored-By: claude-flow <ruv@ruv.net> * fix(training): include training corpus in Docker build context The SFT job failed because merged_corpus.jsonl was not in the Docker image. Copy it to scripts/training/data/training/ so it's included in the COPY . /app/ step. Co-Authored-By: claude-flow <ruv@ruv.net> * fix(training): handle raw text corpus format in SFT pipeline The training corpus uses a flat 'text' field (brain memories, ADRs) rather than chat messages or Alpaca instruction format. Add handler that converts raw text to completion-style messages for SFT. Co-Authored-By: claude-flow <ruv@ruv.net>
410 lines
15 KiB
Python
Executable file
410 lines
15 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""RuvLTRA Phase 2: LoRA SFT fine-tuning pipeline.
|
|
|
|
Loads training corpus, runs LoRA SFT with peft + transformers,
|
|
merges adapter weights, converts to GGUF, and runs release gate checks.
|
|
|
|
Usage:
|
|
python run_sft.py --model-id ruvnet/ruvLTRA-7b --corpus data/training/corpus.jsonl
|
|
python run_sft.py --model-id ruvnet/ruvLTRA-7b --corpus corpus.jsonl --upload
|
|
"""
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# One process-wide log format (timestamp, level, message) shared by every
# pipeline phase; INFO is the default verbosity.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
log = logging.getLogger("ruvltra-sft")
|
|
|
|
|
|
def parse_args(argv=None):
    """Build the pipeline CLI and parse it.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) parses
            ``sys.argv[1:]``, which is what the zero-argument call did before.

    Returns:
        argparse.Namespace holding model/corpus locations, LoRA and training
        hyper-parameters, and output-control flags.
    """
    p = argparse.ArgumentParser(description="RuvLTRA LoRA SFT training pipeline")
    p.add_argument("--model-id", required=True, help="HuggingFace model ID")
    p.add_argument("--corpus", required=True, help="Path to training corpus (JSONL)")
    p.add_argument("--output-dir", default="/tmp/sft-output", help="Output directory")
    p.add_argument("--revision", default="main", help="Model revision/branch")

    # LoRA config
    p.add_argument("--lora-r", type=int, default=16, help="LoRA rank")
    p.add_argument("--lora-alpha", type=int, default=32, help="LoRA alpha")
    p.add_argument("--lora-dropout", type=float, default=0.05, help="LoRA dropout")
    p.add_argument("--target-modules", default="q_proj,k_proj,v_proj,o_proj,gate_proj,up_proj,down_proj",
                   help="Comma-separated target modules for LoRA")

    # Training config
    p.add_argument("--epochs", type=int, default=3, help="Number of training epochs")
    p.add_argument("--batch-size", type=int, default=4, help="Per-device batch size")
    p.add_argument("--grad-accum", type=int, default=4, help="Gradient accumulation steps")
    p.add_argument("--lr", type=float, default=2e-4, help="Learning rate")
    p.add_argument("--max-seq-len", type=int, default=2048, help="Maximum sequence length")
    p.add_argument("--warmup-ratio", type=float, default=0.03, help="Warmup ratio")

    # Output controls
    p.add_argument("--upload", action="store_true", help="Upload merged model to HuggingFace")
    # store_true combined with default=True could never become False. With
    # BooleanOptionalAction, ``--convert-gguf`` keeps working and
    # ``--no-convert-gguf`` disables the conversion.
    p.add_argument("--convert-gguf", action=argparse.BooleanOptionalAction, default=True,
                   help="Convert to GGUF after merge (use --no-convert-gguf to skip)")
    p.add_argument("--quant-type", default="Q4_K_M", help="GGUF quantization type for release")
    p.add_argument("--skip-gate", action="store_true", help="Skip release gate checks")

    return p.parse_args(argv)
|
|
|
|
|
|
def load_corpus(corpus_path: str) -> list[dict]:
    """Load a JSONL training corpus.

    Every non-blank line must hold one JSON object. ``format_dataset`` accepts
    ``{"messages": [...]}``, Alpaca ``{"instruction", "input", "output"}``
    and raw ``{"text", "title"}`` records.

    Args:
        corpus_path: Path to a UTF-8 encoded JSONL file. A leading BOM is
            tolerated.

    Returns:
        The parsed records, in file order.

    Raises:
        FileNotFoundError: If ``corpus_path`` does not exist.
        ValueError: If no line yields a JSON object.
    """
    if not os.path.exists(corpus_path):
        raise FileNotFoundError(f"Corpus not found: {corpus_path}")

    records = []
    # JSONL is UTF-8; don't depend on the container locale. utf-8-sig reads
    # plain UTF-8 unchanged and also strips a BOM, which json.loads rejects.
    with open(corpus_path, encoding="utf-8-sig") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError as e:
                log.warning("Skipping malformed line %d: %s", lineno, e)
                continue
            # Scalars and arrays are valid JSON but not records. Downstream code
            # would misread them: ``"text" in "..."`` is a substring test, and a
            # list has no ``.keys()``.
            if not isinstance(rec, dict):
                log.warning("Skipping line %d: expected a JSON object, got %s",
                            lineno, type(rec).__name__)
                continue
            records.append(rec)

    if not records:
        raise ValueError(f"No valid records found in {corpus_path}")

    log.info("Loaded %d training examples from %s", len(records), corpus_path)
    return records
|
|
|
|
|
|
def _record_to_messages(rec: dict, min_text_len: int) -> tuple:
    """Map one corpus record to a chat-message list.

    Returns ``(messages, None)`` on success, or ``(None, reason)`` when the
    record is unusable. ``reason`` is a short human-readable explanation.
    """
    if "messages" in rec:
        # Chat format: [{role, content}, ...], passed through unchanged.
        return rec["messages"], None

    if "instruction" in rec:
        # Alpaca format. A missing "output" used to raise KeyError and abort
        # the whole run; now only this record is skipped.
        if "output" not in rec:
            return None, "Alpaca record has no 'output' field"
        user_content = rec["instruction"]
        if rec.get("input"):
            user_content += f"\n\n{rec['input']}"
        return [
            {"role": "system", "content": "You are a helpful coding assistant."},
            {"role": "user", "content": user_content},
            {"role": "assistant", "content": rec["output"]},
        ], None

    if "text" in rec:
        # Raw text format (brain memories, ADRs), converted to completion format.
        text = rec["text"]
        if not isinstance(text, str):
            return None, f"'text' is {type(text).__name__}, expected str"
        if len(text) <= min_text_len:
            return None, f"text too short ({len(text)} <= {min_text_len} chars)"
        # `or` (instead of a .get default) also covers "title": null / "",
        # which previously produced "Explain: None" / "Explain: ".
        title = rec.get("title") or text[:60].split("\n")[0]
        return [
            {"role": "system", "content": "You are a knowledgeable software architect and Rust developer."},
            {"role": "user", "content": f"Explain: {title}"},
            {"role": "assistant", "content": text},
        ], None

    return None, f"unknown format: {list(rec.keys())}"


def format_dataset(records: list[dict], min_text_len: int = 100):
    """Convert corpus records into a HuggingFace ``Dataset`` of chat messages.

    Accepted record shapes:
      * ``{"messages": [{role, content}, ...]}``: used as-is.
      * Alpaca ``{"instruction", "input", "output"}``: wrapped with a
        coding-assistant system prompt. A non-empty ``input`` is appended to
        the user turn.
      * Raw ``{"text", "title"}``: becomes an "Explain: <title>" -> <text>
        exchange. ``title`` is optional and defaults to the first line of the
        opening 60 characters of ``text``.

    Unusable records are skipped with a warning instead of failing the run.

    Args:
        records: Parsed JSONL records, e.g. from ``load_corpus``.
        min_text_len: A raw-text record must be longer than this many
            characters to be kept. The default of 100 matches the previous
            hard-coded threshold.

    Returns:
        ``datasets.Dataset`` with a single ``messages`` column.

    Raises:
        ValueError: If no record could be converted. Without this, training
            would start on an empty dataset and fail much later with an
            obscure error.
    """
    from datasets import Dataset

    formatted = []
    skipped = 0
    for idx, rec in enumerate(records):
        messages, reason = _record_to_messages(rec, min_text_len)
        if messages is None:
            skipped += 1
            log.warning("Skipping record %d: %s", idx, reason)
            continue
        formatted.append({"messages": messages})

    if skipped:
        log.info("Skipped %d of %d records during formatting", skipped, len(records))
    if not formatted:
        raise ValueError("No usable training examples after formatting the corpus")

    return Dataset.from_list(formatted)
|
|
|
|
|
|
def train_lora(model_id: str, dataset, args) -> str:
    """Run LoRA SFT on a 4-bit quantized base model and save the adapter.

    Args:
        model_id: HuggingFace ID of the base model.
        dataset: Dataset with a ``messages`` column, as built by ``format_dataset``.
        args: Parsed CLI namespace. Reads ``output_dir``, ``lora_r``,
            ``lora_alpha``, ``lora_dropout``, ``target_modules``, ``epochs``,
            ``batch_size``, ``grad_accum``, ``lr``, ``max_seq_len`` and
            ``warmup_ratio``.

    Returns:
        Path to ``<output_dir>/lora-adapter``. It holds the adapter weights,
        the tokenizer files and ``trainer_state.json`` (the loss history read
        by the release gate).
    """
    import importlib.util

    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
    from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
    from trl import SFTTrainer, SFTConfig

    adapter_dir = os.path.join(args.output_dir, "lora-adapter")
    os.makedirs(adapter_dir, exist_ok=True)

    # Load tokenizer. Models without a pad token reuse EOS for padding.
    log.info("Loading tokenizer for %s...", model_id)
    tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Load model in 4-bit (NF4, double quantization, bf16 compute) for memory efficiency
    log.info("Loading model in 4-bit quantization...")
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )

    # FlashAttention-2 needs the optional flash_attn package. Requesting it
    # without the package makes from_pretrained raise, so fall back to
    # PyTorch SDPA on GPU when it is missing.
    if torch.cuda.is_available():
        attn_impl = "flash_attention_2" if importlib.util.find_spec("flash_attn") else "sdpa"
    else:
        attn_impl = "eager"
    log.info("Attention implementation: %s", attn_impl)

    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        quantization_config=bnb_config,
        device_map="auto",
        trust_remote_code=True,
        attn_implementation=attn_impl,
    )
    model = prepare_model_for_kbit_training(model)

    # Configure LoRA
    target_modules = [m.strip() for m in args.target_modules.split(",")]
    lora_config = LoraConfig(
        r=args.lora_r,
        lora_alpha=args.lora_alpha,
        lora_dropout=args.lora_dropout,
        target_modules=target_modules,
        bias="none",
        task_type="CAUSAL_LM",
    )

    model = get_peft_model(model, lora_config)
    trainable, total = model.get_nb_trainable_parameters()
    log.info("Trainable parameters: %d / %d (%.2f%%)", trainable, total, 100 * trainable / total)

    # Training config. Checkpoints are also written under adapter_dir.
    # NOTE(review): newer TRL releases renamed ``max_seq_length`` to
    # ``max_length``; confirm against the TRL version pinned in the image.
    training_config = SFTConfig(
        output_dir=adapter_dir,
        num_train_epochs=args.epochs,
        per_device_train_batch_size=args.batch_size,
        gradient_accumulation_steps=args.grad_accum,
        learning_rate=args.lr,
        max_seq_length=args.max_seq_len,
        warmup_ratio=args.warmup_ratio,
        logging_steps=10,
        save_steps=100,
        save_total_limit=2,
        bf16=torch.cuda.is_available(),
        gradient_checkpointing=True,
        optim="paged_adamw_8bit",
        lr_scheduler_type="cosine",
        report_to="none",
        seed=42,
    )

    # Train
    log.info("Starting LoRA SFT training (%d epochs)...", args.epochs)
    start = time.time()

    trainer = SFTTrainer(
        model=model,
        train_dataset=dataset,
        processing_class=tokenizer,
        args=training_config,
    )

    trainer.train()
    elapsed = time.time() - start
    log.info("Training completed in %.1f minutes", elapsed / 60)

    # Save adapter and tokenizer. save_model() does not write
    # trainer_state.json, so save_state() is also called (it writes to
    # output_dir == adapter_dir). Without it the release gate's convergence
    # check never finds the loss history.
    trainer.save_model(adapter_dir)
    trainer.save_state()
    tokenizer.save_pretrained(adapter_dir)
    log.info("LoRA adapter saved to %s", adapter_dir)

    return adapter_dir
|
|
|
|
|
|
def merge_adapter(model_id: str, adapter_dir: str, output_dir: str) -> str:
    """Merge the LoRA adapter into the base model and save a standalone checkpoint.

    Args:
        model_id: HuggingFace ID of the base model the adapter was trained on.
        adapter_dir: Directory written by ``train_lora`` (adapter weights and
            tokenizer).
        output_dir: Pipeline output root. The result goes to
            ``<output_dir>/merged-model``.

    Returns:
        Path to the merged-model directory (safetensors weights and tokenizer).
    """
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from peft import PeftModel

    merged_dir = os.path.join(output_dir, "merged-model")
    os.makedirs(merged_dir, exist_ok=True)

    # Reload the base without quantization, in fp16, rather than reusing the
    # 4-bit training copy. The merged weights are then stored in fp16, which
    # matches the f16 GGUF export that follows.
    log.info("Loading base model for merge...")
    base_model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True,
    )

    log.info("Loading and merging LoRA adapter...")
    model = PeftModel.from_pretrained(base_model, adapter_dir)
    model = model.merge_and_unload()

    log.info("Saving merged model...")
    model.save_pretrained(merged_dir, safe_serialization=True)
    # The tokenizer in adapter_dir was saved from a trust_remote_code=True load.
    # Reload it the same way so custom tokenizer classes resolve here too.
    tokenizer = AutoTokenizer.from_pretrained(adapter_dir, trust_remote_code=True)
    tokenizer.save_pretrained(merged_dir)

    log.info("Merged model saved to %s", merged_dir)
    return merged_dir
|
|
|
|
|
|
def convert_to_gguf(merged_dir: str, output_dir: str, quant_type: str) -> str:
    """Export the merged HF checkpoint to GGUF and quantize it when possible.

    Step 1 runs llama.cpp's ``convert_hf_to_gguf.py`` to write
    ``model-f16.gguf``. Step 2 runs ``llama-quantize`` (looked up on PATH) to
    write ``model-<quant_type>.gguf``. Both files land in ``output_dir``.

    Returns:
        The quantized GGUF path. If ``llama-quantize`` is not on PATH, the f16
        GGUF path instead. If the llama.cpp conversion script is missing,
        ``""``.

    Raises:
        subprocess.CalledProcessError: If either llama.cpp step exits non-zero.
    """
    import shutil
    import subprocess

    convert_script = "/opt/llama.cpp/convert_hf_to_gguf.py"
    if not os.path.exists(convert_script):
        log.warning("llama.cpp convert script not found, skipping GGUF conversion")
        return ""

    f16_path = os.path.join(output_dir, "model-f16.gguf")
    quant_path = os.path.join(output_dir, f"model-{quant_type}.gguf")

    # Step 1: HF checkpoint -> f16 GGUF
    log.info("Converting to GGUF (f16)...")
    convert_cmd = [sys.executable, convert_script, merged_dir,
                   "--outfile", f16_path, "--outtype", "f16"]
    subprocess.run(convert_cmd, check=True)

    # Step 2: f16 GGUF -> quantized GGUF, only if the quantizer is installed
    quantizer = shutil.which("llama-quantize")
    if quantizer is None:
        log.warning("llama-quantize not found, returning f16 GGUF")
        return f16_path

    log.info("Quantizing to %s...", quant_type)
    subprocess.run([quantizer, f16_path, quant_path, quant_type], check=True)
    size_gib = os.path.getsize(quant_path) / (1024**3)
    log.info("Quantized GGUF: %s (%.2f GB)", quant_path, size_gib)
    return quant_path
|
|
|
|
|
|
def _find_trainer_state(adapter_dir: str) -> str:
    """Locate the Trainer's ``trainer_state.json`` for ``adapter_dir``, or "" if none.

    Prefers ``<adapter_dir>/trainer_state.json``. Otherwise falls back to the
    highest-numbered ``checkpoint-<step>`` sub-directory (train_lora writes
    periodic checkpoints into ``adapter_dir``). That copy may lack log entries
    recorded after the final checkpoint.
    """
    direct = os.path.join(adapter_dir, "trainer_state.json")
    if os.path.exists(direct):
        return direct
    if not os.path.isdir(adapter_dir):
        return ""

    best_step, best_path = -1, ""
    for name in os.listdir(adapter_dir):
        step = name[len("checkpoint-"):] if name.startswith("checkpoint-") else ""
        if not step.isdigit():
            continue
        candidate = os.path.join(adapter_dir, name, "trainer_state.json")
        if int(step) > best_step and os.path.exists(candidate):
            best_step, best_path = int(step), candidate
    return best_path


def release_gate_check(output_dir: str, quant_type: str) -> bool:
    """Run release gate checks on the pipeline outputs in ``output_dir``.

    Gate criteria. A FAIL makes the gate fail; a WARN is informational only.
      - A GGUF exists and is at least 0.5 GB. ``model-<quant_type>.gguf`` is
        preferred, with ``model-f16.gguf`` as fallback. [FAIL]
      - ``lora-adapter/adapter_config.json`` exists. [FAIL]
      - The logged training loss decreased from the first to the last entry. [WARN]

    Args:
        output_dir: Pipeline output root used by the earlier phases.
        quant_type: Quantization suffix of the expected GGUF file.

    Returns:
        True if no FAIL criterion was hit.
    """
    log.info("=== Release Gate Check ===")
    passed = True

    # Check GGUF exists: prefer the quantized file, fall back to the f16 export
    gguf_path = os.path.join(output_dir, f"model-{quant_type}.gguf")
    if not os.path.exists(gguf_path):
        gguf_path = os.path.join(output_dir, "model-f16.gguf")

    if os.path.exists(gguf_path):
        size_gb = os.path.getsize(gguf_path) / (1024**3)
        log.info(" GGUF size: %.2f GB", size_gb)
        if size_gb < 0.5:
            log.error(" FAIL: GGUF file suspiciously small (< 0.5 GB)")
            passed = False
        else:
            log.info(" PASS: GGUF file size OK")
    else:
        log.error(" FAIL: No GGUF file found")
        passed = False

    # Check adapter was saved
    adapter_dir = os.path.join(output_dir, "lora-adapter")
    adapter_config = os.path.join(adapter_dir, "adapter_config.json")
    if os.path.exists(adapter_config):
        log.info(" PASS: LoRA adapter config present")
    else:
        log.error(" FAIL: LoRA adapter config missing")
        passed = False

    # Check training logs for convergence (WARN only, never fails the gate)
    trainer_state = _find_trainer_state(adapter_dir)
    state = None
    if trainer_state:
        try:
            with open(trainer_state, encoding="utf-8") as f:
                state = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            # A corrupt or unreadable state file must not crash the gate.
            log.warning(" WARN: Could not read trainer state %s: %s", trainer_state, e)
    else:
        log.warning(" WARN: No trainer state found, cannot check convergence")

    if state is not None:
        log_history = state.get("log_history", []) if isinstance(state, dict) else []
        losses = [entry["loss"] for entry in log_history
                  if isinstance(entry, dict) and "loss" in entry]
        if len(losses) >= 2:
            initial_loss = losses[0]
            final_loss = losses[-1]
            if final_loss < initial_loss:
                log.info(" PASS: Loss decreased %.4f -> %.4f", initial_loss, final_loss)
            else:
                log.warning(" WARN: Loss did not decrease %.4f -> %.4f", initial_loss, final_loss)
        else:
            log.warning(" WARN: Not enough loss entries to check convergence")

    verdict = "PASSED" if passed else "FAILED"
    log.info("=== Release Gate: %s ===", verdict)
    return passed
|
|
|
|
|
|
def upload_to_hf(model_id: str, output_dir: str, revision: str):
    """Push pipeline artifacts to the HuggingFace repo ``model_id`` on ``revision``.

    The ``merged-model`` directory is uploaded as a folder, only if it exists.
    Every ``*.gguf`` file directly inside ``output_dir`` is then uploaded
    individually, under its own file name.
    """
    from huggingface_hub import HfApi

    api = HfApi()
    destination = {"repo_id": model_id, "revision": revision}

    merged_dir = os.path.join(output_dir, "merged-model")
    if os.path.isdir(merged_dir):
        log.info("Uploading merged model to %s...", model_id)
        api.upload_folder(folder_path=merged_dir, **destination)

    # GGUF files sit next to merged-model, not inside it, so push each one separately.
    gguf_names = [name for name in os.listdir(output_dir) if name.endswith(".gguf")]
    for name in gguf_names:
        log.info("Uploading %s...", name)
        api.upload_file(
            path_or_fileobj=os.path.join(output_dir, name),
            path_in_repo=name,
            **destination,
        )

    log.info("Upload complete")
|
|
|
|
|
|
def main():
    """Run Phase 2 end to end.

    Phases: corpus -> LoRA SFT -> merge -> GGUF -> release gate -> upload.
    A failed release gate calls ``sys.exit(2)`` before anything is uploaded.
    """
    args = parse_args()
    out_dir = args.output_dir
    os.makedirs(out_dir, exist_ok=True)

    log.info("=== RuvLTRA SFT Training Pipeline ===")
    log.info("Model: %s", args.model_id)
    log.info("Corpus: %s", args.corpus)
    log.info("LoRA: r=%d, alpha=%d, dropout=%.2f", args.lora_r, args.lora_alpha, args.lora_dropout)
    log.info("Training: epochs=%d, batch=%d, lr=%.0e", args.epochs, args.batch_size, args.lr)

    # 2a: JSONL corpus -> chat-formatted dataset
    dataset = format_dataset(load_corpus(args.corpus))
    log.info("Dataset prepared: %d examples", len(dataset))

    # 2b + 2c: train the adapter, then fold it into the base weights
    adapter_dir = train_lora(args.model_id, dataset, args)
    merged_dir = merge_adapter(args.model_id, adapter_dir, out_dir)

    # 2d: optional GGUF export ("" when conversion is disabled)
    gguf_path = convert_to_gguf(merged_dir, out_dir, args.quant_type) if args.convert_gguf else ""

    # 2e: release gate; a failure stops the run before publishing
    if not args.skip_gate and not release_gate_check(out_dir, args.quant_type):
        log.error("Release gate FAILED — review output before publishing")
        sys.exit(2)

    # 2f: publish on request
    if args.upload:
        upload_to_hf(args.model_id, out_dir, args.revision)

    log.info("=== SFT Pipeline complete ===")
    log.info("Adapter: %s", adapter_dir)
    log.info("Merged: %s", merged_dir)
    if gguf_path:
        log.info("GGUF: %s", gguf_path)
|
|
|
|
|
|
if __name__ == "__main__":
    # Top-level boundary: log any uncaught error with its traceback and exit 1.
    # SystemExit (e.g. the release gate's exit code 2) is not an Exception
    # subclass, so it passes through unchanged.
    try:
        main()
    except Exception as exc:
        log.exception("Pipeline failed: %s", exc)
        sys.exit(1)
|