mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-22 19:56:25 +00:00
Training tooling: - release_gate.py: Automated 7-gate ship/no-ship checker (G1-G7) - export_training_data.py: Dataset export with governance (schema, dedup, quality scoring, contamination check) - contamination_check.py: 13-gram eval contamination detection - run_calibration.py: Phase 1 imatrix + TurboQuant profiling - run_sft.py: Phase 2 LoRA SFT + DPO training - deploy_training.sh: Cloud Run job creation + Vertex AI setup - Dockerfile: GPU training image (transformers + peft + trl) Rust infrastructure: - turboquant_profile.rs: .turboquant.json sidecar config loading, per-layer TQ config discovery, default profiles Ref: ADR-129, #310 Co-Authored-By: claude-flow <ruv@ruv.net>
366 lines
12 KiB
Python
Executable file
366 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Export training data for RuvLTRA model fine-tuning.
|
|
|
|
Implements dataset governance from ADR-129 Section 2.2:
|
|
- Record schema validation (id, source, text, license, quality_score, provenance, content_hash)
|
|
- SHA-256 content dedup
|
|
- Quality score filtering (< 0.5 excluded)
|
|
- Output statistics (count, token count per source, quality histogram)
|
|
|
|
Sources:
|
|
1. Brain memories from pi.ruv.io (graceful fallback on connection failure)
|
|
2. ADR corpus from docs/adr/
|
|
3. Claude Flow routing dataset reference (ruvnet/claude-flow-routing on HF)
|
|
|
|
Output: data/training/corpus.jsonl
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import uuid
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.error import URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Repository root: the third ancestor of this file (parents[2]).
REPO_ROOT = Path(__file__).resolve().parents[2]
ADR_DIR = REPO_ROOT.joinpath("docs", "adr")
OUTPUT_DIR = REPO_ROOT.joinpath("data", "training")
OUTPUT_FILE = OUTPUT_DIR.joinpath("corpus.jsonl")

# Brain memory endpoint, the value sent as its `limit` query parameter,
# and the request timeout in seconds.
BRAIN_API = "https://pi.ruv.io/v1/memories/list"
BRAIN_LIMIT = 5000
BRAIN_TIMEOUT_S = 15

# Records scoring below this value are dropped by the quality filter.
QUALITY_THRESHOLD = 0.5

