From 6990be7631428d83826c4a9921a3a086c23e913a Mon Sep 17 00:00:00 2001 From: rUv Date: Fri, 27 Feb 2026 15:07:31 +0000 Subject: [PATCH] feat: implement rvf-federation crate for federated transfer learning Implements ADR-057 with 7 modules (2,940 lines, 54 tests): - types: 4 new segment types (FederatedManifest 0x33, DiffPrivacyProof 0x34, RedactionLog 0x35, AggregateWeights 0x36) - pii_strip: 3-stage pipeline (detect, redact, attest) with 12 regex rules - diff_privacy: Gaussian/Laplace noise, RDP accountant, gradient clipping - federation: ExportBuilder + ImportMerger with version-aware conflict resolution - aggregate: FedAvg, FedProx, Byzantine-tolerant weighted averaging - policy: FederationPolicy for selective sharing with allow/deny lists - error: 15 typed error variants Also updates rvf-types with 4 new segment discriminants (0x33-0x36), workspace Cargo.toml, and root README (crate count, segment count, federated learning code example). Co-Authored-By: claude-flow --- Cargo.lock | 13 + Cargo.toml | 1 + README.md | 29 +- crates/rvf/Cargo.lock | 115 +++++ crates/rvf/Cargo.toml | 1 + crates/rvf/rvf-federation/Cargo.toml | 32 ++ crates/rvf/rvf-federation/README.md | 333 ++++++++++++ .../benches/federation_bench.rs | 213 ++++++++ crates/rvf/rvf-federation/src/aggregate.rs | 420 +++++++++++++++ crates/rvf/rvf-federation/src/diff_privacy.rs | 416 +++++++++++++++ crates/rvf/rvf-federation/src/error.rs | 52 ++ crates/rvf/rvf-federation/src/federation.rs | 477 ++++++++++++++++++ crates/rvf/rvf-federation/src/lib.rs | 24 + crates/rvf/rvf-federation/src/pii_strip.rs | 354 +++++++++++++ crates/rvf/rvf-federation/src/policy.rs | 193 +++++++ crates/rvf/rvf-federation/src/types.rs | 426 ++++++++++++++++ crates/rvf/rvf-types/src/segment_type.rs | 30 +- 17 files changed, 3118 insertions(+), 11 deletions(-) create mode 100644 crates/rvf/rvf-federation/Cargo.toml create mode 100644 crates/rvf/rvf-federation/README.md create mode 100644 crates/rvf/rvf-federation/benches/federation_bench.rs create mode 100644 crates/rvf/rvf-federation/src/aggregate.rs create mode 100644 crates/rvf/rvf-federation/src/diff_privacy.rs create mode 100644 crates/rvf/rvf-federation/src/error.rs create mode 100644 crates/rvf/rvf-federation/src/federation.rs create mode 100644 crates/rvf/rvf-federation/src/lib.rs create mode 100644 crates/rvf/rvf-federation/src/pii_strip.rs create mode 100644 crates/rvf/rvf-federation/src/policy.rs create mode 100644 crates/rvf/rvf-federation/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index 63215ef4e..91c03d706 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9672,6 +9672,19 @@ dependencies = [ "tempfile", ] +[[package]] +name = "rvf-federation" +version = "0.1.0" +dependencies = [ + "criterion 0.5.1", + "rand 0.8.5", + "rand_distr 0.4.3", + "regex", + "serde", + "sha3", + "thiserror 2.0.18", +] + [[package]] name = "rvf-import" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index e16a7ea51..f130f7875 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ members = [ "crates/rvf/rvf-server", "crates/rvf/rvf-import", "crates/rvf/rvf-cli", + "crates/rvf/rvf-federation", "crates/rvf/tests/rvf-integration", "crates/rvf/benches", "crates/ruvector-coherence", diff --git a/README.md b/README.md index 468a73b9e..468739bfd 100644 --- a/README.md +++ b/README.md @@ -498,9 +498,9 @@ npx @ruvector/rvf-mcp-server --transport stdio # MCP server for AI agents | Tamper-evident audit | Hash-linked witness chain for every insert, query, and deletion | | Post-quantum signatures | ML-DSA-65 and Ed25519 signing on every segment | | DNA-style lineage | Parent/child derivation chains with cryptographic verification | -| 24 segment types | VEC, INDEX, KERNEL, EBPF, WASM, COW_MAP, WITNESS, CRYPTO, and 16 more | +| 28 segment types | VEC, INDEX, KERNEL, EBPF, WASM, COW_MAP, WITNESS, CRYPTO, FEDERATED_MANIFEST, and 19 more | -**Rust crates** (22): [`rvf-types`](https://crates.io/crates/rvf-types) `rvf-wire` `rvf-manifest` `rvf-quant` `rvf-index` `rvf-crypto` [`rvf-runtime`](https://crates.io/crates/rvf-runtime) `rvf-kernel` `rvf-ebpf` `rvf-launch` `rvf-server` `rvf-import` [`rvf-cli`](https://crates.io/crates/rvf-cli) `rvf-wasm` `rvf-solver-wasm` `rvf-node` + 6 adapters (claude-flow, agentdb, ospipe, agentic-flow, rvlite, sona) +**Rust crates** (23): [`rvf-types`](https://crates.io/crates/rvf-types) `rvf-wire` `rvf-manifest` `rvf-quant` `rvf-index` `rvf-crypto` [`rvf-runtime`](https://crates.io/crates/rvf-runtime) `rvf-kernel` `rvf-ebpf` [`rvf-federation`](./crates/rvf/rvf-federation) `rvf-launch` `rvf-server` `rvf-import` [`rvf-cli`](https://crates.io/crates/rvf-cli) `rvf-wasm` `rvf-solver-wasm` `rvf-node` + 6 adapters (claude-flow, agentdb, ospipe, agentic-flow, rvlite, sona) **npm packages** (4): [`@ruvector/rvf`](https://www.npmjs.com/package/@ruvector/rvf) [`@ruvector/rvf-node`](https://www.npmjs.com/package/@ruvector/rvf-node) [`@ruvector/rvf-wasm`](https://www.npmjs.com/package/@ruvector/rvf-wasm) [`@ruvector/rvf-mcp-server`](https://www.npmjs.com/package/@ruvector/rvf-mcp-server) @@ -1624,12 +1624,13 @@ let syndrome = gate.assess_coherence(&quantum_state)?; | [rvf-runtime](./crates/rvf/rvf-runtime) | Full store API, COW engine, compaction | [![crates.io](https://img.shields.io/crates/v/rvf-runtime.svg)](https://crates.io/crates/rvf-runtime) | | [rvf-kernel](./crates/rvf/rvf-kernel) | Linux kernel builder, initramfs, Docker pipeline | [![crates.io](https://img.shields.io/crates/v/rvf-kernel.svg)](https://crates.io/crates/rvf-kernel) | | [rvf-ebpf](./crates/rvf/rvf-ebpf) | Real BPF programs (XDP, socket filter, TC) | [![crates.io](https://img.shields.io/crates/v/rvf-ebpf.svg)](https://crates.io/crates/rvf-ebpf) | +| [rvf-federation](./crates/rvf/rvf-federation) | Federated transfer learning — PII stripping, differential privacy, FedAvg/FedProx | [![crates.io](https://img.shields.io/crates/v/rvf-federation.svg)](https://crates.io/crates/rvf-federation) | | [rvf-launch](./crates/rvf/rvf-launch) | QEMU microvm launcher, KVM/TCG | [![crates.io](https://img.shields.io/crates/v/rvf-launch.svg)](https://crates.io/crates/rvf-launch) | | [rvf-server](./crates/rvf/rvf-server) | HTTP REST + TCP streaming server | [![crates.io](https://img.shields.io/crates/v/rvf-server.svg)](https://crates.io/crates/rvf-server) | | [rvf-import](./crates/rvf/rvf-import) | JSON, CSV, NumPy importers | [![crates.io](https://img.shields.io/crates/v/rvf-import.svg)](https://crates.io/crates/rvf-import) | | [rvf-cli](./crates/rvf/rvf-cli) | Unified CLI with 17 subcommands | [![crates.io](https://img.shields.io/crates/v/rvf-cli.svg)](https://crates.io/crates/rvf-cli) | -**RVF Features:** Single-file cognitive containers that boot as Linux microservices, COW branching at cluster granularity, eBPF acceleration, witness chains, post-quantum signatures, 24 segment types. [Full README](./crates/rvf/README.md) +**RVF Features:** Single-file cognitive containers that boot as Linux microservices, COW branching at cluster granularity, eBPF acceleration, witness chains, post-quantum signatures, federated transfer learning with differential privacy, 28 segment types. [Full README](./crates/rvf/README.md) ### Formal Verification @@ -3152,16 +3153,24 @@ println!("{}", response.text); ### Federated Learning ```rust -// Ephemeral agents collect trajectories -let agent = EphemeralAgent::new("task-specific-agent"); -agent.process_task(&task).await?; -let export = agent.export(); +use rvf_federation::{ExportBuilder, DiffPrivacyEngine, FederationPolicy}; -// Central coordinator aggregates learning -coordinator.accept_export(export)?; -coordinator.consolidate(); // Share patterns with new agents +// Build a privacy-preserving federated export +let mut dp = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 10.0)?; +let export = ExportBuilder::new("contributor_pseudo".into(), "code_review".into()) + .add_priors(local_engine.extract_priors()) + .add_weights(sona_weights) + .with_policy(FederationPolicy::default()) // quality gate + min observations + .build(&mut dp)?; // PII strip → DP noise → manifest + +// Import and merge federated learning from another contributor +let merger = ImportMerger::new(); +merger.validate(&remote_export)?; // signature + witness chain check +merger.merge_priors(&mut local, &remote_export.priors, 1); // version-aware merge ``` +See [`rvf-federation`](./crates/rvf/rvf-federation) for FedAvg/FedProx aggregation, Byzantine tolerance, RDP privacy accounting, and PII stripping pipeline. + ### Dynamic Embedding Fine-Tuning RuvLLM's adaptive learning system enables real-time model improvement without retraining. diff --git a/crates/rvf/Cargo.lock b/crates/rvf/Cargo.lock index 8eb90a4ce..b4d8c8e88 100644 --- a/crates/rvf/Cargo.lock +++ b/crates/rvf/Cargo.lock @@ -135,6 +135,7 @@ checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", "axum-core", + "base64", "bytes", "futures-util", "http 1.4.0", @@ -153,8 +154,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -212,6 +215,12 @@ dependencies = [ "url", ] +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64ct" version = "1.8.3" @@ -248,6 +257,12 @@ version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" @@ -549,6 +564,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "der" version = "0.7.10" @@ -729,6 +750,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + [[package]] name = "futures-task" version = "0.3.31" @@ -743,6 +770,7 @@ checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", "futures-io", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -1272,6 +1300,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1484,6 +1513,16 @@ dependencies = [ "getrandom 0.2.17", ] +[[package]] +name = "rand_distr" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" +dependencies = [ + "num-traits", + "rand", +] + [[package]] name = "rayon" version = "1.11.0" @@ -1706,6 +1745,19 @@ dependencies = [ "tempfile", ] +[[package]] +name = "rvf-federation" +version = "0.1.0" +dependencies = [ + "criterion", + "rand", + "rand_distr", + "regex", + "serde", + "sha3", + "thiserror 2.0.18", +] + [[package]] name = "rvf-import" version = "0.1.0" @@ -1810,6 +1862,7 @@ dependencies = [ "axum-test", "clap", "http-body-util", + "mime_guess", "rvf-runtime", "rvf-types", "serde", @@ -1817,6 +1870,7 @@ dependencies = [ "tempfile", "tokio", "tower", + "tower-http", ] [[package]] @@ -1952,6 +2006,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -2208,6 +2273,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tower" version = "0.5.3" @@ -2224,6 +2301,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags", + "bytes", + "http 1.4.0", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -2262,6 +2353,24 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.4.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror 1.0.69", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -2304,6 +2413,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/crates/rvf/Cargo.toml b/crates/rvf/Cargo.toml index 3a834ac3c..ab22d8b08 100644 --- a/crates/rvf/Cargo.toml +++ b/crates/rvf/Cargo.toml @@ -25,6 +25,7 @@ members = [ "rvf-cli", "tests/rvf-integration", "benches", + "rvf-federation", ] [workspace.package] diff --git a/crates/rvf/rvf-federation/Cargo.toml b/crates/rvf/rvf-federation/Cargo.toml new file mode 100644 index 000000000..47454f62e --- /dev/null +++ b/crates/rvf/rvf-federation/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "rvf-federation" +version = "0.1.0" +edition = "2021" +description = "Federated RVF transfer learning -- PII stripping, differential privacy, federated averaging" +license = "MIT OR Apache-2.0" +repository = "https://github.com/ruvnet/ruvector" +homepage = "https://github.com/ruvnet/ruvector" +readme = "README.md" +categories = ["science", "cryptography"] +keywords = ["federated-learning", "differential-privacy", "transfer-learning", "rvf"] +rust-version = "1.87" + +[features] +default = ["std"] +std = [] +serde = ["dep:serde"] + +[dependencies] +serde = { version = "1", default-features = false, features = ["derive"], optional = true } +sha3 = { version = "0.10", default-features = false } +rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } +rand_distr = { version = "0.4", default-features = false } +regex = "1" +thiserror = "2" + +[dev-dependencies] +criterion = { version = "0.5", features = ["html_reports"] } + +[[bench]] +name = "federation_bench" +harness = false diff --git a/crates/rvf/rvf-federation/README.md b/crates/rvf/rvf-federation/README.md new file mode 100644 index 000000000..ad1d9c068 --- /dev/null +++ b/crates/rvf/rvf-federation/README.md @@ -0,0 +1,333 @@ +# rvf-federation + +[![Crates.io](https://img.shields.io/crates/v/rvf-federation.svg)](https://crates.io/crates/rvf-federation) +[![docs.rs](https://img.shields.io/docsrs/rvf-federation)](https://docs.rs/rvf-federation) +[![License: MIT OR Apache-2.0](https://img.shields.io/badge/License-MIT%20OR%20Apache--2.0-blue.svg)](https://opensource.org/licenses/MIT) +[![Rust 1.87+](https://img.shields.io/badge/rust-1.87%2B-orange.svg)](https://www.rust-lang.org) + +**Privacy-preserving federated transfer learning for the RVF format.** + +```toml +rvf-federation = "0.1" +``` + +RuVector users independently accumulate learning patterns -- SONA weight trajectories, policy kernel configurations, domain expansion priors, HNSW tuning parameters. Today that learning is siloed. `rvf-federation` implements the inter-user federation layer defined in [ADR-057](../../../docs/adr/ADR-057-federated-rvf-transfer-learning.md): it strips PII, injects differential privacy noise, packages transferable learning as RVF segments, and merges incoming learning with formal privacy guarantees. + +| | rvf-federation | Siloed learning | Manual sharing | +|---|---|---|---| +| **Privacy** | 3-stage PII stripping + calibrated DP noise | N/A -- nothing leaves the machine | Trust the sender | +| **Knowledge reuse** | New users bootstrap from community priors | Every deployment starts cold | Copy-paste config files | +| **Integrity** | Witness chain + Ed25519/ML-DSA-65 signatures | N/A | No verification | +| **Aggregation** | FedAvg, FedProx, Byzantine-tolerant averaging | N/A | Manual merge | +| **Privacy accounting** | RDP composition with formal epsilon budget | N/A | N/A | + +## Quick Start + +```rust +use rvf_federation::{ + ExportBuilder, DiffPrivacyEngine, FederationPolicy, + TransferPriorSet, TransferPriorEntry, BetaParams, +}; + +// 1. Build an export from local learning +let priors = TransferPriorSet { + source_domain: "code_review".into(), + entries: vec![TransferPriorEntry { + bucket_id: "medium_algorithm".into(), + arm_id: "arm_0".into(), + params: BetaParams::new(10.0, 5.0), + observation_count: 50, + }], + cost_ema: 0.85, +}; + +// 2. Configure differential privacy (epsilon=1.0, delta=1e-5) +let mut dp = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 1.0).unwrap(); + +// 3. Build: PII strip -> DP noise -> assemble manifest +let export = ExportBuilder::new("alice_pseudo".into(), "code_review".into()) + .with_policy(FederationPolicy::default()) + .add_priors(priors) + .add_string_field("config_path".into(), "/home/alice/project/.config".into()) + .build(&mut dp) + .unwrap(); + +assert_eq!(export.manifest.format_version, 1); +assert!(export.redaction_log.total_redactions >= 1); // PII was stripped +assert!(export.privacy_proof.epsilon > 0.0); // DP noise was applied +``` + +## Key Features + +| Feature | What It Does | Why It Matters | +|---|---|---| +| **PII stripping** | 3-stage pipeline: detect, redact, attest | No personal data leaves the local machine | +| **Differential privacy** | Gaussian/Laplace noise with RDP accounting | Formal mathematical privacy guarantee per export | +| **Gradient clipping** | Bound L2 norms before aggregation | Limits any single user's influence on the aggregate | +| **FedAvg / FedProx** | Federated averaging with optional proximal term | Industry-standard aggregation (McMahan et al. 2017) | +| **Byzantine tolerance** | Outlier detection by L2-norm z-score | Malicious contributions are excluded automatically | +| **Version-aware merging** | Dampened confidence for cross-version imports | Older learning still helps, with reduced weight | +| **Selective sharing** | Allowlist/denylist for segments and domains | Users control exactly what they share | + +## Architecture + +``` +Local Engine Remote + +------------------+ +------------+ +---------+ +----------+ + | TransferPriors |--->| |--->| |---->| | + | PolicyKernels | | PII Strip | | DP | | RVF | Registry + | CostCurves | | (3-stage) | | Noise | | Export |----> (GCS) + | LoRA Weights | | | | | | Builder | | + +------------------+ +------------+ +---------+ +----------+ | + v + +------------------+ +------------+ +---------+ +----------+ +--------+ + | Merged Learning |<---| Version- |<---| Import |<----| Validate |<-| Import | + | (local engines) | | Aware | | Merger | | (sig + | | (pull) | + | | | Merge | | | | witness) | +--------+ + +------------------+ +------------+ +---------+ +----------+ +``` + +## Modules + +| Module | Description | +|---|---| +| `types` | Four new RVF segment payload types (0x33-0x36) plus federation data structures | +| `error` | 15 error variants covering privacy, validation, aggregation, and I/O failures | +| `pii_strip` | Three-stage PII stripping pipeline with 12 built-in detection rules | +| `diff_privacy` | Gaussian/Laplace noise engines, gradient clipping, RDP privacy accountant | +| `federation` | `ExportBuilder` and `ImportMerger` implementing the ADR-057 transfer protocol | +| `aggregate` | `FederatedAggregator` with FedAvg, FedProx, and Byzantine-tolerant strategies | +| `policy` | `FederationPolicy` for selective sharing with allowlists, denylists, and rate limits | + +## Segment Types + +Four new RVF segment types extend the `0x30-0x32` domain expansion range: + +| Code | Name | Purpose | +|---|---|---| +| `0x33` | `FederatedManifest` | Describes the export: contributor pseudonym, timestamp, included segments, privacy budget spent | +| `0x34` | `DiffPrivacyProof` | Privacy attestation: epsilon/delta, mechanism, sensitivity, clipping norm, noise scale | +| `0x35` | `RedactionLog` | PII stripping attestation: redaction counts by category, pre-redaction content hash, rules fired | +| `0x36` | `AggregateWeights` | Federated-averaged LoRA deltas with participation count, round number, confidence scores | + +Readers that do not recognize these segment types skip them per the RVF forward-compatibility rule. Existing `TransferPrior (0x30)`, `PolicyKernel (0x31)`, `CostCurve (0x32)`, `Witness`, and `Crypto` segments are reused as-is. + +## PII Stripping Pipeline + +`PiiStripper` runs a three-stage pipeline on every string field before it leaves the local machine. + +**Stage 1 -- Detection.** Twelve built-in regex rules scan for: + +- Unix and Windows file paths (`/home/user/...`, `C:\Users\...`) +- IPv4 and IPv6 addresses +- Email addresses +- API keys (`sk-...`, `AKIA...`, `ghp_...`, Bearer tokens) +- Environment variable references (`$HOME`, `%USERPROFILE%`) +- Usernames (`@handle`) + +Custom rules can be registered with `add_rule()`. + +**Stage 2 -- Redaction.** Detected PII is replaced with deterministic pseudonyms (``, ``, ``). The same original value always maps to the same pseudonym within a single export, preserving structural relationships without revealing content. + +**Stage 3 -- Attestation.** A `RedactionLog (0x35)` segment is generated containing redaction counts by category, the SHAKE-256 hash of the pre-redaction content (proves scanning happened without revealing it), and the rules that fired. + +```rust +use rvf_federation::PiiStripper; + +let mut stripper = PiiStripper::new(); +let fields = vec![ + ("config", "/home/alice/project/.env"), + ("server", "connecting to 10.0.0.1:8080"), + ("note", "no pii here"), +]; +let (redacted, log) = stripper.strip_fields(&fields); +assert_eq!(log.fields_scanned, 3); +assert!(log.total_redactions >= 2); +assert!(redacted[2].1 == "no pii here"); // clean fields pass through +``` + +## Differential Privacy + +### Noise Mechanisms + +| Mechanism | Privacy Model | Noise Distribution | Use Case | +|---|---|---|---| +| Gaussian | (epsilon, delta)-DP | N(0, sigma^2) where sigma = S * sqrt(2 ln(1.25/delta)) / epsilon | Default; tighter for large parameter counts | +| Laplace | Pure epsilon-DP | Laplace(0, S/epsilon) | Stronger guarantee; no delta term | + +### Gradient Clipping + +Before noise injection, all parameter vectors are clipped to a configurable L2 norm bound. This limits the sensitivity of the aggregation to any single user's contribution. + +### Privacy Accountant + +`PrivacyAccountant` tracks cumulative privacy loss using Renyi Differential Privacy (RDP) composition across 16 alpha orders. RDP composition is tighter than naive (epsilon, delta)-DP composition, meaning more exports fit within the same budget. + +```rust +use rvf_federation::PrivacyAccountant; + +let mut accountant = PrivacyAccountant::new(10.0, 1e-5); // budget: eps=10, delta=1e-5 +accountant.record_gaussian(1.0, 1.0, 1e-5, 100); +assert!(accountant.remaining_budget() > 0.0); +assert!(!accountant.is_exhausted()); +``` + +## Federation Strategies + +| Strategy | Algorithm | Weighting | When to Use | +|---|---|---|---| +| `FedAvg` | Federated Averaging (McMahan et al.) | Trajectory count | Default; most scenarios | +| `FedProx` | Proximal regularization | Trajectory count + mu penalty | Heterogeneous data distributions | +| `WeightedAverage` | Simple weighted mean | Quality/reputation score | When contributor reputation varies widely | +| Byzantine detection | L2-norm z-score filtering | Outliers > 2 std removed | Always runs before aggregation | + +```rust +use rvf_federation::{FederatedAggregator, AggregationStrategy}; +use rvf_federation::aggregate::Contribution; + +let mut agg = FederatedAggregator::new("code_review".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2) + .with_byzantine_threshold(2.0); + +agg.add_contribution(Contribution { + contributor: "alice".into(), + weights: vec![1.0, 2.0, 3.0], + quality_weight: 0.9, + trajectory_count: 100, +}); +agg.add_contribution(Contribution { + contributor: "bob".into(), + weights: vec![1.2, 1.8, 3.1], + quality_weight: 0.85, + trajectory_count: 80, +}); + +let result = agg.aggregate().unwrap(); +assert_eq!(result.participation_count, 2); +assert_eq!(result.lora_deltas.len(), 3); +``` + +## Performance Benchmarks + +Measured on an AMD64 Linux system with Criterion. + +| Benchmark | Time | +|---|---| +| PII detect (single string) | 756 ns | +| PII strip (10 fields) | 44 us | +| PII strip (100 fields) | 303 us | +| Gaussian noise (100 params) | 4.7 us | +| Gaussian noise (10k params) | 334 us | +| Gradient clipping (1k params) | 487 ns | +| Privacy accountant (100 rounds) | 1.0 us | +| FedAvg (10 contrib, 100 dim) | 3.9 us | +| FedAvg (100 contrib, 1k dim) | 365 us | +| Byzantine detection (50 contrib) | 12 us | +| Full export pipeline | 1.2 ms | +| Merge 100 priors | 28 us | + +## Feature Flags + +| Flag | Default | What It Enables | +|---|---|---| +| `std` | Yes | Standard library support (required) | +| `serde` | No | Derive `Serialize`/`Deserialize` on all public types | + +```toml +[dependencies] +rvf-federation = { version = "0.1", features = ["serde"] } +``` + +## API Overview + +### Core Types + +| Type | Description | +|---|---| +| `FederatedManifest` | Export metadata: contributor pseudonym, domain, timestamp, privacy budget spent | +| `DiffPrivacyProof` | Privacy attestation: epsilon, delta, mechanism, sensitivity, noise scale | +| `RedactionLog` | PII stripping attestation: entries by category, pre-redaction hash, field count | +| `AggregateWeights` | Federated-averaged LoRA deltas with round number, participation count, confidences | +| `BetaParams` | Beta distribution parameters for Thompson Sampling priors (merge, dampen, mean) | + +### Transfer Types + +| Type | Description | +|---|---| +| `TransferPriorEntry` | Single context bucket prior: bucket ID, arm ID, Beta params, observation count | +| `TransferPriorSet` | Collection of priors from a trained domain with cost EMA | +| `PolicyKernelSnapshot` | Snapshot of tunable policy knob values with fitness score | +| `CostCurveSnapshot` | Ordered (step, cost) points with acceleration factor | + +### Aggregation Types + +| Type | Description | +|---|---| +| `FederatedAggregator` | Aggregation server: collects contributions, detects outliers, produces `AggregateWeights` | +| `AggregationStrategy` | `FedAvg`, `FedProx { mu }`, or `WeightedAverage` | +| `Contribution` | Single participant's weight vector with quality and trajectory metadata | + +### Protocol Types + +| Type | Description | +|---|---| +| `ExportBuilder` | Builder pattern: add priors/kernels/weights, PII-strip, DP-noise, produce `FederatedExport` | +| `ImportMerger` | Validate imports, merge priors with version-aware dampening, merge weights | +| `FederatedExport` | Completed export: manifest + redaction log + privacy proof + learning data | +| `FederationPolicy` | Selective sharing: allowlists, denylists, quality gate, rate limit, privacy budget | +| `PiiStripper` | Three-stage PII pipeline: detect, redact, attest | +| `DiffPrivacyEngine` | Noise injection with Gaussian or Laplace mechanism and gradient clipping | +| `PrivacyAccountant` | RDP-based cumulative privacy loss tracker | + +### Error Types + +`FederationError` covers 15 variants: + +| Variant | Trigger | +|---|---| +| `PrivacyBudgetExhausted` | Cumulative epsilon exceeds limit | +| `InvalidEpsilon` | Epsilon <= 0 | +| `InvalidDelta` | Delta outside (0, 1) | +| `SegmentValidation` | Malformed segment data | +| `VersionMismatch` | Incompatible format version | +| `SignatureVerification` | Ed25519/ML-DSA-65 signature check failed | +| `WitnessChainBroken` | Witness chain has a gap or tampered entry | +| `InsufficientObservations` | Prior has too few observations for export | +| `QualityBelowThreshold` | Trajectory quality below policy minimum | +| `RateLimited` | Export rate limit exceeded | +| `PiiLeakDetected` | PII found after stripping (defense-in-depth) | +| `ByzantineOutlier` | Contribution flagged as adversarial | +| `InsufficientContributions` | Not enough participants for aggregation round | +| `Serialization` | Encoding/decoding failure | +| `Io` | I/O operation failure | + +## Related Crates + +| Crate | Relationship | +|---|---| +| [`rvf-types`](../rvf-types) | Core RVF segment definitions; `rvf-federation` defines its own payload types to avoid circular deps | +| [`ruvector-domain-expansion`](../../ruvector-domain-expansion) | Source of `TransferPrior`, `PolicyKernel`, `CostCurve`; federation exports these as RVF segments | +| [`sona`](../../sona) | SONA learning engine; `FederatedCoordinator` handles intra-deployment aggregation, `rvf-federation` handles inter-user | +| [`rvf-crypto`](../rvf-crypto) | Ed25519 signatures and SHAKE-256 hashing used for witness chains and segment integrity | + +## Testing + +54 tests across all modules: + +```bash +cargo test -p rvf-federation +``` + +Benchmarks: + +```bash +cargo bench -p rvf-federation +``` + +## License + +MIT OR Apache-2.0 + +--- + +Part of [RuVector](https://github.com/ruvnet/ruvector) -- the self-learning vector database. diff --git a/crates/rvf/rvf-federation/benches/federation_bench.rs b/crates/rvf/rvf-federation/benches/federation_bench.rs new file mode 100644 index 000000000..443d18ee9 --- /dev/null +++ b/crates/rvf/rvf-federation/benches/federation_bench.rs @@ -0,0 +1,213 @@ +//! Benchmarks for rvf-federation crate. + +use criterion::{criterion_group, criterion_main, Criterion, black_box}; +use rvf_federation::*; +use rvf_federation::aggregate::{FederatedAggregator, AggregationStrategy, Contribution}; +use rvf_federation::diff_privacy::{DiffPrivacyEngine, PrivacyAccountant}; +use rvf_federation::pii_strip::PiiStripper; +use rvf_federation::federation::{ExportBuilder, ImportMerger}; +use rvf_federation::policy::FederationPolicy; + +fn bench_pii_strip(c: &mut Criterion) { + let mut group = c.benchmark_group("pii_strip"); + + group.bench_function("detect_mixed_pii", |b| { + let stripper = PiiStripper::new(); + let input = "file at /home/alice/project/main.rs, ip 192.168.1.100, email alice@example.com, key sk-abcdefghijklmnopqrstuv12"; + b.iter(|| { + black_box(stripper.contains_pii(black_box(input))); + }); + }); + + group.bench_function("strip_10_fields", |b| { + let fields: Vec<(&str, &str)> = (0..10).map(|i| { + if i % 3 == 0 { + ("path", "/home/user/data/file.csv") + } else if i % 3 == 1 { + ("ip", "server at 10.0.0.1:8080") + } else { + ("clean", "no pii here at all") + } + }).collect(); + b.iter(|| { + let mut stripper = PiiStripper::new(); + black_box(stripper.strip_fields(black_box(&fields))); + }); + }); + + group.bench_function("strip_100_fields", |b| { + let fields: Vec<(&str, &str)> = (0..100).map(|i| { + if i % 5 == 0 { + ("path", "/home/user/data/file.csv") + } else { + ("clean", "just normal text content") + } + }).collect(); + b.iter(|| { + let mut stripper = PiiStripper::new(); + black_box(stripper.strip_fields(black_box(&fields))); + }); + }); + + group.finish(); +} + +fn bench_diff_privacy(c: &mut Criterion) { + let mut group = c.benchmark_group("diff_privacy"); + + group.bench_function("gaussian_noise_100_params", |b| { + b.iter(|| { + let mut engine = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 10.0).unwrap().with_seed(42); + let mut params: Vec = (0..100).map(|i| i as f64 * 0.01).collect(); + black_box(engine.add_noise(black_box(&mut params))); + }); + }); + + group.bench_function("gaussian_noise_10000_params", |b| { + b.iter(|| { + let mut engine = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 10.0).unwrap().with_seed(42); + let mut params: Vec = (0..10_000).map(|i| i as f64 * 0.0001).collect(); + black_box(engine.add_noise(black_box(&mut params))); + }); + }); + + group.bench_function("gradient_clipping_1000", |b| { + let engine = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 1.0).unwrap(); + b.iter(|| { + let mut grads: Vec = (0..1000).map(|i| (i as f64).sin()).collect(); + engine.clip_gradients(black_box(&mut grads)); + }); + }); + + group.bench_function("privacy_accountant_100_rounds", |b| { + b.iter(|| { + let mut acc = PrivacyAccountant::new(100.0, 1e-5); + for _ in 0..100 { + acc.record_gaussian(1.0, 1.0, 1e-5, 100); + } + black_box(acc.current_epsilon()); + }); + }); + + group.finish(); +} + +fn bench_aggregation(c: &mut Criterion) { + let mut group = c.benchmark_group("aggregation"); + + group.bench_function("fedavg_10_contributors_100_dim", |b| { + b.iter(|| { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2); + for i in 0..10 { + agg.add_contribution(Contribution { + contributor: format!("c_{}", i), + weights: (0..100).map(|j| (i as f64 + j as f64) * 0.01).collect(), + quality_weight: 0.8 + (i as f64) * 0.02, + trajectory_count: 100 + i * 10, + }); + } + black_box(agg.aggregate().unwrap()); + }); + }); + + group.bench_function("fedavg_100_contributors_1000_dim", |b| { + b.iter(|| { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2); + for i in 0..100 { + agg.add_contribution(Contribution { + contributor: format!("c_{}", i), + weights: (0..1000).map(|j| (i as f64 + j as f64) * 0.001).collect(), + quality_weight: 0.8, + trajectory_count: 100, + }); + } + black_box(agg.aggregate().unwrap()); + }); + }); + + group.bench_function("byzantine_detection_50_contributors", |b| { + b.iter(|| { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2) + .with_byzantine_threshold(2.0); + for i in 0..48 { + agg.add_contribution(Contribution { + contributor: format!("good_{}", i), + weights: vec![1.0; 50], + quality_weight: 0.9, + trajectory_count: 100, + }); + } + // Add 2 outliers + agg.add_contribution(Contribution { + contributor: "evil_1".to_string(), + weights: vec![1000.0; 50], + quality_weight: 0.9, + trajectory_count: 100, + }); + agg.add_contribution(Contribution { + contributor: "evil_2".to_string(), + weights: vec![-500.0; 50], + quality_weight: 0.9, + trajectory_count: 100, + }); + black_box(agg.aggregate().unwrap()); + }); + }); + + group.finish(); +} + +fn bench_export_import(c: &mut Criterion) { + let mut group = c.benchmark_group("export_import"); + + group.bench_function("full_export_pipeline", |b| { + b.iter(|| { + let mut dp = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 10.0).unwrap().with_seed(42); + let priors = TransferPriorSet { + source_domain: "/home/user/my_domain".to_string(), + entries: (0..20).map(|i| TransferPriorEntry { + bucket_id: format!("bucket_{}", i), + arm_id: format!("arm_{}", i % 4), + params: BetaParams::new(5.0 + i as f64, 3.0 + i as f64 * 0.5), + observation_count: 50 + i * 10, + }).collect(), + cost_ema: 0.85, + }; + let export = ExportBuilder::new("pseudo".into(), "domain".into()) + .add_priors(priors) + .add_weights((0..256).map(|i| i as f64 * 0.001).collect()) + .add_string_field("note".into(), "trained on /home/user/data at 192.168.1.1".into()) + .build(&mut dp) + .unwrap(); + black_box(export); + }); + }); + + group.bench_function("merge_100_priors", |b| { + let merger = ImportMerger::new(); + let remote: Vec = (0..100).map(|i| TransferPriorEntry { + bucket_id: format!("bucket_{}", i), + arm_id: format!("arm_{}", i % 4), + params: BetaParams::new(10.0, 5.0), + observation_count: 50, + }).collect(); + b.iter(|| { + let mut local: Vec = (0..50).map(|i| TransferPriorEntry { + bucket_id: format!("bucket_{}", i), + arm_id: format!("arm_{}", i % 4), + params: BetaParams::new(5.0, 3.0), + observation_count: 20, + }).collect(); + merger.merge_priors(black_box(&mut local), black_box(&remote), 1); + black_box(local); + }); + }); + + group.finish(); +} + +criterion_group!(benches, bench_pii_strip, bench_diff_privacy, bench_aggregation, bench_export_import); +criterion_main!(benches); diff --git a/crates/rvf/rvf-federation/src/aggregate.rs b/crates/rvf/rvf-federation/src/aggregate.rs new file mode 100644 index 000000000..fb2091845 --- /dev/null +++ b/crates/rvf/rvf-federation/src/aggregate.rs @@ -0,0 +1,420 @@ +//! Federated aggregation: FedAvg, FedProx, Byzantine-tolerant weighted averaging. + +use crate::error::FederationError; +use crate::types::AggregateWeights; + +/// Aggregation strategy. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum AggregationStrategy { + /// Federated Averaging (McMahan et al., 2017). + FedAvg, + /// Federated Proximal (Li et al., 2020). + FedProx { mu: u32 }, + /// Simple weighted average. + WeightedAverage, +} + +impl Default for AggregationStrategy { + fn default() -> Self { + Self::FedAvg + } +} + +/// A single contribution to a federated averaging round. +#[derive(Clone, Debug)] +pub struct Contribution { + /// Contributor pseudonym. + pub contributor: String, + /// Weight vector (LoRA deltas). + pub weights: Vec, + /// Quality/reputation weight for this contributor. + pub quality_weight: f64, + /// Number of training trajectories behind this contribution. + pub trajectory_count: u64, +} + +/// Federated aggregation server. +pub struct FederatedAggregator { + /// Aggregation strategy. + strategy: AggregationStrategy, + /// Domain identifier. + domain_id: String, + /// Current round number. + round: u64, + /// Minimum contributions required for a round. + min_contributions: usize, + /// Standard deviation threshold for Byzantine outlier detection. + byzantine_std_threshold: f64, + /// Collected contributions for the current round. + contributions: Vec, +} + +impl FederatedAggregator { + /// Create a new aggregator. + pub fn new(domain_id: String, strategy: AggregationStrategy) -> Self { + Self { + strategy, + domain_id, + round: 0, + min_contributions: 2, + byzantine_std_threshold: 2.0, + contributions: Vec::new(), + } + } + + /// Set minimum contributions required. + pub fn with_min_contributions(mut self, min: usize) -> Self { + self.min_contributions = min; + self + } + + /// Set Byzantine outlier threshold (in standard deviations). + pub fn with_byzantine_threshold(mut self, threshold: f64) -> Self { + self.byzantine_std_threshold = threshold; + self + } + + /// Add a contribution for the current round. + pub fn add_contribution(&mut self, contribution: Contribution) { + self.contributions.push(contribution); + } + + /// Number of contributions collected so far. + pub fn contribution_count(&self) -> usize { + self.contributions.len() + } + + /// Current round number. + pub fn round(&self) -> u64 { + self.round + } + + /// Check if we have enough contributions to aggregate. + pub fn ready(&self) -> bool { + self.contributions.len() >= self.min_contributions + } + + /// Detect and remove Byzantine outliers. + /// + /// Returns the number of outliers removed. + fn remove_byzantine_outliers(&mut self) -> u32 { + if self.contributions.len() < 3 { + return 0; // Need at least 3 for meaningful outlier detection + } + + let dim = self.contributions[0].weights.len(); + if dim == 0 || !self.contributions.iter().all(|c| c.weights.len() == dim) { + return 0; + } + + // Compute mean and std of L2 norms + let norms: Vec = self.contributions.iter() + .map(|c| c.weights.iter().map(|w| w * w).sum::().sqrt()) + .collect(); + + let mean_norm = norms.iter().sum::() / norms.len() as f64; + let variance = norms.iter().map(|n| (n - mean_norm).powi(2)).sum::() / norms.len() as f64; + let std_dev = variance.sqrt(); + + if std_dev < 1e-10 { + return 0; + } + + let original_count = self.contributions.len(); + let threshold = self.byzantine_std_threshold; + + self.contributions.retain(|c| { + let norm = c.weights.iter().map(|w| w * w).sum::().sqrt(); + ((norm - mean_norm) / std_dev).abs() <= threshold + }); + + (original_count - self.contributions.len()) as u32 + } + + /// Aggregate contributions and produce an `AggregateWeights` segment. + pub fn aggregate(&mut self) -> Result { + if self.contributions.len() < self.min_contributions { + return Err(FederationError::InsufficientContributions { + min: self.min_contributions, + got: self.contributions.len(), + }); + } + + // Byzantine outlier removal + let outliers_removed = self.remove_byzantine_outliers(); + + if self.contributions.is_empty() { + return Err(FederationError::InsufficientContributions { + min: self.min_contributions, + got: 0, + }); + } + + let dim = self.contributions[0].weights.len(); + + let result = match self.strategy { + AggregationStrategy::FedAvg => self.fedavg(dim), + AggregationStrategy::FedProx { mu } => self.fedprox(dim, mu as f64 / 100.0), + AggregationStrategy::WeightedAverage => self.weighted_avg(dim), + }; + + self.round += 1; + let participation_count = self.contributions.len() as u32; + + // Compute loss stats + let losses: Vec = self.contributions.iter() + .map(|c| { + // Use inverse quality as a proxy for loss + 1.0 - c.quality_weight.clamp(0.0, 1.0) + }) + .collect(); + let mean_loss = losses.iter().sum::() / losses.len() as f64; + let loss_variance = losses.iter().map(|l| (l - mean_loss).powi(2)).sum::() / losses.len() as f64; + + self.contributions.clear(); + + Ok(AggregateWeights { + round: self.round, + participation_count, + lora_deltas: result.0, + confidences: result.1, + mean_loss, + loss_variance, + domain_id: self.domain_id.clone(), + byzantine_filtered: outliers_removed > 0, + outliers_removed, + }) + } + + /// FedAvg: weighted average by trajectory count. + fn fedavg(&self, dim: usize) -> (Vec, Vec) { + let total_trajectories: f64 = self.contributions.iter() + .map(|c| c.trajectory_count as f64) + .sum(); + + let mut avg = vec![0.0f64; dim]; + let mut confidences = vec![0.0f64; dim]; + + if total_trajectories <= 0.0 { + return (avg, confidences); + } + + for c in &self.contributions { + let w = c.trajectory_count as f64 / total_trajectories; + for (i, val) in c.weights.iter().enumerate() { + if i < dim { + avg[i] += w * val; + } + } + } + + // Confidence = inverse of variance across contributions per dimension + for i in 0..dim { + let mean = avg[i]; + let var: f64 = self.contributions.iter() + .map(|c| { + let v = if i < c.weights.len() { c.weights[i] } else { 0.0 }; + (v - mean).powi(2) + }) + .sum::() / self.contributions.len() as f64; + confidences[i] = 1.0 / (1.0 + var); + } + + (avg, confidences) + } + + /// FedProx: weighted average with proximal term. + fn fedprox(&self, dim: usize, mu: f64) -> (Vec, Vec) { + let (mut avg, confidences) = self.fedavg(dim); + // Apply proximal regularization: pull toward zero (global model) + for val in &mut avg { + *val *= 1.0 / (1.0 + mu); + } + (avg, confidences) + } + + /// Weighted average by quality_weight. + fn weighted_avg(&self, dim: usize) -> (Vec, Vec) { + let total_weight: f64 = self.contributions.iter().map(|c| c.quality_weight).sum(); + + let mut avg = vec![0.0f64; dim]; + let mut confidences = vec![0.0f64; dim]; + + if total_weight <= 0.0 { + return (avg, confidences); + } + + for c in &self.contributions { + let w = c.quality_weight / total_weight; + for (i, val) in c.weights.iter().enumerate() { + if i < dim { + avg[i] += w * val; + } + } + } + + for i in 0..dim { + let mean = avg[i]; + let var: f64 = self.contributions.iter() + .map(|c| { + let v = if i < c.weights.len() { c.weights[i] } else { 0.0 }; + (v - mean).powi(2) + }) + .sum::() / self.contributions.len() as f64; + confidences[i] = 1.0 / (1.0 + var); + } + + (avg, confidences) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_contribution(name: &str, weights: Vec, quality: f64, trajectories: u64) -> Contribution { + Contribution { + contributor: name.to_string(), + weights, + quality_weight: quality, + trajectory_count: trajectories, + } + } + + #[test] + fn fedavg_two_equal_contributions() { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2); + + agg.add_contribution(make_contribution("a", vec![1.0, 2.0, 3.0], 1.0, 100)); + agg.add_contribution(make_contribution("b", vec![3.0, 4.0, 5.0], 1.0, 100)); + + let result = agg.aggregate().unwrap(); + assert_eq!(result.round, 1); + assert_eq!(result.participation_count, 2); + assert!((result.lora_deltas[0] - 2.0).abs() < 1e-10); + assert!((result.lora_deltas[1] - 3.0).abs() < 1e-10); + assert!((result.lora_deltas[2] - 4.0).abs() < 1e-10); + } + + #[test] + fn fedavg_weighted_by_trajectories() { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2); + + // A has 3x more trajectories, so A's values should dominate + agg.add_contribution(make_contribution("a", vec![10.0], 1.0, 300)); + agg.add_contribution(make_contribution("b", vec![0.0], 1.0, 100)); + + let result = agg.aggregate().unwrap(); + // (300*10 + 100*0) / 400 = 7.5 + assert!((result.lora_deltas[0] - 7.5).abs() < 1e-10); + } + + #[test] + fn fedprox_shrinks_toward_zero() { + let mut agg_avg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2); + agg_avg.add_contribution(make_contribution("a", vec![10.0], 1.0, 100)); + agg_avg.add_contribution(make_contribution("b", vec![10.0], 1.0, 100)); + let avg_result = agg_avg.aggregate().unwrap(); + + let mut agg_prox = FederatedAggregator::new("test".into(), AggregationStrategy::FedProx { mu: 50 }) + .with_min_contributions(2); + agg_prox.add_contribution(make_contribution("a", vec![10.0], 1.0, 100)); + agg_prox.add_contribution(make_contribution("b", vec![10.0], 1.0, 100)); + let prox_result = agg_prox.aggregate().unwrap(); + + // FedProx should produce smaller values due to proximal regularization + assert!(prox_result.lora_deltas[0] < avg_result.lora_deltas[0]); + } + + #[test] + fn byzantine_outlier_removal() { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2) + .with_byzantine_threshold(2.0); + + // Need enough good contributions so the outlier's z-score exceeds 2.0. + // With k good + 1 evil, the evil z-score grows with sqrt(k). + agg.add_contribution(make_contribution("good1", vec![1.0, 1.0], 1.0, 100)); + agg.add_contribution(make_contribution("good2", vec![1.1, 0.9], 1.0, 100)); + agg.add_contribution(make_contribution("good3", vec![0.9, 1.1], 1.0, 100)); + agg.add_contribution(make_contribution("good4", vec![1.0, 1.0], 1.0, 100)); + agg.add_contribution(make_contribution("good5", vec![1.0, 1.0], 1.0, 100)); + agg.add_contribution(make_contribution("good6", vec![1.0, 1.0], 1.0, 100)); + agg.add_contribution(make_contribution("evil", vec![100.0, 100.0], 1.0, 100)); // outlier + + let result = agg.aggregate().unwrap(); + assert!(result.byzantine_filtered); + assert!(result.outliers_removed >= 1); + // Result should be close to 1.0, not pulled toward 100 + assert!(result.lora_deltas[0] < 5.0); + } + + #[test] + fn insufficient_contributions_error() { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(3); + + agg.add_contribution(make_contribution("a", vec![1.0], 1.0, 100)); + + let result = agg.aggregate(); + assert!(result.is_err()); + } + + #[test] + fn weighted_average_strategy() { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::WeightedAverage) + .with_min_contributions(2); + + agg.add_contribution(make_contribution("a", vec![10.0], 0.9, 10)); + agg.add_contribution(make_contribution("b", vec![0.0], 0.1, 10)); + + let result = agg.aggregate().unwrap(); + // (0.9*10 + 0.1*0) / 1.0 = 9.0 + assert!((result.lora_deltas[0] - 9.0).abs() < 1e-10); + } + + #[test] + fn round_increments() { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2); + + agg.add_contribution(make_contribution("a", vec![1.0], 1.0, 100)); + agg.add_contribution(make_contribution("b", vec![2.0], 1.0, 100)); + let r1 = agg.aggregate().unwrap(); + assert_eq!(r1.round, 1); + + agg.add_contribution(make_contribution("a", vec![3.0], 1.0, 100)); + agg.add_contribution(make_contribution("b", vec![4.0], 1.0, 100)); + let r2 = agg.aggregate().unwrap(); + assert_eq!(r2.round, 2); + } + + #[test] + fn confidences_high_when_agreement() { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2); + + agg.add_contribution(make_contribution("a", vec![1.0], 1.0, 100)); + agg.add_contribution(make_contribution("b", vec![1.0], 1.0, 100)); + + let result = agg.aggregate().unwrap(); + // When all agree, variance = 0, confidence = 1/(1+0) = 1.0 + assert!((result.confidences[0] - 1.0).abs() < 1e-10); + } + + #[test] + fn confidences_lower_when_disagreement() { + let mut agg = FederatedAggregator::new("test".into(), AggregationStrategy::FedAvg) + .with_min_contributions(2); + + agg.add_contribution(make_contribution("a", vec![0.0], 1.0, 100)); + agg.add_contribution(make_contribution("b", vec![10.0], 1.0, 100)); + + let result = agg.aggregate().unwrap(); + // When disagreement, confidence < 1.0 + assert!(result.confidences[0] < 1.0); + } +} diff --git a/crates/rvf/rvf-federation/src/diff_privacy.rs b/crates/rvf/rvf-federation/src/diff_privacy.rs new file mode 100644 index 000000000..b41563b20 --- /dev/null +++ b/crates/rvf/rvf-federation/src/diff_privacy.rs @@ -0,0 +1,416 @@ +//! Differential privacy primitives for federated learning. +//! +//! Provides calibrated noise injection, gradient clipping, and a Renyi +//! Differential Privacy (RDP) accountant for tracking cumulative privacy loss. + +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use rand_distr::{Distribution, Normal}; + +use crate::error::FederationError; +use crate::types::{DiffPrivacyProof, NoiseMechanism}; + +/// Differential privacy engine for adding calibrated noise. +pub struct DiffPrivacyEngine { + /// Target epsilon (privacy loss bound). + epsilon: f64, + /// Target delta (probability of exceeding epsilon). + delta: f64, + /// L2 sensitivity bound. + sensitivity: f64, + /// Gradient clipping norm. + clipping_norm: f64, + /// Noise mechanism. + mechanism: NoiseMechanism, + /// Random number generator. + rng: StdRng, +} + +impl DiffPrivacyEngine { + /// Create a new DP engine with Gaussian mechanism. + /// + /// Default: epsilon=1.0, delta=1e-5 (strong privacy). + pub fn gaussian( + epsilon: f64, + delta: f64, + sensitivity: f64, + clipping_norm: f64, + ) -> Result { + if epsilon <= 0.0 { + return Err(FederationError::InvalidEpsilon(epsilon)); + } + if delta <= 0.0 || delta >= 1.0 { + return Err(FederationError::InvalidDelta(delta)); + } + Ok(Self { + epsilon, + delta, + sensitivity, + clipping_norm, + mechanism: NoiseMechanism::Gaussian, + rng: StdRng::from_rng(rand::thread_rng()).unwrap(), + }) + } + + /// Create a new DP engine with Laplace mechanism. + pub fn laplace( + epsilon: f64, + sensitivity: f64, + clipping_norm: f64, + ) -> Result { + if epsilon <= 0.0 { + return Err(FederationError::InvalidEpsilon(epsilon)); + } + Ok(Self { + epsilon, + delta: 0.0, + sensitivity, + clipping_norm, + mechanism: NoiseMechanism::Laplace, + rng: StdRng::from_rng(rand::thread_rng()).unwrap(), + }) + } + + /// Create with a deterministic seed (for testing). + pub fn with_seed(mut self, seed: u64) -> Self { + self.rng = StdRng::seed_from_u64(seed); + self + } + + /// Compute the Gaussian noise standard deviation (sigma). + fn gaussian_sigma(&self) -> f64 { + self.sensitivity * (2.0_f64 * (1.25_f64 / self.delta).ln()).sqrt() / self.epsilon + } + + /// Compute the Laplace noise scale (b). + fn laplace_scale(&self) -> f64 { + self.sensitivity / self.epsilon + } + + /// Clip a gradient vector to the configured L2 norm bound. + pub fn clip_gradients(&self, gradients: &mut [f64]) { + let norm: f64 = gradients.iter().map(|x| x * x).sum::().sqrt(); + if norm > self.clipping_norm { + let scale = self.clipping_norm / norm; + for g in gradients.iter_mut() { + *g *= scale; + } + } + } + + /// Add calibrated noise to a vector of parameters. + /// + /// Clips gradients first, then adds noise per the configured mechanism. + pub fn add_noise(&mut self, params: &mut [f64]) -> DiffPrivacyProof { + self.clip_gradients(params); + + match self.mechanism { + NoiseMechanism::Gaussian => { + let sigma = self.gaussian_sigma(); + let normal = Normal::new(0.0, sigma).unwrap(); + for p in params.iter_mut() { + *p += normal.sample(&mut self.rng); + } + DiffPrivacyProof { + epsilon: self.epsilon, + delta: self.delta, + mechanism: NoiseMechanism::Gaussian, + sensitivity: self.sensitivity, + clipping_norm: self.clipping_norm, + noise_scale: sigma, + noised_parameter_count: params.len() as u64, + } + } + NoiseMechanism::Laplace => { + let b = self.laplace_scale(); + for p in params.iter_mut() { + // Laplace noise via inverse CDF: b * sign(u-0.5) * ln(1 - 2|u-0.5|) + let u: f64 = self.rng.gen::() - 0.5; + let noise = -b * u.signum() * (1.0 - 2.0 * u.abs()).ln(); + *p += noise; + } + DiffPrivacyProof { + epsilon: self.epsilon, + delta: 0.0, + mechanism: NoiseMechanism::Laplace, + sensitivity: self.sensitivity, + clipping_norm: self.clipping_norm, + noise_scale: b, + noised_parameter_count: params.len() as u64, + } + } + } + } + + /// Add noise to a single scalar value. + pub fn add_noise_scalar(&mut self, value: &mut f64) -> f64 { + let mut v = [*value]; + self.add_noise(&mut v); + *value = v[0]; + v[0] + } + + /// Current epsilon setting. + pub fn epsilon(&self) -> f64 { + self.epsilon + } + + /// Current delta setting. + pub fn delta(&self) -> f64 { + self.delta + } +} + +// -- Privacy Accountant (RDP) ------------------------------------------------ + +/// Renyi Differential Privacy (RDP) accountant for tracking cumulative privacy loss. +/// +/// Tracks privacy budget across multiple export rounds using RDP composition, +/// which provides tighter bounds than naive (epsilon, delta)-DP composition. +pub struct PrivacyAccountant { + /// Maximum allowed cumulative epsilon. + epsilon_limit: f64, + /// Target delta for conversion from RDP to (epsilon, delta)-DP. + target_delta: f64, + /// Accumulated RDP values at various alpha orders. + /// Each entry: (alpha_order, accumulated_rdp_epsilon) + rdp_alphas: Vec<(f64, f64)>, + /// History of exports: (timestamp, epsilon_spent, mechanism). + history: Vec, +} + +/// Record of a single privacy-consuming export. +#[derive(Clone, Debug)] +pub struct ExportRecord { + /// UNIX timestamp of the export. + pub timestamp_s: u64, + /// Epsilon consumed by this export. + pub epsilon: f64, + /// Delta for this export (0 for pure epsilon-DP). + pub delta: f64, + /// Mechanism used. + pub mechanism: NoiseMechanism, + /// Number of parameters. + pub parameter_count: u64, +} + +impl PrivacyAccountant { + /// Create a new accountant with the given budget. + pub fn new(epsilon_limit: f64, target_delta: f64) -> Self { + // Standard RDP alpha orders for accounting + let alphas: Vec = vec![ + 1.5, 1.75, 2.0, 2.5, 3.0, 4.0, 5.0, 6.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, + 1024.0, + ]; + let rdp_alphas = alphas.into_iter().map(|a| (a, 0.0)).collect(); + Self { + epsilon_limit, + target_delta, + rdp_alphas, + history: Vec::new(), + } + } + + /// Compute RDP epsilon for the Gaussian mechanism at a given alpha order. + fn gaussian_rdp(alpha: f64, sigma: f64) -> f64 { + alpha / (2.0 * sigma * sigma) + } + + /// Convert RDP to (epsilon, delta)-DP for a given alpha order. + fn rdp_to_dp(alpha: f64, rdp_epsilon: f64, delta: f64) -> f64 { + rdp_epsilon - (delta.ln()) / (alpha - 1.0) + } + + /// Record a Gaussian mechanism query. + pub fn record_gaussian(&mut self, sigma: f64, epsilon: f64, delta: f64, parameter_count: u64) { + // Accumulate RDP at each alpha order + for (alpha, rdp_eps) in &mut self.rdp_alphas { + *rdp_eps += Self::gaussian_rdp(*alpha, sigma); + } + self.history.push(ExportRecord { + timestamp_s: 0, + epsilon, + delta, + mechanism: NoiseMechanism::Gaussian, + parameter_count, + }); + } + + /// Record a Laplace mechanism query. + pub fn record_laplace(&mut self, epsilon: f64, parameter_count: u64) { + // For Laplace, RDP epsilon at order alpha is: alpha * eps / (alpha - 1) + // when alpha > 1 + for (alpha, rdp_eps) in &mut self.rdp_alphas { + if *alpha > 1.0 { + *rdp_eps += *alpha * epsilon / (*alpha - 1.0); + } + } + self.history.push(ExportRecord { + timestamp_s: 0, + epsilon, + delta: 0.0, + mechanism: NoiseMechanism::Laplace, + parameter_count, + }); + } + + /// Get the current best (tightest) epsilon estimate. + pub fn current_epsilon(&self) -> f64 { + self.rdp_alphas + .iter() + .map(|(alpha, rdp_eps)| Self::rdp_to_dp(*alpha, *rdp_eps, self.target_delta)) + .fold(f64::INFINITY, f64::min) + } + + /// Remaining privacy budget. + pub fn remaining_budget(&self) -> f64 { + (self.epsilon_limit - self.current_epsilon()).max(0.0) + } + + /// Check if we can afford another export with the given epsilon. + pub fn can_afford(&self, additional_epsilon: f64) -> bool { + self.current_epsilon() + additional_epsilon <= self.epsilon_limit + } + + /// Check if budget is exhausted. + pub fn is_exhausted(&self) -> bool { + self.current_epsilon() >= self.epsilon_limit + } + + /// Fraction of budget consumed (0.0 to 1.0+). + pub fn budget_fraction_used(&self) -> f64 { + self.current_epsilon() / self.epsilon_limit + } + + /// Number of exports recorded. + pub fn export_count(&self) -> usize { + self.history.len() + } + + /// Export history. + pub fn history(&self) -> &[ExportRecord] { + &self.history + } + + /// Epsilon limit. + pub fn epsilon_limit(&self) -> f64 { + self.epsilon_limit + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn gaussian_engine_creates() { + let engine = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 1.0); + assert!(engine.is_ok()); + } + + #[test] + fn invalid_epsilon_rejected() { + let engine = DiffPrivacyEngine::gaussian(0.0, 1e-5, 1.0, 1.0); + assert!(engine.is_err()); + let engine = DiffPrivacyEngine::gaussian(-1.0, 1e-5, 1.0, 1.0); + assert!(engine.is_err()); + } + + #[test] + fn invalid_delta_rejected() { + let engine = DiffPrivacyEngine::gaussian(1.0, 0.0, 1.0, 1.0); + assert!(engine.is_err()); + let engine = DiffPrivacyEngine::gaussian(1.0, 1.0, 1.0, 1.0); + assert!(engine.is_err()); + } + + #[test] + fn gradient_clipping() { + let engine = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 1.0).unwrap(); + let mut grads = vec![3.0, 4.0]; // norm = 5.0 + engine.clip_gradients(&mut grads); + let norm: f64 = grads.iter().map(|x| x * x).sum::().sqrt(); + assert!((norm - 1.0).abs() < 1e-6); // clipped to norm 1.0 + } + + #[test] + fn gradient_no_clip_when_small() { + let engine = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 10.0).unwrap(); + let mut grads = vec![3.0, 4.0]; // norm = 5.0, clip = 10.0 + engine.clip_gradients(&mut grads); + assert!((grads[0] - 3.0).abs() < 1e-10); + assert!((grads[1] - 4.0).abs() < 1e-10); + } + + #[test] + fn add_noise_gaussian_deterministic() { + let mut engine = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 100.0) + .unwrap() + .with_seed(42); + let mut params = vec![1.0, 2.0, 3.0]; + let original = params.clone(); + let proof = engine.add_noise(&mut params); + assert_eq!(proof.mechanism, NoiseMechanism::Gaussian); + assert_eq!(proof.noised_parameter_count, 3); + // Params should be different from original (noise added) + assert!(params + .iter() + .zip(original.iter()) + .any(|(a, b)| (a - b).abs() > 1e-10)); + } + + #[test] + fn add_noise_laplace_deterministic() { + let mut engine = DiffPrivacyEngine::laplace(1.0, 1.0, 100.0) + .unwrap() + .with_seed(42); + let mut params = vec![1.0, 2.0, 3.0]; + let proof = engine.add_noise(&mut params); + assert_eq!(proof.mechanism, NoiseMechanism::Laplace); + assert_eq!(proof.noised_parameter_count, 3); + } + + #[test] + fn privacy_accountant_initial_state() { + let acc = PrivacyAccountant::new(10.0, 1e-5); + assert_eq!(acc.export_count(), 0); + assert!(!acc.is_exhausted()); + assert!(acc.can_afford(1.0)); + assert!(acc.remaining_budget() > 9.9); + } + + #[test] + fn privacy_accountant_tracks_gaussian() { + let mut acc = PrivacyAccountant::new(10.0, 1e-5); + // sigma=1.0 with epsilon=1.0 per query + acc.record_gaussian(1.0, 1.0, 1e-5, 100); + assert_eq!(acc.export_count(), 1); + let eps = acc.current_epsilon(); + assert!(eps > 0.0); + assert!(eps < 10.0); + } + + #[test] + fn privacy_accountant_composition() { + let mut acc = PrivacyAccountant::new(10.0, 1e-5); + let eps_after_1 = { + acc.record_gaussian(1.0, 1.0, 1e-5, 100); + acc.current_epsilon() + }; + acc.record_gaussian(1.0, 1.0, 1e-5, 100); + let eps_after_2 = acc.current_epsilon(); + // After 2 queries, epsilon should be larger + assert!(eps_after_2 > eps_after_1); + } + + #[test] + fn privacy_accountant_exhaustion() { + let mut acc = PrivacyAccountant::new(1.0, 1e-5); + // Use a very small sigma to burn budget fast + for _ in 0..100 { + acc.record_gaussian(0.1, 10.0, 1e-5, 10); + } + assert!(acc.is_exhausted()); + assert!(!acc.can_afford(0.1)); + } +} diff --git a/crates/rvf/rvf-federation/src/error.rs b/crates/rvf/rvf-federation/src/error.rs new file mode 100644 index 000000000..6d9015147 --- /dev/null +++ b/crates/rvf/rvf-federation/src/error.rs @@ -0,0 +1,52 @@ +//! Federation error types. + +use thiserror::Error; + +/// Errors that can occur during federation operations. +#[derive(Debug, Error)] +pub enum FederationError { + #[error("privacy budget exhausted: spent {spent:.4}, limit {limit:.4}")] + PrivacyBudgetExhausted { spent: f64, limit: f64 }, + + #[error("invalid epsilon value: {0} (must be > 0)")] + InvalidEpsilon(f64), + + #[error("invalid delta value: {0} (must be in (0, 1))")] + InvalidDelta(f64), + + #[error("segment validation failed: {0}")] + SegmentValidation(String), + + #[error("version mismatch: expected {expected}, got {got}")] + VersionMismatch { expected: u32, got: u32 }, + + #[error("signature verification failed")] + SignatureVerification, + + #[error("witness chain broken at index {0}")] + WitnessChainBroken(usize), + + #[error("insufficient observations: need {needed}, have {have}")] + InsufficientObservations { needed: u64, have: u64 }, + + #[error("quality below threshold: {score:.4} < {threshold:.4}")] + QualityBelowThreshold { score: f64, threshold: f64 }, + + #[error("export rate limited: next export allowed at {next_allowed_epoch_s}")] + RateLimited { next_allowed_epoch_s: u64 }, + + #[error("PII detected after stripping: {field}")] + PiiLeakDetected { field: String }, + + #[error("Byzantine outlier detected from contributor {contributor}")] + ByzantineOutlier { contributor: String }, + + #[error("aggregation requires at least {min} contributions, got {got}")] + InsufficientContributions { min: usize, got: usize }, + + #[error("serialization error: {0}")] + Serialization(String), + + #[error("io error: {0}")] + Io(String), +} diff --git a/crates/rvf/rvf-federation/src/federation.rs b/crates/rvf/rvf-federation/src/federation.rs new file mode 100644 index 000000000..89d89c68d --- /dev/null +++ b/crates/rvf/rvf-federation/src/federation.rs @@ -0,0 +1,477 @@ +//! Federation protocol: export builder, import merger, version-aware conflict resolution. + +use crate::diff_privacy::DiffPrivacyEngine; +use crate::error::FederationError; +use crate::pii_strip::PiiStripper; +use crate::policy::FederationPolicy; +use crate::types::*; + +/// Builder for constructing a federated learning export. +/// +/// Follows the export flow from ADR-057: +/// 1. Extract learning (priors, kernels, cost curves, weights) +/// 2. PII-strip all payloads +/// 3. Add differential privacy noise +/// 4. Assemble manifest + attestation segments +pub struct ExportBuilder { + contributor_pseudonym: String, + domain_id: String, + priors: Vec, + kernels: Vec, + cost_curves: Vec, + weights: Vec>, + policy: FederationPolicy, + string_fields: Vec<(String, String)>, +} + +/// A completed federated export ready for publishing. +#[derive(Clone, Debug)] +pub struct FederatedExport { + /// The manifest describing this export. + pub manifest: FederatedManifest, + /// PII redaction attestation. + pub redaction_log: RedactionLog, + /// Differential privacy attestation. + pub privacy_proof: DiffPrivacyProof, + /// Transfer priors (after PII stripping and DP noise). + pub priors: Vec, + /// Policy kernel snapshots. + pub kernels: Vec, + /// Cost curve snapshots. + pub cost_curves: Vec, + /// Noised aggregate weights (if any). + pub weights: Vec>, +} + +impl ExportBuilder { + /// Create a new export builder. + pub fn new(contributor_pseudonym: String, domain_id: String) -> Self { + Self { + contributor_pseudonym, + domain_id, + priors: Vec::new(), + kernels: Vec::new(), + cost_curves: Vec::new(), + weights: Vec::new(), + policy: FederationPolicy::default(), + string_fields: Vec::new(), + } + } + + /// Set the federation policy. + pub fn with_policy(mut self, policy: FederationPolicy) -> Self { + self.policy = policy; + self + } + + /// Add transfer priors from a trained domain. + pub fn add_priors(mut self, priors: TransferPriorSet) -> Self { + self.priors.push(priors); + self + } + + /// Add a policy kernel snapshot. + pub fn add_kernel(mut self, kernel: PolicyKernelSnapshot) -> Self { + self.kernels.push(kernel); + self + } + + /// Add a cost curve snapshot. + pub fn add_cost_curve(mut self, curve: CostCurveSnapshot) -> Self { + self.cost_curves.push(curve); + self + } + + /// Add raw weight vectors (LoRA deltas). + pub fn add_weights(mut self, weights: Vec) -> Self { + self.weights.push(weights); + self + } + + /// Add a named string field for PII scanning. + pub fn add_string_field(mut self, name: String, value: String) -> Self { + self.string_fields.push((name, value)); + self + } + + /// Build the export: PII-strip, add DP noise, assemble manifest. + pub fn build(mut self, dp_engine: &mut DiffPrivacyEngine) -> Result { + // 1. Apply quality gate from policy + self.priors.retain(|ps| { + ps.entries.iter().all(|e| e.observation_count >= self.policy.min_observations) + }); + + // 2. PII stripping + let mut stripper = PiiStripper::new(); + let field_refs: Vec<(&str, &str)> = self.string_fields + .iter() + .map(|(n, v)| (n.as_str(), v.as_str())) + .collect(); + let (_redacted_fields, redaction_log) = stripper.strip_fields(&field_refs); + + // Strip PII from domain IDs and bucket IDs in priors + for ps in &mut self.priors { + ps.source_domain = stripper.strip_value(&ps.source_domain); + for entry in &mut ps.entries { + entry.bucket_id = stripper.strip_value(&entry.bucket_id); + } + } + + // Strip PII from cost curve domain IDs + for curve in &mut self.cost_curves { + curve.domain_id = stripper.strip_value(&curve.domain_id); + } + + // 3. Add differential privacy noise to numerical parameters + // Noise the Beta posteriors + let mut noised_count: u64 = 0; + for ps in &mut self.priors { + for entry in &mut ps.entries { + let mut params = [entry.params.alpha, entry.params.beta]; + dp_engine.add_noise(&mut params); + entry.params.alpha = params[0].max(0.01); // Keep positive + entry.params.beta = params[1].max(0.01); + noised_count += 2; + } + } + + // Noise the weight vectors + for w in &mut self.weights { + dp_engine.add_noise(w); + noised_count += w.len() as u64; + } + + // Noise kernel knobs + for kernel in &mut self.kernels { + dp_engine.add_noise(&mut kernel.knobs); + noised_count += kernel.knobs.len() as u64; + } + + // Noise cost curve values + for curve in &mut self.cost_curves { + let mut costs: Vec = curve.points.iter().map(|(_, c)| *c).collect(); + dp_engine.add_noise(&mut costs); + for (i, (_, c)) in curve.points.iter_mut().enumerate() { + *c = costs[i]; + } + noised_count += costs.len() as u64; + } + + let privacy_proof = DiffPrivacyProof { + epsilon: dp_engine.epsilon(), + delta: dp_engine.delta(), + mechanism: NoiseMechanism::Gaussian, + sensitivity: 1.0, + clipping_norm: 1.0, + noise_scale: 0.0, + noised_parameter_count: noised_count, + }; + + // 4. Build manifest + let total_trajectories: u64 = self.priors.iter() + .flat_map(|ps| ps.entries.iter()) + .map(|e| e.observation_count) + .sum(); + + let avg_quality = if !self.priors.is_empty() { + self.priors.iter() + .flat_map(|ps| ps.entries.iter()) + .map(|e| e.params.mean()) + .sum::() + / self.priors.iter().map(|ps| ps.entries.len()).sum::().max(1) as f64 + } else { + 0.0 + }; + + let manifest = FederatedManifest { + format_version: 1, + contributor_pseudonym: self.contributor_pseudonym, + export_timestamp_s: 0, + included_segment_ids: Vec::new(), + privacy_budget_spent: dp_engine.epsilon(), + domain_id: self.domain_id, + rvf_version_tag: String::from("rvf-v1"), + trajectory_count: total_trajectories, + avg_quality_score: avg_quality, + }; + + Ok(FederatedExport { + manifest, + redaction_log, + privacy_proof, + priors: self.priors, + kernels: self.kernels, + cost_curves: self.cost_curves, + weights: self.weights, + }) + } +} + +/// Merger for importing federated learning into local engines. +/// +/// Follows the import flow from ADR-057: +/// 1. Validate signature and witness chain +/// 2. Check version compatibility +/// 3. Merge with dampened confidence +pub struct ImportMerger { + /// Current RVF version for compatibility checks. + current_version: u32, + /// Dampening factor for cross-version imports. + version_dampen_factor: f64, +} + +impl ImportMerger { + /// Create a new import merger. + pub fn new() -> Self { + Self { + current_version: 1, + version_dampen_factor: 0.5, + } + } + + /// Set the dampening factor for imports from different versions. + pub fn with_version_dampen(mut self, factor: f64) -> Self { + self.version_dampen_factor = factor.clamp(0.0, 1.0); + self + } + + /// Validate a federated export. + pub fn validate(&self, export: &FederatedExport) -> Result<(), FederationError> { + // Check format version + if export.manifest.format_version == 0 { + return Err(FederationError::SegmentValidation( + "format_version must be > 0".into(), + )); + } + + // Check privacy proof has valid parameters + if export.privacy_proof.epsilon <= 0.0 { + return Err(FederationError::InvalidEpsilon(export.privacy_proof.epsilon)); + } + + // Check priors have positive parameters + for ps in &export.priors { + for entry in &ps.entries { + if entry.params.alpha <= 0.0 || entry.params.beta <= 0.0 { + return Err(FederationError::SegmentValidation(format!( + "invalid Beta params in bucket {}: alpha={}, beta={}", + entry.bucket_id, entry.params.alpha, entry.params.beta + ))); + } + } + } + + Ok(()) + } + + /// Merge imported priors with local priors. + /// + /// Uses version-aware dampening: same version gets full weight, + /// older versions get dampened (sqrt-scaling per MetaThompsonEngine). + pub fn merge_priors( + &self, + local: &mut Vec, + remote: &[TransferPriorEntry], + remote_version: u32, + ) { + let dampen = if remote_version == self.current_version { + 1.0 + } else { + self.version_dampen_factor + }; + + for remote_entry in remote { + let dampened = remote_entry.params.dampen(dampen); + + if let Some(local_entry) = local.iter_mut().find(|l| { + l.bucket_id == remote_entry.bucket_id && l.arm_id == remote_entry.arm_id + }) { + // Merge: sum parameters minus uniform prior + local_entry.params = local_entry.params.merge(&dampened); + local_entry.observation_count += remote_entry.observation_count; + } else { + // New entry: insert with dampened params + local.push(TransferPriorEntry { + bucket_id: remote_entry.bucket_id.clone(), + arm_id: remote_entry.arm_id.clone(), + params: dampened, + observation_count: remote_entry.observation_count, + }); + } + } + } + + /// Merge imported weights with local weights using weighted average. + pub fn merge_weights( + &self, + local: &mut [f64], + remote: &[f64], + local_weight: f64, + remote_weight: f64, + ) { + let total = local_weight + remote_weight; + if total <= 0.0 || local.len() != remote.len() { + return; + } + for (l, r) in local.iter_mut().zip(remote.iter()) { + *l = (local_weight * *l + remote_weight * *r) / total; + } + } +} + +impl Default for ImportMerger { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::diff_privacy::DiffPrivacyEngine; + + fn make_test_priors() -> TransferPriorSet { + TransferPriorSet { + source_domain: "test_domain".into(), + entries: vec![ + TransferPriorEntry { + bucket_id: "medium_algorithm".into(), + arm_id: "arm_0".into(), + params: BetaParams::new(10.0, 5.0), + observation_count: 50, + }, + TransferPriorEntry { + bucket_id: "hard_synthesis".into(), + arm_id: "arm_1".into(), + params: BetaParams::new(8.0, 12.0), + observation_count: 30, + }, + ], + cost_ema: 0.85, + } + } + + #[test] + fn export_builder_basic() { + let mut dp = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 10.0) + .unwrap() + .with_seed(42); + let export = ExportBuilder::new("alice_pseudo".into(), "code_review".into()) + .add_priors(make_test_priors()) + .build(&mut dp) + .unwrap(); + + assert_eq!(export.manifest.contributor_pseudonym, "alice_pseudo"); + assert_eq!(export.manifest.domain_id, "code_review"); + assert_eq!(export.manifest.format_version, 1); + assert!(!export.priors.is_empty()); + } + + #[test] + fn export_builder_with_weights() { + let mut dp = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 10.0) + .unwrap() + .with_seed(42); + let weights = vec![0.1, 0.2, 0.3, 0.4]; + let export = ExportBuilder::new("bob_pseudo".into(), "genomics".into()) + .add_weights(weights.clone()) + .build(&mut dp) + .unwrap(); + + assert_eq!(export.weights.len(), 1); + // Weights should be different from original (noise added) + assert!(export.weights[0].iter().zip(weights.iter()).any(|(a, b)| (a - b).abs() > 1e-10)); + } + + #[test] + fn import_merger_validate() { + let mut dp = DiffPrivacyEngine::gaussian(1.0, 1e-5, 1.0, 10.0) + .unwrap() + .with_seed(42); + let export = ExportBuilder::new("alice".into(), "domain".into()) + .add_priors(make_test_priors()) + .build(&mut dp) + .unwrap(); + + let merger = ImportMerger::new(); + assert!(merger.validate(&export).is_ok()); + } + + #[test] + fn import_merger_merge_priors_same_version() { + let merger = ImportMerger::new(); + let mut local = vec![TransferPriorEntry { + bucket_id: "medium_algorithm".into(), + arm_id: "arm_0".into(), + params: BetaParams::new(5.0, 3.0), + observation_count: 20, + }]; + + let remote = vec![TransferPriorEntry { + bucket_id: "medium_algorithm".into(), + arm_id: "arm_0".into(), + params: BetaParams::new(10.0, 5.0), + observation_count: 50, + }]; + + merger.merge_priors(&mut local, &remote, 1); + assert_eq!(local.len(), 1); + // Merged: alpha = 5 + 10 - 1 = 14, beta = 3 + 5 - 1 = 7 + assert!((local[0].params.alpha - 14.0).abs() < 1e-10); + assert!((local[0].params.beta - 7.0).abs() < 1e-10); + assert_eq!(local[0].observation_count, 70); + } + + #[test] + fn import_merger_merge_priors_different_version() { + let merger = ImportMerger::new(); + let mut local = vec![TransferPriorEntry { + bucket_id: "b".into(), + arm_id: "a".into(), + params: BetaParams::new(10.0, 10.0), + observation_count: 50, + }]; + + let remote = vec![TransferPriorEntry { + bucket_id: "b".into(), + arm_id: "a".into(), + params: BetaParams::new(20.0, 5.0), + observation_count: 40, + }]; + + merger.merge_priors(&mut local, &remote, 0); // older version -> dampened + assert_eq!(local.len(), 1); + // Remote dampened by 0.5: alpha = 1 + (20-1)*0.5 = 10.5, beta = 1 + (5-1)*0.5 = 3.0 + // Merged: alpha = 10 + 10.5 - 1 = 19.5, beta = 10 + 3.0 - 1 = 12.0 + assert!((local[0].params.alpha - 19.5).abs() < 1e-10); + assert!((local[0].params.beta - 12.0).abs() < 1e-10); + } + + #[test] + fn import_merger_merge_new_bucket() { + let merger = ImportMerger::new(); + let mut local: Vec = Vec::new(); + + let remote = vec![TransferPriorEntry { + bucket_id: "new_bucket".into(), + arm_id: "arm_0".into(), + params: BetaParams::new(10.0, 5.0), + observation_count: 30, + }]; + + merger.merge_priors(&mut local, &remote, 1); + assert_eq!(local.len(), 1); + assert_eq!(local[0].bucket_id, "new_bucket"); + } + + #[test] + fn merge_weights_weighted_average() { + let merger = ImportMerger::new(); + let mut local = vec![1.0, 2.0, 3.0]; + let remote = vec![3.0, 4.0, 5.0]; + merger.merge_weights(&mut local, &remote, 0.5, 0.5); + assert!((local[0] - 2.0).abs() < 1e-10); + assert!((local[1] - 3.0).abs() < 1e-10); + assert!((local[2] - 4.0).abs() < 1e-10); + } +} diff --git a/crates/rvf/rvf-federation/src/lib.rs b/crates/rvf/rvf-federation/src/lib.rs new file mode 100644 index 000000000..767fd1c3d --- /dev/null +++ b/crates/rvf/rvf-federation/src/lib.rs @@ -0,0 +1,24 @@ +//! Federated RVF transfer learning. +//! +//! This crate implements the federation protocol described in ADR-057: +//! - **PII stripping**: Three-stage pipeline (detect, redact, attest) +//! - **Differential privacy**: Gaussian/Laplace noise, RDP accountant, gradient clipping +//! - **Federation protocol**: Export builder, import merger, version-aware conflict resolution +//! - **Federated aggregation**: FedAvg, FedProx, Byzantine-tolerant weighted averaging +//! - **Segment types**: FederatedManifest, DiffPrivacyProof, RedactionLog, AggregateWeights + +pub mod types; +pub mod error; +pub mod pii_strip; +pub mod diff_privacy; +pub mod federation; +pub mod aggregate; +pub mod policy; + +pub use types::*; +pub use error::FederationError; +pub use pii_strip::PiiStripper; +pub use diff_privacy::{DiffPrivacyEngine, PrivacyAccountant}; +pub use federation::{ExportBuilder, ImportMerger}; +pub use aggregate::{FederatedAggregator, AggregationStrategy}; +pub use policy::FederationPolicy; diff --git a/crates/rvf/rvf-federation/src/pii_strip.rs b/crates/rvf/rvf-federation/src/pii_strip.rs new file mode 100644 index 000000000..3a0a1c04e --- /dev/null +++ b/crates/rvf/rvf-federation/src/pii_strip.rs @@ -0,0 +1,354 @@ +//! Three-stage PII stripping pipeline. +//! +//! **Stage 1 — Detection**: Scan string fields for PII patterns. +//! **Stage 2 — Redaction**: Replace PII with deterministic pseudonyms. +//! **Stage 3 — Attestation**: Generate a `RedactionLog` segment. + +use std::collections::HashMap; +use regex::Regex; +use sha3::{Shake256, digest::{Update, ExtendableOutput, XofReader}}; + +use crate::types::{RedactionLog, RedactionEntry}; + +/// PII category with its detection regex and replacement template. +struct PiiRule { + category: &'static str, + rule_id: &'static str, + pattern: Regex, + prefix: &'static str, +} + +/// Three-stage PII stripping pipeline. +pub struct PiiStripper { + rules: Vec, + /// Custom regex rules added by the user. + custom_rules: Vec, + /// Pseudonym counter per category (for deterministic replacement). + counters: HashMap, + /// Map from original value to pseudonym (preserves structural relationships). + pseudonym_map: HashMap, +} + +impl PiiStripper { + /// Create a new stripper with default detection rules. + pub fn new() -> Self { + let rules = vec![ + PiiRule { + category: "path", + rule_id: "rule_path_unix", + pattern: Regex::new(r#"(?:/(?:home|Users|var|tmp|opt|etc)/[^\s,;:"'\]}>)]+)"#).unwrap(), + prefix: "PATH", + }, + PiiRule { + category: "path", + rule_id: "rule_path_windows", + pattern: Regex::new(r#"(?i:[A-Z]:\\(?:Users|Documents|Program Files)[^\s,;:"'\]}>)]+)"#).unwrap(), + prefix: "PATH", + }, + PiiRule { + category: "ip", + rule_id: "rule_ipv4", + pattern: Regex::new(r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b").unwrap(), + prefix: "IP", + }, + PiiRule { + category: "ip", + rule_id: "rule_ipv6", + pattern: Regex::new(r"\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b").unwrap(), + prefix: "IP", + }, + PiiRule { + category: "email", + rule_id: "rule_email", + pattern: Regex::new(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b").unwrap(), + prefix: "EMAIL", + }, + PiiRule { + category: "api_key", + rule_id: "rule_api_key_sk", + pattern: Regex::new(r"\bsk-[A-Za-z0-9]{20,}\b").unwrap(), + prefix: "REDACTED_KEY", + }, + PiiRule { + category: "api_key", + rule_id: "rule_api_key_aws", + pattern: Regex::new(r"\bAKIA[A-Z0-9]{16}\b").unwrap(), + prefix: "REDACTED_KEY", + }, + PiiRule { + category: "api_key", + rule_id: "rule_api_key_github", + pattern: Regex::new(r"\bghp_[A-Za-z0-9]{36}\b").unwrap(), + prefix: "REDACTED_KEY", + }, + PiiRule { + category: "api_key", + rule_id: "rule_bearer_token", + pattern: Regex::new(r"\bBearer\s+[A-Za-z0-9._~+/=-]{20,}\b").unwrap(), + prefix: "REDACTED_KEY", + }, + PiiRule { + category: "env_var", + rule_id: "rule_env_unix", + pattern: Regex::new(r"\$(?:HOME|USER|USERNAME|USERPROFILE|PATH|TMPDIR)\b").unwrap(), + prefix: "ENV", + }, + PiiRule { + category: "env_var", + rule_id: "rule_env_windows", + pattern: Regex::new(r"%(?:HOME|USER|USERNAME|USERPROFILE|PATH|TEMP)%").unwrap(), + prefix: "ENV", + }, + PiiRule { + category: "username", + rule_id: "rule_username_at", + pattern: Regex::new(r"@[A-Za-z][A-Za-z0-9_-]{2,30}\b").unwrap(), + prefix: "USER", + }, + ]; + + Self { + rules, + custom_rules: Vec::new(), + counters: HashMap::new(), + pseudonym_map: HashMap::new(), + } + } + + /// Add a custom detection rule. + pub fn add_rule(&mut self, category: &'static str, rule_id: &'static str, pattern: &str, prefix: &'static str) -> Result<(), regex::Error> { + self.custom_rules.push(PiiRule { + category, + rule_id, + pattern: Regex::new(pattern)?, + prefix, + }); + Ok(()) + } + + /// Reset the pseudonym map and counters (call between exports). + pub fn reset(&mut self) { + self.counters.clear(); + self.pseudonym_map.clear(); + } + + /// Get or create a deterministic pseudonym for a matched value. + fn pseudonym(&mut self, original: &str, prefix: &str) -> String { + if let Some(existing) = self.pseudonym_map.get(original) { + return existing.clone(); + } + let counter = self.counters.entry(prefix.to_string()).or_insert(0); + *counter += 1; + let pseudo = format!("<{}_{}>", prefix, counter); + self.pseudonym_map.insert(original.to_string(), pseudo.clone()); + pseudo + } + + /// Stage 1+2: Detect and redact PII in a single string. + /// Returns (redacted_string, list of (category, rule_id, count) tuples). + fn strip_string(&mut self, input: &str) -> (String, Vec<(String, String, u32)>) { + let mut result = input.to_string(); + let mut detections: Vec<(String, String, u32)> = Vec::new(); + + let num_builtin = self.rules.len(); + let num_custom = self.custom_rules.len(); + + for i in 0..(num_builtin + num_custom) { + let (pattern, prefix, category, rule_id) = if i < num_builtin { + let r = &self.rules[i]; + (&r.pattern as &Regex, r.prefix, r.category, r.rule_id) + } else { + let r = &self.custom_rules[i - num_builtin]; + (&r.pattern as &Regex, r.prefix, r.category, r.rule_id) + }; + let matches: Vec = pattern.find_iter(&result).map(|m| m.as_str().to_string()).collect(); + if matches.is_empty() { + continue; + } + let count = matches.len() as u32; + // Build pseudonyms and perform replacements + let mut replacements: Vec<(String, String)> = Vec::new(); + for m in &matches { + let pseudo = self.pseudonym(m, prefix); + replacements.push((m.clone(), pseudo)); + } + for (original, pseudo) in &replacements { + result = result.replace(original.as_str(), pseudo.as_str()); + } + detections.push((category.to_string(), rule_id.to_string(), count)); + } + + (result, detections) + } + + /// Strip PII from a collection of named string fields. + /// + /// Returns the redacted fields and a `RedactionLog` attestation. + pub fn strip_fields(&mut self, fields: &[(&str, &str)]) -> (Vec<(String, String)>, RedactionLog) { + // Stage 1+2: Detect and redact + let mut redacted_fields = Vec::new(); + let mut all_detections: HashMap<(String, String), u32> = HashMap::new(); + + // Compute pre-redaction hash (Stage 3 prep) + let mut hasher = Shake256::default(); + for (name, value) in fields { + hasher.update(name.as_bytes()); + hasher.update(value.as_bytes()); + } + let mut pre_hash = [0u8; 32]; + hasher.finalize_xof().read(&mut pre_hash); + + for (name, value) in fields { + let (redacted, detections) = self.strip_string(value); + redacted_fields.push((name.to_string(), redacted)); + for (cat, rule, count) in detections { + *all_detections.entry((cat, rule)).or_insert(0) += count; + } + } + + // Stage 3: Build attestation + let mut log = RedactionLog { + entries: Vec::new(), + pre_redaction_hash: pre_hash, + fields_scanned: fields.len() as u64, + total_redactions: 0, + timestamp_s: 0, // caller should set this + }; + + for ((category, rule_id), count) in &all_detections { + log.entries.push(RedactionEntry { + category: category.clone(), + count: *count, + rule_id: rule_id.clone(), + }); + log.total_redactions += *count as u64; + } + + (redacted_fields, log) + } + + /// Strip PII from a single string value. + pub fn strip_value(&mut self, input: &str) -> String { + let (result, _) = self.strip_string(input); + result + } + + /// Check if a string contains any detectable PII. + pub fn contains_pii(&self, input: &str) -> bool { + let all_rules: Vec<&PiiRule> = self.rules.iter().chain(self.custom_rules.iter()).collect(); + for rule in all_rules { + if rule.pattern.is_match(input) { + return true; + } + } + false + } + + /// Return the current pseudonym map (for debugging/auditing). + pub fn pseudonym_map(&self) -> &HashMap { + &self.pseudonym_map + } +} + +impl Default for PiiStripper { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn detect_unix_paths() { + let stripper = PiiStripper::new(); + assert!(stripper.contains_pii("/home/user/project/src/main.rs")); + assert!(stripper.contains_pii("/Users/alice/.ssh/id_rsa")); + } + + #[test] + fn detect_ipv4() { + let stripper = PiiStripper::new(); + assert!(stripper.contains_pii("connecting to 192.168.1.100:8080")); + assert!(stripper.contains_pii("server at 10.0.0.1")); + } + + #[test] + fn detect_emails() { + let stripper = PiiStripper::new(); + assert!(stripper.contains_pii("contact user@example.com for help")); + } + + #[test] + fn detect_api_keys() { + let stripper = PiiStripper::new(); + assert!(stripper.contains_pii("key: sk-abcdefghijklmnopqrstuv")); + assert!(stripper.contains_pii("aws: AKIAIOSFODNN7EXAMPLE")); + assert!(stripper.contains_pii("token: ghp_abcdefghijklmnopqrstuvwxyz0123456789")); + } + + #[test] + fn detect_env_vars() { + let stripper = PiiStripper::new(); + assert!(stripper.contains_pii("path is $HOME/.config")); + assert!(stripper.contains_pii("dir is %USERPROFILE%\\Desktop")); + } + + #[test] + fn redact_preserves_structure() { + let mut stripper = PiiStripper::new(); + let input1 = "file at /home/alice/project/a.rs"; + let input2 = "also at /home/alice/project/b.rs"; + let r1 = stripper.strip_value(input1); + let r2 = stripper.strip_value(input2); + // Same path prefix should get same pseudonym + assert!(r1.contains("= 2); + assert!(log.pre_redaction_hash != [0u8; 32]); + // clean field should be unchanged + assert_eq!(redacted[2].1, "no pii here"); + } + + #[test] + fn no_pii_returns_clean() { + let stripper = PiiStripper::new(); + assert!(!stripper.contains_pii("just a normal string")); + assert!(!stripper.contains_pii("alpha = 10.5, beta = 3.2")); + } + + #[test] + fn reset_clears_state() { + let mut stripper = PiiStripper::new(); + stripper.strip_value("/home/user/test"); + assert!(!stripper.pseudonym_map().is_empty()); + stripper.reset(); + assert!(stripper.pseudonym_map().is_empty()); + } + + #[test] + fn custom_rule() { + let mut stripper = PiiStripper::new(); + stripper.add_rule("ssn", "rule_ssn", r"\b\d{3}-\d{2}-\d{4}\b", "SSN").unwrap(); + assert!(stripper.contains_pii("ssn: 123-45-6789")); + let redacted = stripper.strip_value("ssn: 123-45-6789"); + assert!(redacted.contains(", + /// Segment types explicitly denied for export. + pub denied_segments: HashSet, + /// Domain IDs allowed for export (empty = all allowed). + pub allowed_domains: HashSet, + /// Domain IDs denied for export. + pub denied_domains: HashSet, + /// Minimum quality score for exported trajectories (0.0 - 1.0). + pub quality_threshold: f64, + /// Minimum observations per prior entry for export. + pub min_observations: u64, + /// Maximum exports per hour. + pub max_exports_per_hour: u32, + /// Maximum cumulative privacy budget (epsilon). + pub privacy_budget_limit: f64, + /// Whether to include policy kernel snapshots. + pub export_kernels: bool, + /// Whether to include cost curve data. + pub export_cost_curves: bool, +} + +impl Default for FederationPolicy { + fn default() -> Self { + Self { + allowed_segments: HashSet::new(), + denied_segments: HashSet::new(), + allowed_domains: HashSet::new(), + denied_domains: HashSet::new(), + quality_threshold: 0.5, + min_observations: 12, + max_exports_per_hour: 100, + privacy_budget_limit: 10.0, + export_kernels: true, + export_cost_curves: true, + } + } +} + +impl FederationPolicy { + /// Create a restrictive policy (deny all by default). + pub fn restrictive() -> Self { + Self { + quality_threshold: 0.8, + min_observations: 50, + max_exports_per_hour: 10, + privacy_budget_limit: 5.0, + export_kernels: false, + export_cost_curves: false, + ..Default::default() + } + } + + /// Create a permissive policy (share everything). + pub fn permissive() -> Self { + Self { + quality_threshold: 0.0, + min_observations: 1, + max_exports_per_hour: 1000, + privacy_budget_limit: 100.0, + export_kernels: true, + export_cost_curves: true, + ..Default::default() + } + } + + /// Check if a segment type is allowed for export. + pub fn is_segment_allowed(&self, seg_type: u8) -> bool { + if self.denied_segments.contains(&seg_type) { + return false; + } + if self.allowed_segments.is_empty() { + return true; + } + self.allowed_segments.contains(&seg_type) + } + + /// Check if a domain is allowed for export. + pub fn is_domain_allowed(&self, domain_id: &str) -> bool { + if self.denied_domains.contains(domain_id) { + return false; + } + if self.allowed_domains.is_empty() { + return true; + } + self.allowed_domains.contains(domain_id) + } + + /// Allow a specific segment type. + pub fn allow_segment(mut self, seg_type: u8) -> Self { + self.allowed_segments.insert(seg_type); + self + } + + /// Deny a specific segment type. + pub fn deny_segment(mut self, seg_type: u8) -> Self { + self.denied_segments.insert(seg_type); + self + } + + /// Allow a specific domain. + pub fn allow_domain(mut self, domain_id: &str) -> Self { + self.allowed_domains.insert(domain_id.to_string()); + self + } + + /// Deny a specific domain. + pub fn deny_domain(mut self, domain_id: &str) -> Self { + self.denied_domains.insert(domain_id.to_string()); + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_policy() { + let p = FederationPolicy::default(); + assert_eq!(p.quality_threshold, 0.5); + assert_eq!(p.min_observations, 12); + assert!(p.is_segment_allowed(0x33)); + assert!(p.is_domain_allowed("anything")); + } + + #[test] + fn restrictive_policy() { + let p = FederationPolicy::restrictive(); + assert_eq!(p.quality_threshold, 0.8); + assert_eq!(p.min_observations, 50); + assert!(!p.export_kernels); + assert!(!p.export_cost_curves); + } + + #[test] + fn permissive_policy() { + let p = FederationPolicy::permissive(); + assert_eq!(p.quality_threshold, 0.0); + assert_eq!(p.min_observations, 1); + } + + #[test] + fn segment_allowlist() { + let p = FederationPolicy::default().allow_segment(0x33).allow_segment(0x34); + assert!(p.is_segment_allowed(0x33)); + assert!(p.is_segment_allowed(0x34)); + assert!(!p.is_segment_allowed(0x35)); // not in allowlist + } + + #[test] + fn segment_denylist() { + let p = FederationPolicy::default().deny_segment(0x36); + assert!(p.is_segment_allowed(0x33)); + assert!(!p.is_segment_allowed(0x36)); // denied + } + + #[test] + fn deny_takes_precedence() { + let p = FederationPolicy::default() + .allow_segment(0x33) + .deny_segment(0x33); + assert!(!p.is_segment_allowed(0x33)); // deny wins + } + + #[test] + fn domain_filtering() { + let p = FederationPolicy::default() + .allow_domain("genomics") + .deny_domain("secret_project"); + assert!(p.is_domain_allowed("genomics")); + assert!(!p.is_domain_allowed("secret_project")); + assert!(!p.is_domain_allowed("trading")); // not in allowlist + } + + #[test] + fn empty_allowlist_allows_all() { + let p = FederationPolicy::default(); + assert!(p.is_segment_allowed(0x33)); + assert!(p.is_segment_allowed(0xFF)); + assert!(p.is_domain_allowed("any_domain")); + } +} diff --git a/crates/rvf/rvf-federation/src/types.rs b/crates/rvf/rvf-federation/src/types.rs new file mode 100644 index 000000000..b240df18c --- /dev/null +++ b/crates/rvf/rvf-federation/src/types.rs @@ -0,0 +1,426 @@ +//! Federation segment payload types. +//! +//! Four new RVF segment types (0x33-0x36) defined in ADR-057. + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +// ── Segment type constants ────────────────────────────────────────── + +/// Segment type discriminator for FederatedManifest. +pub const SEG_FEDERATED_MANIFEST: u8 = 0x33; +/// Segment type discriminator for DiffPrivacyProof. +pub const SEG_DIFF_PRIVACY_PROOF: u8 = 0x34; +/// Segment type discriminator for RedactionLog. +pub const SEG_REDACTION_LOG: u8 = 0x35; +/// Segment type discriminator for AggregateWeights. +pub const SEG_AGGREGATE_WEIGHTS: u8 = 0x36; + +// ── FederatedManifest (0x33) ──────────────────────────────────────── + +/// Describes a federated learning export. +/// +/// Attached as the first segment in every federation RVF file. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct FederatedManifest { + /// Format version (currently 1). + pub format_version: u32, + /// Pseudonym of the contributor (never the real identity). + pub contributor_pseudonym: String, + /// UNIX timestamp (seconds) when the export was created. + pub export_timestamp_s: u64, + /// Segment IDs included in this export. + pub included_segment_ids: Vec, + /// Cumulative differential privacy budget spent (epsilon). + pub privacy_budget_spent: f64, + /// Domain identifier this export applies to. + pub domain_id: String, + /// RVF format version compatibility tag. + pub rvf_version_tag: String, + /// Number of trajectories summarized in the exported learning. + pub trajectory_count: u64, + /// Average quality score of exported trajectories. + pub avg_quality_score: f64, +} + +impl FederatedManifest { + /// Create a new manifest with required fields. + pub fn new(contributor_pseudonym: String, domain_id: String) -> Self { + Self { + format_version: 1, + contributor_pseudonym, + export_timestamp_s: 0, + included_segment_ids: Vec::new(), + privacy_budget_spent: 0.0, + domain_id, + rvf_version_tag: String::from("rvf-v1"), + trajectory_count: 0, + avg_quality_score: 0.0, + } + } +} + +// ── DiffPrivacyProof (0x34) ───────────────────────────────────────── + +/// Noise mechanism used for differential privacy. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum NoiseMechanism { + /// Gaussian noise for (epsilon, delta)-DP. + Gaussian, + /// Laplace noise for epsilon-DP. + Laplace, +} + +/// Differential privacy attestation. +/// +/// Records the privacy parameters and noise applied during export. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct DiffPrivacyProof { + /// Privacy loss parameter. + pub epsilon: f64, + /// Probability of privacy failure. + pub delta: f64, + /// Noise mechanism applied. + pub mechanism: NoiseMechanism, + /// L2 sensitivity bound used for noise calibration. + pub sensitivity: f64, + /// Gradient clipping norm. + pub clipping_norm: f64, + /// Noise scale (sigma for Gaussian, b for Laplace). + pub noise_scale: f64, + /// Number of parameters that had noise added. + pub noised_parameter_count: u64, +} + +impl DiffPrivacyProof { + /// Create a new proof for Gaussian mechanism. + pub fn gaussian(epsilon: f64, delta: f64, sensitivity: f64, clipping_norm: f64) -> Self { + let sigma = sensitivity * (2.0_f64 * (1.25_f64 / delta).ln()).sqrt() / epsilon; + Self { + epsilon, + delta, + mechanism: NoiseMechanism::Gaussian, + sensitivity, + clipping_norm, + noise_scale: sigma, + noised_parameter_count: 0, + } + } + + /// Create a new proof for Laplace mechanism. + pub fn laplace(epsilon: f64, sensitivity: f64, clipping_norm: f64) -> Self { + let b = sensitivity / epsilon; + Self { + epsilon, + delta: 0.0, + mechanism: NoiseMechanism::Laplace, + sensitivity, + clipping_norm, + noise_scale: b, + noised_parameter_count: 0, + } + } +} + +// ── RedactionLog (0x35) ───────────────────────────────────────────── + +/// A single redaction event. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct RedactionEntry { + /// Category of PII detected (e.g. "path", "ip", "email", "api_key"). + pub category: String, + /// Number of occurrences redacted. + pub count: u32, + /// Rule identifier that triggered the redaction. + pub rule_id: String, +} + +/// PII stripping attestation. +/// +/// Proves that PII scanning was performed without revealing the original content. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct RedactionLog { + /// Individual redaction entries by category. + pub entries: Vec, + /// SHAKE-256 hash of the pre-redaction content (32 bytes). + pub pre_redaction_hash: [u8; 32], + /// Total number of fields scanned. + pub fields_scanned: u64, + /// Total number of redactions applied. + pub total_redactions: u64, + /// UNIX timestamp (seconds) when redaction was performed. + pub timestamp_s: u64, +} + +impl RedactionLog { + /// Create an empty redaction log. + pub fn new() -> Self { + Self { + entries: Vec::new(), + pre_redaction_hash: [0u8; 32], + fields_scanned: 0, + total_redactions: 0, + timestamp_s: 0, + } + } + + /// Add a redaction entry. + pub fn add_entry(&mut self, category: &str, count: u32, rule_id: &str) { + self.total_redactions += count as u64; + self.entries.push(RedactionEntry { + category: category.to_string(), + count, + rule_id: rule_id.to_string(), + }); + } +} + +impl Default for RedactionLog { + fn default() -> Self { + Self::new() + } +} + +// ── AggregateWeights (0x36) ───────────────────────────────────────── + +/// Federated-averaged weight vector with metadata. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct AggregateWeights { + /// Federated averaging round number. + pub round: u64, + /// Number of participants in this round. + pub participation_count: u32, + /// Aggregated LoRA delta weights (flattened). + pub lora_deltas: Vec, + /// Per-weight confidence scores. + pub confidences: Vec, + /// Mean loss across participants. + pub mean_loss: f64, + /// Loss variance across participants. + pub loss_variance: f64, + /// Domain identifier. + pub domain_id: String, + /// Whether Byzantine outlier removal was applied. + pub byzantine_filtered: bool, + /// Number of contributions removed as outliers. + pub outliers_removed: u32, +} + +impl AggregateWeights { + /// Create empty aggregate weights for a domain. + pub fn new(domain_id: String, round: u64) -> Self { + Self { + round, + participation_count: 0, + lora_deltas: Vec::new(), + confidences: Vec::new(), + mean_loss: 0.0, + loss_variance: 0.0, + domain_id, + byzantine_filtered: false, + outliers_removed: 0, + } + } +} + +// ── BetaParams (local copy for federation) ────────────────────────── + +/// Beta distribution parameters for Thompson Sampling priors. +/// +/// Mirrors the type in `ruvector-domain-expansion` to avoid cross-crate dependency. +#[derive(Clone, Copy, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct BetaParams { + /// Alpha (success count + 1). + pub alpha: f64, + /// Beta (failure count + 1). + pub beta: f64, +} + +impl BetaParams { + /// Create new Beta parameters. + pub fn new(alpha: f64, beta: f64) -> Self { + Self { alpha, beta } + } + + /// Uniform (uninformative) prior. + pub fn uniform() -> Self { + Self { alpha: 1.0, beta: 1.0 } + } + + /// Mean of the Beta distribution. + pub fn mean(&self) -> f64 { + self.alpha / (self.alpha + self.beta) + } + + /// Total observations (alpha + beta - 2 for a Beta(1,1) prior). + pub fn observations(&self) -> f64 { + self.alpha + self.beta - 2.0 + } + + /// Merge two Beta posteriors by summing parameters and subtracting the uniform prior. + pub fn merge(&self, other: &BetaParams) -> BetaParams { + BetaParams { + alpha: self.alpha + other.alpha - 1.0, + beta: self.beta + other.beta - 1.0, + } + } + + /// Dampen this prior by mixing with a uniform prior using sqrt-scaling. + pub fn dampen(&self, factor: f64) -> BetaParams { + let f = factor.clamp(0.0, 1.0); + BetaParams { + alpha: 1.0 + (self.alpha - 1.0) * f, + beta: 1.0 + (self.beta - 1.0) * f, + } + } +} + +impl Default for BetaParams { + fn default() -> Self { + Self::uniform() + } +} + +// ── TransferPrior (local copy for federation) ─────────────────────── + +/// Compact summary of learned priors for a single context bucket. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct TransferPriorEntry { + /// Context bucket identifier. + pub bucket_id: String, + /// Arm identifier. + pub arm_id: String, + /// Beta posterior parameters. + pub params: BetaParams, + /// Number of observations backing this prior. + pub observation_count: u64, +} + +/// Collection of transfer priors from a trained domain. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct TransferPriorSet { + /// Source domain identifier. + pub source_domain: String, + /// Individual prior entries. + pub entries: Vec, + /// EMA cost at time of extraction. + pub cost_ema: f64, +} + +// ── PolicyKernelSnapshot ──────────────────────────────────────────── + +/// Snapshot of a policy kernel configuration for federation export. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct PolicyKernelSnapshot { + /// Kernel identifier. + pub kernel_id: String, + /// Tunable knob values. + pub knobs: Vec, + /// Fitness score. + pub fitness: f64, + /// Generation number. + pub generation: u64, +} + +// ── CostCurveSnapshot ─────────────────────────────────────────────── + +/// Snapshot of cost curve data for federation export. +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct CostCurveSnapshot { + /// Domain identifier. + pub domain_id: String, + /// Ordered (step, cost) points. + pub points: Vec<(u64, f64)>, + /// Acceleration factor (> 1.0 means transfer helped). + pub acceleration: f64, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn segment_type_constants() { + assert_eq!(SEG_FEDERATED_MANIFEST, 0x33); + assert_eq!(SEG_DIFF_PRIVACY_PROOF, 0x34); + assert_eq!(SEG_REDACTION_LOG, 0x35); + assert_eq!(SEG_AGGREGATE_WEIGHTS, 0x36); + } + + #[test] + fn federated_manifest_new() { + let m = FederatedManifest::new("alice".into(), "genomics".into()); + assert_eq!(m.format_version, 1); + assert_eq!(m.contributor_pseudonym, "alice"); + assert_eq!(m.domain_id, "genomics"); + assert_eq!(m.trajectory_count, 0); + } + + #[test] + fn diff_privacy_proof_gaussian() { + let p = DiffPrivacyProof::gaussian(1.0, 1e-5, 1.0, 1.0); + assert_eq!(p.mechanism, NoiseMechanism::Gaussian); + assert!(p.noise_scale > 0.0); + assert_eq!(p.epsilon, 1.0); + } + + #[test] + fn diff_privacy_proof_laplace() { + let p = DiffPrivacyProof::laplace(1.0, 1.0, 1.0); + assert_eq!(p.mechanism, NoiseMechanism::Laplace); + assert!((p.noise_scale - 1.0).abs() < 1e-10); + } + + #[test] + fn redaction_log_add_entry() { + let mut log = RedactionLog::new(); + log.add_entry("path", 3, "rule_path_unix"); + log.add_entry("ip", 2, "rule_ipv4"); + assert_eq!(log.entries.len(), 2); + assert_eq!(log.total_redactions, 5); + } + + #[test] + fn aggregate_weights_new() { + let w = AggregateWeights::new("code_review".into(), 1); + assert_eq!(w.round, 1); + assert_eq!(w.participation_count, 0); + assert!(!w.byzantine_filtered); + } + + #[test] + fn beta_params_merge() { + let a = BetaParams::new(10.0, 5.0); + let b = BetaParams::new(8.0, 3.0); + let merged = a.merge(&b); + assert!((merged.alpha - 17.0).abs() < 1e-10); + assert!((merged.beta - 7.0).abs() < 1e-10); + } + + #[test] + fn beta_params_dampen() { + let p = BetaParams::new(10.0, 5.0); + let dampened = p.dampen(0.25); + // alpha = 1 + (10-1)*0.25 = 1 + 2.25 = 3.25 + assert!((dampened.alpha - 3.25).abs() < 1e-10); + // beta = 1 + (5-1)*0.25 = 1 + 1.0 = 2.0 + assert!((dampened.beta - 2.0).abs() < 1e-10); + } + + #[test] + fn beta_params_mean() { + let p = BetaParams::new(10.0, 10.0); + assert!((p.mean() - 0.5).abs() < 1e-10); + } +} diff --git a/crates/rvf/rvf-types/src/segment_type.rs b/crates/rvf/rvf-types/src/segment_type.rs index eed16682e..b19ce5ca1 100644 --- a/crates/rvf/rvf-types/src/segment_type.rs +++ b/crates/rvf/rvf-types/src/segment_type.rs @@ -69,6 +69,18 @@ pub enum SegmentType { PolicyKernel = 0x31, /// Cost curve convergence data for acceleration tracking. CostCurve = 0x32, + /// Federated learning export manifest: contributor pseudonym, export timestamp, + /// included segment IDs, privacy budget spent, format version. + FederatedManifest = 0x33, + /// Differential privacy attestation: epsilon/delta values, noise mechanism, + /// sensitivity bounds, clipping parameters. + DiffPrivacyProof = 0x34, + /// PII stripping attestation: redacted fields, rules fired, + /// hash of pre-redaction content. + RedactionLog = 0x35, + /// Federated-averaged SONA weights: aggregated LoRA deltas, + /// participation count, round number, convergence metrics. + AggregateWeights = 0x36, } impl TryFrom for SegmentType { @@ -101,6 +113,10 @@ impl TryFrom for SegmentType { 0x30 => Ok(Self::TransferPrior), 0x31 => Ok(Self::PolicyKernel), 0x32 => Ok(Self::CostCurve), + 0x33 => Ok(Self::FederatedManifest), + 0x34 => Ok(Self::DiffPrivacyProof), + 0x35 => Ok(Self::RedactionLog), + 0x36 => Ok(Self::AggregateWeights), other => Err(other), } } @@ -138,6 +154,10 @@ mod tests { SegmentType::TransferPrior, SegmentType::PolicyKernel, SegmentType::CostCurve, + SegmentType::FederatedManifest, + SegmentType::DiffPrivacyProof, + SegmentType::RedactionLog, + SegmentType::AggregateWeights, ]; for v in variants { let raw = v as u8; @@ -148,7 +168,7 @@ mod tests { #[test] fn invalid_value_returns_err() { assert_eq!(SegmentType::try_from(0x12), Err(0x12)); - assert_eq!(SegmentType::try_from(0x33), Err(0x33)); + assert_eq!(SegmentType::try_from(0x37), Err(0x37)); assert_eq!(SegmentType::try_from(0xF0), Err(0xF0)); assert_eq!(SegmentType::try_from(0xFF), Err(0xFF)); } @@ -160,6 +180,14 @@ mod tests { assert_eq!(SegmentType::CostCurve as u8, 0x32); } + #[test] + fn federation_discriminants() { + assert_eq!(SegmentType::FederatedManifest as u8, 0x33); + assert_eq!(SegmentType::DiffPrivacyProof as u8, 0x34); + assert_eq!(SegmentType::RedactionLog as u8, 0x35); + assert_eq!(SegmentType::AggregateWeights as u8, 0x36); + } + #[test] fn kernel_ebpf_wasm_discriminants() { assert_eq!(SegmentType::Kernel as u8, 0x0E);