# ADR-129 Section 2.2 allowlists for record sources and licenses.
ALLOWED_SOURCES = {"brain", "wet", "claude-routing", "code", "adr"}
ALLOWED_LICENSES = {"apache-2.0", "mit", "cc-by-4.0", "public-domain"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Record helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def content_hash(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text`` encoded as UTF-8.

    The digest serves as the dedup key for training records.
    """
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
|
|
|
|
|
|
def make_record(
    source: str,
    text: str,
    license_id: str,
    quality_score: float,
    provenance: str,
    created_at: str | None = None,
) -> dict:
    """Assemble one training record in the ADR-129 Section 2.2 layout.

    Args:
        source: Origin label; must be a member of ``ALLOWED_SOURCES``.
        text: Record body, stored unchanged and hashed for dedup.
        license_id: License label; must be a member of ``ALLOWED_LICENSES``.
        quality_score: Score stored rounded to four decimal places.
        provenance: Where the text came from (path or URL).
        created_at: Timestamp string; any falsy value is replaced by the
            current UTC time in ISO-8601 form.

    Returns:
        A dict with the keys ``id``, ``source``, ``text``, ``license``,
        ``quality_score``, ``provenance``, ``created_at`` and
        ``content_hash``; ``id`` is a fresh random UUID string.

    Raises:
        ValueError: If ``source`` or ``license_id`` is not allowlisted.
    """
    # The source is checked first, so a record failing both checks reports
    # the source problem.
    if source not in ALLOWED_SOURCES:
        raise ValueError(f"Source '{source}' not in allowlist: {ALLOWED_SOURCES}")
    if license_id not in ALLOWED_LICENSES:
        raise ValueError(f"License '{license_id}' not in allowlist: {ALLOWED_LICENSES}")

    record: dict = {
        "id": str(uuid.uuid4()),
        "source": source,
        "text": text,
        "license": license_id,
    }
    record["quality_score"] = round(quality_score, 4)
    record["provenance"] = provenance
    if created_at:
        record["created_at"] = created_at
    else:
        record["created_at"] = datetime.now(timezone.utc).isoformat()
    record["content_hash"] = content_hash(text)
    return record
|
|
|
|
|
|
def validate_record(record: dict) -> list[str]:
    """Validate a record against the ADR-129 schema.

    Every check is type-safe: a malformed field (wrong type, ``None``,
    unhashable value) is reported as an error string instead of raising, so
    one bad record cannot abort the whole export.

    Args:
        record: Candidate training record.

    Returns:
        A list of human-readable error strings; empty when the record is
        valid.
    """
    required = {"id", "source", "text", "license", "quality_score", "provenance",
                "created_at", "content_hash"}
    errors: list[str] = []

    missing = required - set(record.keys())
    if missing:
        errors.append(f"Missing fields: {missing}")

    # Guard with isinstance first: an unhashable value (list/dict) would
    # raise TypeError on the set-membership test.
    source = record.get("source")
    if not isinstance(source, str) or source not in ALLOWED_SOURCES:
        errors.append(f"Invalid source: {source}")

    license_id = record.get("license")
    if not isinstance(license_id, str) or license_id not in ALLOWED_LICENSES:
        errors.append(f"Invalid license: {license_id}")

    # An absent quality_score is already covered by "Missing fields". A
    # present one must be a real number in [0, 1]; None or a string would
    # otherwise break the numeric comparison in the quality filter.
    if "quality_score" in record:
        qs = record["quality_score"]
        if not isinstance(qs, (int, float)):
            errors.append(f"quality_score is not a number: {qs!r}")
        elif not (0.0 <= qs <= 1.0):  # also rejects NaN
            errors.append(f"quality_score out of range: {qs}")

    text = record.get("text", "")
    if not isinstance(text, str):
        errors.append(f"text is not a string: {type(text).__name__}")
    elif not text.strip():
        errors.append("Empty text")

    # A SHA-256 hex digest is exactly 64 characters long.
    ch = record.get("content_hash", "")
    if not isinstance(ch, str):
        errors.append(f"content_hash is not a string: {type(ch).__name__}")
    elif len(ch) != 64:
        errors.append(f"Invalid content_hash length: {len(ch)}")

    return errors
|
|
|
|
|
|
def estimate_tokens(text: str) -> int:
    """Approximate the token count of ``text`` at about 4 characters per token.

    The estimate never drops below 1, so even an empty string counts as one
    token.
    """
    approx = len(text) // 4
    return approx if approx > 1 else 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Source 1: Brain memories from pi.ruv.io
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _brain_memory_to_record(mem: object) -> dict | None:
    """Convert one raw brain-API entry to a training record, or None if unusable."""
    if not isinstance(mem, dict):
        return None

    text = mem.get("content") or mem.get("text") or mem.get("value", "")
    # Non-string payloads (numbers, nested objects) cannot be used as text.
    if not isinstance(text, str) or not text.strip():
        return None

    # Quality score from brain API confidence, default 0.7
    confidence = mem.get("confidence", mem.get("quality", 0.7))
    try:
        quality = float(confidence)
    except (TypeError, ValueError):
        quality = 0.7

    provenance = mem.get("url") or mem.get("source_url") or "pi.ruv.io/brain"
    created = (mem.get("created_at") or mem.get("timestamp")
               or datetime.now(timezone.utc).isoformat())

    return make_record(
        source="brain",
        text=text.strip(),
        license_id="apache-2.0",
        quality_score=quality,
        provenance=provenance,
        created_at=created,
    )


def fetch_brain_memories() -> list[dict]:
    """Fetch memories from pi.ruv.io and convert them to training records.

    The export is best-effort with respect to this source: any transport,
    decoding or parsing failure, or a response of unexpected shape, is
    logged and yields an empty list so the local sources can still be
    exported.

    Returns:
        One record per memory that carries non-empty string text; an empty
        list on any failure.
    """
    # Local import: HTTPException covers mid-body failures such as
    # IncompleteRead, which are not OSError subclasses.
    from http.client import HTTPException

    url = f"{BRAIN_API}?limit={BRAIN_LIMIT}"
    print(f"[brain] Fetching memories from {url} ...")

    try:
        req = Request(url, headers={"Accept": "application/json"})
        with urlopen(req, timeout=BRAIN_TIMEOUT_S) as resp:
            data = json.loads(resp.read().decode("utf-8"))
    # OSError also covers TimeoutError; ValueError covers both
    # json.JSONDecodeError and UnicodeDecodeError.
    except (URLError, OSError, HTTPException, ValueError) as exc:
        print(f"[brain] Connection failed ({type(exc).__name__}: {exc}). "
              "Falling back to local-only sources.")
        return []

    # The API may return either a bare list or an object wrapping it.
    if isinstance(data, list):
        memories = data
    elif isinstance(data, dict):
        memories = data.get("memories", [])
    else:
        memories = None
    if not isinstance(memories, list):
        print(f"[brain] Unexpected response shape ({type(data).__name__}). "
              "Falling back to local-only sources.")
        return []

    print(f"[brain] Received {len(memories)} memories.")

    records = []
    skipped = 0
    for mem in memories:
        record = _brain_memory_to_record(mem)
        if record is None:
            skipped += 1
        else:
            records.append(record)

    if skipped:
        print(f"[brain] Skipped {skipped} memories without usable text.")
    return records
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Source 2: ADR corpus from docs/adr/
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_adr_corpus() -> list[dict]:
    """Turn every ``*.md`` file directly inside ``ADR_DIR`` into a training record.

    Files are processed in sorted path order. Unreadable files are reported
    and skipped; files that are empty after stripping whitespace are skipped
    silently.

    Returns:
        One record per usable ADR file, or an empty list when ``ADR_DIR``
        is not a directory.
    """
    if not ADR_DIR.is_dir():
        print(f"[adr] Directory not found: {ADR_DIR}")
        return []

    markdown_paths = sorted(ADR_DIR.glob("*.md"))
    print(f"[adr] Found {len(markdown_paths)} ADR files in {ADR_DIR}")

    corpus: list[dict] = []
    for path in markdown_paths:
        try:
            raw = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            print(f"[adr] Skipping {path.name}: {exc}")
            continue

        body = raw.strip()
        if body:
            # ADRs are project-owned, MIT, quality = 1.0 per ADR-129 Section 2.2
            corpus.append(make_record(
                source="adr",
                text=body,
                license_id="mit",
                quality_score=1.0,
                provenance=f"docs/adr/{path.name}",
            ))

    return corpus
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Source 3: Claude Flow routing dataset reference
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def routing_dataset_reference() -> list[dict]:
    """
    Build the manifest entry for the HuggingFace routing dataset.

    Nothing is downloaded here: the dataset itself
    (ruvnet/claude-flow-routing, 2700+ examples) is meant to be fetched with
    the `datasets` library or `huggingface-cli` by the training pipeline.
    The single record returned only tracks the dataset in the corpus
    provenance.
    """
    sentences = [
        "Claude Flow routing dataset — 2,700+ examples of agent routing decisions.",
        "Source: HuggingFace dataset ruvnet/claude-flow-routing.",
        "This is a reference record; fetch the full dataset via",
        "`datasets.load_dataset('ruvnet/claude-flow-routing')` during training.",
    ]
    manifest_entry = make_record(
        source="claude-routing",
        text=" ".join(sentences),
        license_id="apache-2.0",
        quality_score=1.0,
        provenance="https://huggingface.co/datasets/ruvnet/claude-flow-routing",
    )
    return [manifest_entry]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Governance: dedup, quality filter, validation, statistics
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def deduplicate(records: list[dict]) -> list[dict]:
    """Drop records whose ``content_hash`` was already seen (ADR-129 Section 2.2).

    The first record carrying each hash is kept and the input order is
    preserved. The number of removed records is printed when it is non-zero.
    """
    # A dict keeps insertion order, so setdefault retains the first record
    # seen for each hash in its original position.
    first_by_hash: dict[str, dict] = {}
    total = 0
    for total, record in enumerate(records, start=1):
        first_by_hash.setdefault(record["content_hash"], record)

    kept = list(first_by_hash.values())
    dropped = total - len(kept)
    if dropped:
        print(f"[dedup] Removed {dropped} duplicate records by content hash.")
    return kept
|
|
|
|
|
|
def quality_filter(records: list[dict]) -> list[dict]:
    """Keep only records scoring at least ``QUALITY_THRESHOLD`` (ADR-129 Section 2.2).

    The number of excluded records is printed when it is non-zero.
    """
    initial_count = len(records)

    kept: list[dict] = []
    for record in records:
        # Written as ">=" on purpose: a NaN score compares False and is dropped.
        if record["quality_score"] >= QUALITY_THRESHOLD:
            kept.append(record)

    excluded = initial_count - len(kept)
    if excluded:
        print(f"[quality] Excluded {excluded} records below quality threshold {QUALITY_THRESHOLD}.")
    return kept
|
|
|
|
|
|
def validate_all(records: list[dict]) -> list[dict]:
    """Return the records that pass ``validate_record``, in input order.

    Each rejected record is reported with its id (``?`` when absent) and the
    list of validation errors.
    """
    accepted: list[dict] = []
    for record in records:
        problems = validate_record(record)
        if not problems:
            accepted.append(record)
            continue
        print(f"[validate] Dropping record {record.get('id', '?')}: {problems}")
    return accepted
|
|
|
|
|
|
def compute_statistics(records: list[dict]) -> dict:
    """Summarise the corpus as required by ADR-129 Section 2.2.

    Returns:
        A dict with the total record count, the total estimated token count,
        per-source counts and token estimates (sources sorted by name), a
        quality histogram over ten bins of width 0.1 (only non-empty bins,
        sorted by label), and the UTC export timestamp.
    """
    per_source_count: Counter = Counter()
    per_source_tokens: Counter = Counter()
    histogram: Counter = Counter()  # 0.0-0.1, 0.1-0.2, ..., 0.9-1.0

    for record in records:
        origin = record["source"]
        n_tokens = estimate_tokens(record["text"])
        per_source_count[origin] += 1
        per_source_tokens[origin] += n_tokens

        # A score of exactly 1.0 would map to bin 10; clamp it into the
        # last bin (0.9-1.0).
        decile = min(int(record["quality_score"] * 10), 9)
        histogram[f"{decile/10:.1f}-{(decile+1)/10:.1f}"] += 1

    per_source = {}
    for origin in sorted(per_source_count):
        per_source[origin] = {
            "count": per_source_count[origin],
            "estimated_tokens": per_source_tokens[origin],
        }

    return {
        "total_records": len(records),
        "total_estimated_tokens": sum(per_source_tokens.values()),
        "per_source": per_source,
        "quality_histogram": dict(sorted(histogram.items())),
        "exported_at": datetime.now(timezone.utc).isoformat(),
    }
|
|
|
|
|
|
def print_statistics(stats: dict) -> None:
    """Write a human-readable summary of ``stats`` to stdout.

    Shows the totals, one line per source, and a quality histogram whose
    bars are capped at 60 characters.
    """
    rule = "=" * 60

    print("\n" + rule)
    print("CORPUS STATISTICS")
    print(rule)
    print(f"Total records: {stats['total_records']}")
    print(f"Total estimated tokens: {stats['total_estimated_tokens']:,}")

    print()
    print("Per source:")
    for name, summary in stats["per_source"].items():
        n_records = summary["count"]
        n_tokens = summary["estimated_tokens"]
        print(f" {name:20s} {n_records:6d} records {n_tokens:>10,} tokens")

    print()
    print("Quality histogram:")
    for label, hits in stats["quality_histogram"].items():
        # Cap the bar so a large bin cannot overflow the 60-column layout.
        bar = "#" * min(hits, 60)
        print(f" [{label}] {hits:5d} {bar}")

    print(rule)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
    """Run the export: collect, apply governance, write corpus and statistics.

    Records are gathered from the brain API, the ADR directory and the
    routing-dataset reference, then validated, deduplicated and
    quality-filtered in that order. The surviving records are written as
    JSONL to ``OUTPUT_FILE`` and the statistics to ``corpus_stats.json`` in
    ``OUTPUT_DIR``.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Collect from all sources
    print("Collecting training data (ADR-129 Section 2.2 governance)...\n")
    records: list[dict] = [
        *fetch_brain_memories(),
        *load_adr_corpus(),
        *routing_dataset_reference(),
    ]
    print(f"\n[total] Collected {len(records)} raw records.")

    # Governance pipeline; stages run in the listed order.
    for stage in (validate_all, deduplicate, quality_filter):
        records = stage(records)
    print(f"[final] {len(records)} records after governance pipeline.")

    # Write JSONL, one record per line
    with OUTPUT_FILE.open("w", encoding="utf-8") as corpus_fh:
        for record in records:
            corpus_fh.write(json.dumps(record, ensure_ascii=False) + "\n")
    print(f"\nCorpus written to: {OUTPUT_FILE}")

    # Statistics
    stats = compute_statistics(records)
    print_statistics(stats)

    # Write stats sidecar
    stats_file = OUTPUT_DIR / "corpus_stats.json"
    with stats_file.open("w", encoding="utf-8") as stats_fh:
        json.dump(stats, stats_fh, indent=2, ensure_ascii=False)
    print(f"Statistics written to: {stats_file}")


if __name__ == "__main__":
    main()
|