mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-26 07:44:05 +00:00
Pre-existing rustfmt drift across the workspace was blocking CI's `Rustfmt` check on PR #373 + PR #377. Running plain `cargo fmt` reformats 427 files; no semantic changes, no logic changes, no behavior changes — just what rustfmt already wanted. None of the touched files are in ruvector-rabitq, ruvector-rulake, or the new mirror-rulake workflow — those were already fmt-clean per the per-crate checks on commits5a4b0d782,5f32fd450,f5003bc7b. Drift is in cognitum-gate-kernel, mcp-brain, nervous-system, prime-radiant, ruqu-core, ruvector-attention, ruvector-mincut, ruvix/* and sub-crates, plus several examples. Verified post-fmt: cargo check -p ruvector-rabitq -p ruvector-rulake → clean cargo clippy -p ... -p ... --all-targets -- -D warnings → clean cargo test -p ... -p ... --release → 82/82 pass Intentionally does NOT touch clippy drift — many more warnings (missing docs, precision-loss casts, too-many-args, unsafe-safety- docs) spread across unrelated crates, each category a cross-cutting design decision that deserves its own review. With this commit Rustfmt CI goes green on PR #373 and PR #377. Clippy will still fail — that's honest pre-existing state for a separate dedicated PR. Co-Authored-By: claude-flow <ruv@ruv.net>
1064 lines
32 KiB
Rust
1064 lines
32 KiB
Rust
//! Integration tests for ruvector-cluster
|
|
//!
|
|
//! These tests exercise the public API of the cluster management system
|
|
//! end-to-end, covering cluster formation, node registration/deregistration,
|
|
//! shard assignment, consistent hashing, discovery services, and the
|
|
//! DAG-based consensus protocol.
|
|
|
|
use ruvector_cluster::consensus::{DagConsensus, DagVertex, Transaction, TransactionType};
|
|
use ruvector_cluster::discovery::{DiscoveryService, GossipDiscovery, StaticDiscovery};
|
|
use ruvector_cluster::shard::{ConsistentHashRing, LoadBalancer, ShardMigration, ShardRouter};
|
|
use ruvector_cluster::{ClusterConfig, ClusterManager, ClusterNode, NodeStatus, ShardStatus};
|
|
use std::collections::HashMap;
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
use std::time::Duration;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
fn test_addr(port: u16) -> SocketAddr {
|
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port)
|
|
}
|
|
|
|
fn test_node(id: &str, port: u16) -> ClusterNode {
|
|
ClusterNode::new(id.to_string(), test_addr(port))
|
|
}
|
|
|
|
fn test_config(shard_count: u32, replication_factor: usize) -> ClusterConfig {
|
|
ClusterConfig {
|
|
shard_count,
|
|
replication_factor,
|
|
heartbeat_interval: Duration::from_secs(5),
|
|
node_timeout: Duration::from_secs(30),
|
|
enable_consensus: true,
|
|
min_quorum_size: 2,
|
|
}
|
|
}
|
|
|
|
fn test_manager(shard_count: u32, replication_factor: usize) -> ClusterManager {
|
|
let config = test_config(shard_count, replication_factor);
|
|
let discovery = Box::new(StaticDiscovery::new(vec![]));
|
|
ClusterManager::new(config, "test-manager".to_string(), discovery).unwrap()
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 1. Cluster creation
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_manager_creation_with_defaults() {
|
|
let config = ClusterConfig::default();
|
|
let discovery = Box::new(StaticDiscovery::new(vec![]));
|
|
let result = ClusterManager::new(config, "manager-1".to_string(), discovery);
|
|
assert!(result.is_ok());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_manager_creation_with_custom_config() {
|
|
let config = ClusterConfig {
|
|
replication_factor: 5,
|
|
shard_count: 128,
|
|
heartbeat_interval: Duration::from_secs(2),
|
|
node_timeout: Duration::from_secs(10),
|
|
enable_consensus: false,
|
|
min_quorum_size: 3,
|
|
};
|
|
let discovery = Box::new(StaticDiscovery::new(vec![]));
|
|
let manager = ClusterManager::new(config, "custom-mgr".to_string(), discovery).unwrap();
|
|
|
|
// Without consensus enabled, consensus() should return None
|
|
assert!(manager.consensus().is_none());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_manager_consensus_enabled() {
|
|
let config = ClusterConfig {
|
|
enable_consensus: true,
|
|
..ClusterConfig::default()
|
|
};
|
|
let discovery = Box::new(StaticDiscovery::new(vec![]));
|
|
let manager = ClusterManager::new(config, "mgr".to_string(), discovery).unwrap();
|
|
|
|
assert!(manager.consensus().is_some());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_starts_empty() {
|
|
let manager = test_manager(16, 3);
|
|
let stats = manager.get_stats();
|
|
|
|
assert_eq!(stats.total_nodes, 0);
|
|
assert_eq!(stats.healthy_nodes, 0);
|
|
assert_eq!(stats.total_vectors, 0);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 2. Node registration and deregistration
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn test_add_single_node() {
|
|
let manager = test_manager(4, 2);
|
|
let node = test_node("node-1", 9001);
|
|
|
|
manager.add_node(node).await.unwrap();
|
|
|
|
let nodes = manager.list_nodes();
|
|
assert_eq!(nodes.len(), 1);
|
|
assert_eq!(nodes[0].node_id, "node-1");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_add_multiple_nodes() {
|
|
let manager = test_manager(4, 2);
|
|
|
|
for i in 0..5 {
|
|
manager
|
|
.add_node(test_node(&format!("n{}", i), 9000 + i))
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
assert_eq!(manager.list_nodes().len(), 5);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_get_node_by_id() {
|
|
let manager = test_manager(4, 2);
|
|
manager.add_node(test_node("alpha", 9001)).await.unwrap();
|
|
manager.add_node(test_node("beta", 9002)).await.unwrap();
|
|
|
|
let found = manager.get_node("alpha");
|
|
assert!(found.is_some());
|
|
assert_eq!(found.unwrap().node_id, "alpha");
|
|
|
|
let missing = manager.get_node("gamma");
|
|
assert!(missing.is_none());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_remove_node() {
|
|
let manager = test_manager(4, 2);
|
|
|
|
manager.add_node(test_node("n1", 9001)).await.unwrap();
|
|
manager.add_node(test_node("n2", 9002)).await.unwrap();
|
|
assert_eq!(manager.list_nodes().len(), 2);
|
|
|
|
manager.remove_node("n1").await.unwrap();
|
|
assert_eq!(manager.list_nodes().len(), 1);
|
|
|
|
// Verify the correct node was removed
|
|
assert!(manager.get_node("n1").is_none());
|
|
assert!(manager.get_node("n2").is_some());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_remove_nonexistent_node_does_not_panic() {
|
|
let manager = test_manager(4, 2);
|
|
// Removing a non-existent node from an empty cluster triggers rebalancing
|
|
// which may error because there are no nodes to assign shards to.
|
|
// The important property is that it does not panic.
|
|
let _result = manager.remove_node("does-not-exist").await;
|
|
// If we reach this point, the call didn't panic -- that's the safety guarantee.
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_remove_node_that_was_added() {
|
|
let manager = test_manager(4, 2);
|
|
manager.add_node(test_node("a", 9001)).await.unwrap();
|
|
manager.add_node(test_node("b", 9002)).await.unwrap();
|
|
|
|
// Removing an existing node when others remain should succeed
|
|
let result = manager.remove_node("a").await;
|
|
assert!(result.is_ok());
|
|
assert!(manager.get_node("a").is_none());
|
|
assert!(manager.get_node("b").is_some());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_node_health_check() {
|
|
let node = test_node("healthy-node", 9001);
|
|
assert!(node.is_healthy(Duration::from_secs(60)));
|
|
|
|
// A very tight timeout will consider even a fresh node unhealthy
|
|
// (since some nanoseconds have passed since creation)
|
|
// We test the opposite: a generous timeout keeps the node healthy
|
|
assert!(node.is_healthy(Duration::from_secs(300)));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_node_heartbeat_refreshes_timestamp() {
|
|
let mut node = test_node("node-hb", 9001);
|
|
let original_last_seen = node.last_seen;
|
|
|
|
// Small delay to ensure timestamps differ
|
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
|
node.heartbeat();
|
|
|
|
assert!(
|
|
node.last_seen >= original_last_seen,
|
|
"heartbeat should refresh last_seen"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_node_default_status_is_follower() {
|
|
let node = test_node("new-node", 9001);
|
|
assert_eq!(node.status, NodeStatus::Follower);
|
|
assert_eq!(node.capacity, 1.0);
|
|
assert!(node.metadata.is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_healthy_nodes_filter() {
|
|
let manager = test_manager(4, 2);
|
|
|
|
// All fresh nodes should be healthy
|
|
for i in 0..3 {
|
|
manager
|
|
.add_node(test_node(&format!("h{}", i), 9000 + i))
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let healthy = manager.healthy_nodes();
|
|
assert_eq!(healthy.len(), 3);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 3. Shard assignment
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn test_shard_assignment_with_nodes() {
|
|
let manager = test_manager(8, 2);
|
|
|
|
for i in 0..3 {
|
|
manager
|
|
.add_node(test_node(&format!("node{}", i), 9000 + i))
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let shard = manager.assign_shard(0).unwrap();
|
|
assert_eq!(shard.shard_id, 0);
|
|
assert_eq!(shard.status, ShardStatus::Active);
|
|
assert!(!shard.primary_node.is_empty());
|
|
assert_eq!(shard.vector_count, 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_shard_assignment_returns_replicas() {
|
|
let manager = test_manager(8, 3);
|
|
|
|
for i in 0..5 {
|
|
manager
|
|
.add_node(test_node(&format!("node{}", i), 9000 + i))
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let shard = manager.assign_shard(1).unwrap();
|
|
// With replication_factor=3, we expect 1 primary + up to 2 replicas
|
|
assert!(
|
|
shard.replica_nodes.len() <= 2,
|
|
"should have at most (replication_factor - 1) replicas"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_shard_assignment_without_nodes_fails() {
|
|
let manager = test_manager(8, 2);
|
|
// No nodes added => assignment should fail
|
|
let result = manager.assign_shard(0);
|
|
assert!(result.is_err(), "assigning shard with no nodes should fail");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_get_shard_after_assignment() {
|
|
let manager = test_manager(8, 2);
|
|
manager.add_node(test_node("n1", 9001)).await.unwrap();
|
|
|
|
manager.assign_shard(3).unwrap();
|
|
|
|
let shard = manager.get_shard(3);
|
|
assert!(shard.is_some());
|
|
assert_eq!(shard.unwrap().shard_id, 3);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_list_shards() {
|
|
let manager = test_manager(8, 2);
|
|
manager.add_node(test_node("n1", 9001)).await.unwrap();
|
|
|
|
for sid in 0..4 {
|
|
manager.assign_shard(sid).unwrap();
|
|
}
|
|
|
|
// There may be more shards due to rebalancing in add_node,
|
|
// but at least our 4 manual assignments should be present
|
|
let shards = manager.list_shards();
|
|
assert!(shards.len() >= 4);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_stats_after_setup() {
|
|
let manager = test_manager(4, 2);
|
|
|
|
for i in 0..3 {
|
|
manager
|
|
.add_node(test_node(&format!("s{}", i), 9000 + i))
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let stats = manager.get_stats();
|
|
assert_eq!(stats.total_nodes, 3);
|
|
assert_eq!(stats.healthy_nodes, 3);
|
|
// Shards should be auto-assigned during rebalancing
|
|
assert!(stats.total_shards > 0);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 4. Consistent hash ring
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[test]
|
|
fn test_hash_ring_empty() {
|
|
let ring = ConsistentHashRing::new(3);
|
|
assert_eq!(ring.node_count(), 0);
|
|
assert!(ring.get_primary_node("any-key").is_none());
|
|
assert!(ring.get_nodes("any-key", 3).is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn test_hash_ring_add_and_remove() {
|
|
let mut ring = ConsistentHashRing::new(2);
|
|
|
|
ring.add_node("alpha".to_string());
|
|
ring.add_node("beta".to_string());
|
|
assert_eq!(ring.node_count(), 2);
|
|
|
|
ring.remove_node("alpha");
|
|
assert_eq!(ring.node_count(), 1);
|
|
|
|
let nodes = ring.list_nodes();
|
|
assert!(nodes.contains(&"beta".to_string()));
|
|
assert!(!nodes.contains(&"alpha".to_string()));
|
|
}
|
|
|
|
#[test]
|
|
fn test_hash_ring_idempotent_add() {
|
|
let mut ring = ConsistentHashRing::new(2);
|
|
ring.add_node("node".to_string());
|
|
ring.add_node("node".to_string()); // duplicate
|
|
assert_eq!(ring.node_count(), 1);
|
|
}
|
|
|
|
#[test]
|
|
fn test_hash_ring_remove_nonexistent_is_safe() {
|
|
let mut ring = ConsistentHashRing::new(2);
|
|
ring.add_node("existing".to_string());
|
|
ring.remove_node("ghost"); // should not panic
|
|
assert_eq!(ring.node_count(), 1);
|
|
}
|
|
|
|
#[test]
|
|
fn test_hash_ring_deterministic_routing() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
ring.add_node("a".to_string());
|
|
ring.add_node("b".to_string());
|
|
ring.add_node("c".to_string());
|
|
|
|
let primary1 = ring.get_primary_node("my-key").unwrap();
|
|
let primary2 = ring.get_primary_node("my-key").unwrap();
|
|
assert_eq!(
|
|
primary1, primary2,
|
|
"same key must always route to same node"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_hash_ring_distribution_fairness() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
ring.add_node("n1".to_string());
|
|
ring.add_node("n2".to_string());
|
|
ring.add_node("n3".to_string());
|
|
|
|
let mut counts: HashMap<String, usize> = HashMap::new();
|
|
for i in 0..3000 {
|
|
let key = format!("key-{}", i);
|
|
if let Some(node) = ring.get_primary_node(&key) {
|
|
*counts.entry(node).or_default() += 1;
|
|
}
|
|
}
|
|
|
|
// Each node should get roughly 1/3 of keys (with some variance)
|
|
for (node, count) in &counts {
|
|
let ratio = *count as f64 / 3000.0;
|
|
assert!(
|
|
ratio > 0.15 && ratio < 0.55,
|
|
"node {} has ratio {} which is outside acceptable range",
|
|
node,
|
|
ratio
|
|
);
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_hash_ring_get_nodes_returns_unique_nodes() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
ring.add_node("x".to_string());
|
|
ring.add_node("y".to_string());
|
|
ring.add_node("z".to_string());
|
|
|
|
let nodes = ring.get_nodes("test-key", 3);
|
|
assert_eq!(nodes.len(), 3);
|
|
|
|
// All nodes should be unique
|
|
let mut unique = nodes.clone();
|
|
unique.sort();
|
|
unique.dedup();
|
|
assert_eq!(unique.len(), 3, "get_nodes should return unique nodes");
|
|
}
|
|
|
|
#[test]
|
|
fn test_hash_ring_get_nodes_caps_at_available() {
|
|
let mut ring = ConsistentHashRing::new(3);
|
|
ring.add_node("only-node".to_string());
|
|
|
|
// Requesting 5 nodes when only 1 exists
|
|
let nodes = ring.get_nodes("key", 5);
|
|
assert_eq!(nodes.len(), 1);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 5. Shard router (jump consistent hash)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[test]
|
|
fn test_shard_router_deterministic() {
|
|
let router = ShardRouter::new(32);
|
|
|
|
let shard1 = router.get_shard("vector-123");
|
|
let shard2 = router.get_shard("vector-123");
|
|
assert_eq!(shard1, shard2, "same key must always route to same shard");
|
|
}
|
|
|
|
#[test]
|
|
fn test_shard_router_range_within_bounds() {
|
|
let shard_count = 64;
|
|
let router = ShardRouter::new(shard_count);
|
|
|
|
for i in 0..1000 {
|
|
let shard = router.get_shard(&format!("key-{}", i));
|
|
assert!(
|
|
shard < shard_count,
|
|
"shard {} exceeds shard_count {}",
|
|
shard,
|
|
shard_count
|
|
);
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_shard_router_cache_behaviour() {
|
|
let router = ShardRouter::new(16);
|
|
|
|
let _ = router.get_shard("cached-key");
|
|
let stats = router.cache_stats();
|
|
assert_eq!(stats.entries, 1);
|
|
|
|
let _ = router.get_shard("another-key");
|
|
let stats = router.cache_stats();
|
|
assert_eq!(stats.entries, 2);
|
|
|
|
router.clear_cache();
|
|
let stats = router.cache_stats();
|
|
assert_eq!(stats.entries, 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_shard_router_for_vector_id() {
|
|
let router = ShardRouter::new(16);
|
|
|
|
let shard_a = router.get_shard_for_vector("vec-abc");
|
|
let shard_b = router.get_shard_for_vector("vec-abc");
|
|
assert_eq!(shard_a, shard_b);
|
|
}
|
|
|
|
#[test]
|
|
fn test_shard_router_range_query_returns_all_shards() {
|
|
let router = ShardRouter::new(8);
|
|
let shards = router.get_shards_for_range("a", "z");
|
|
assert_eq!(shards.len(), 8);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 6. Shard migration
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[test]
|
|
fn test_shard_migration_lifecycle() {
|
|
let mut migration = ShardMigration::new(0, 1, 200);
|
|
|
|
assert!(!migration.is_complete());
|
|
assert_eq!(migration.progress, 0.0);
|
|
assert_eq!(migration.source_shard, 0);
|
|
assert_eq!(migration.target_shard, 1);
|
|
|
|
migration.update_progress(100);
|
|
assert!((migration.progress - 0.5).abs() < f64::EPSILON);
|
|
assert!(!migration.is_complete());
|
|
|
|
migration.update_progress(200);
|
|
assert!(migration.is_complete());
|
|
assert!((migration.progress - 1.0).abs() < f64::EPSILON);
|
|
}
|
|
|
|
#[test]
|
|
fn test_shard_migration_zero_keys() {
|
|
let migration = ShardMigration::new(0, 1, 0);
|
|
// With 0 total keys, migration should be considered complete
|
|
assert!(migration.is_complete());
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 7. Load balancer
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[test]
|
|
fn test_load_balancer_least_loaded() {
|
|
let lb = LoadBalancer::new();
|
|
|
|
lb.update_load(0, 0.9);
|
|
lb.update_load(1, 0.2);
|
|
lb.update_load(2, 0.5);
|
|
|
|
let least = lb.get_least_loaded_shard(&[0, 1, 2]);
|
|
assert_eq!(least, Some(1));
|
|
}
|
|
|
|
#[test]
|
|
fn test_load_balancer_stats() {
|
|
let lb = LoadBalancer::new();
|
|
|
|
lb.update_load(0, 0.4);
|
|
lb.update_load(1, 0.6);
|
|
|
|
let stats = lb.get_stats();
|
|
assert_eq!(stats.shard_count, 2);
|
|
assert!((stats.avg_load - 0.5).abs() < f64::EPSILON);
|
|
assert!((stats.max_load - 0.6).abs() < f64::EPSILON);
|
|
assert!((stats.min_load - 0.4).abs() < f64::EPSILON);
|
|
}
|
|
|
|
#[test]
|
|
fn test_load_balancer_empty() {
|
|
let lb = LoadBalancer::new();
|
|
let stats = lb.get_stats();
|
|
assert_eq!(stats.shard_count, 0);
|
|
assert_eq!(stats.avg_load, 0.0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_load_balancer_no_candidates() {
|
|
let lb = LoadBalancer::new();
|
|
let result = lb.get_least_loaded_shard(&[]);
|
|
assert!(result.is_none());
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 8. Discovery services
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn test_static_discovery_full_lifecycle() {
|
|
let discovery = StaticDiscovery::new(vec![]);
|
|
|
|
// Register nodes
|
|
discovery
|
|
.register_node(test_node("d1", 9001))
|
|
.await
|
|
.unwrap();
|
|
discovery
|
|
.register_node(test_node("d2", 9002))
|
|
.await
|
|
.unwrap();
|
|
|
|
let nodes = discovery.discover_nodes().await.unwrap();
|
|
assert_eq!(nodes.len(), 2);
|
|
|
|
// Unregister one
|
|
discovery.unregister_node("d1").await.unwrap();
|
|
let nodes = discovery.discover_nodes().await.unwrap();
|
|
assert_eq!(nodes.len(), 1);
|
|
assert_eq!(nodes[0].node_id, "d2");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_static_discovery_heartbeat() {
|
|
let node = test_node("hb-node", 9001);
|
|
let discovery = StaticDiscovery::new(vec![node]);
|
|
|
|
// Heartbeat should succeed without error
|
|
let result = discovery.heartbeat("hb-node").await;
|
|
assert!(result.is_ok());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_gossip_discovery_initial_state() {
|
|
let local = test_node("local-g", 8000);
|
|
let discovery = GossipDiscovery::new(
|
|
local,
|
|
vec![test_addr(9000)],
|
|
Duration::from_secs(5),
|
|
Duration::from_secs(30),
|
|
);
|
|
|
|
let nodes = discovery.discover_nodes().await.unwrap();
|
|
assert_eq!(nodes.len(), 1, "only local node should exist initially");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_gossip_discovery_merge() {
|
|
let local = test_node("local-g", 8000);
|
|
let discovery = GossipDiscovery::new(
|
|
local,
|
|
vec![],
|
|
Duration::from_secs(5),
|
|
Duration::from_secs(30),
|
|
);
|
|
|
|
// Simulate receiving gossip from remote nodes
|
|
let remote_nodes = vec![
|
|
test_node("remote-1", 8001),
|
|
test_node("remote-2", 8002),
|
|
test_node("remote-3", 8003),
|
|
];
|
|
discovery.merge_gossip(remote_nodes);
|
|
|
|
let stats = discovery.get_stats();
|
|
assert_eq!(stats.total_nodes, 4); // local + 3 remote
|
|
assert_eq!(stats.healthy_nodes, 4);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_gossip_discovery_register_and_unregister() {
|
|
let local = test_node("local", 8000);
|
|
let discovery = GossipDiscovery::new(
|
|
local,
|
|
vec![],
|
|
Duration::from_secs(5),
|
|
Duration::from_secs(30),
|
|
);
|
|
|
|
discovery
|
|
.register_node(test_node("new-peer", 8010))
|
|
.await
|
|
.unwrap();
|
|
|
|
let nodes = discovery.discover_nodes().await.unwrap();
|
|
assert_eq!(nodes.len(), 2);
|
|
|
|
discovery.unregister_node("new-peer").await.unwrap();
|
|
let nodes = discovery.discover_nodes().await.unwrap();
|
|
assert_eq!(nodes.len(), 1);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 9. DAG consensus
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[test]
|
|
fn test_dag_consensus_creation() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 2);
|
|
let stats = consensus.get_stats();
|
|
|
|
assert_eq!(stats.total_vertices, 0);
|
|
assert_eq!(stats.finalized_vertices, 0);
|
|
assert_eq!(stats.pending_transactions, 0);
|
|
assert_eq!(stats.tips, 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_submit_and_create_vertex() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 2);
|
|
|
|
let tx_id = consensus
|
|
.submit_transaction(TransactionType::Write, b"data".to_vec())
|
|
.unwrap();
|
|
assert!(!tx_id.is_empty());
|
|
|
|
let stats = consensus.get_stats();
|
|
assert_eq!(stats.pending_transactions, 1);
|
|
|
|
let vertex = consensus.create_vertex().unwrap();
|
|
assert!(vertex.is_some());
|
|
|
|
let stats = consensus.get_stats();
|
|
assert_eq!(stats.total_vertices, 1);
|
|
assert_eq!(stats.pending_transactions, 0);
|
|
assert_eq!(stats.tips, 1);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_create_vertex_with_no_pending_returns_none() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 2);
|
|
let vertex = consensus.create_vertex().unwrap();
|
|
assert!(vertex.is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_multiple_vertices_form_chain() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 2);
|
|
|
|
// Create a chain of 5 vertices
|
|
for i in 0..5 {
|
|
consensus
|
|
.submit_transaction(TransactionType::Write, vec![i])
|
|
.unwrap();
|
|
consensus.create_vertex().unwrap();
|
|
}
|
|
|
|
let stats = consensus.get_stats();
|
|
assert_eq!(stats.total_vertices, 5);
|
|
// Each new vertex references the previous as parent, so there should be 1 tip
|
|
assert_eq!(stats.tips, 1);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_add_vertex_from_another_node() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 2);
|
|
|
|
// Create a local vertex first to serve as parent
|
|
consensus
|
|
.submit_transaction(TransactionType::Write, b"local".to_vec())
|
|
.unwrap();
|
|
let local_vertex = consensus.create_vertex().unwrap().unwrap();
|
|
|
|
// Simulate a vertex from another node referencing the local vertex
|
|
let remote_vertex = DagVertex::new(
|
|
"node-2".to_string(),
|
|
Transaction {
|
|
id: "remote-tx-1".to_string(),
|
|
tx_type: TransactionType::Write,
|
|
data: b"remote-data".to_vec(),
|
|
nonce: 1,
|
|
},
|
|
vec![local_vertex.id.clone()],
|
|
{
|
|
let mut clock = HashMap::new();
|
|
clock.insert("node-1".to_string(), 1);
|
|
clock.insert("node-2".to_string(), 1);
|
|
clock
|
|
},
|
|
);
|
|
|
|
let result = consensus.add_vertex(remote_vertex);
|
|
assert!(result.is_ok());
|
|
|
|
let stats = consensus.get_stats();
|
|
assert_eq!(stats.total_vertices, 2);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_add_vertex_with_missing_parent_fails() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 2);
|
|
|
|
let orphan_vertex = DagVertex::new(
|
|
"node-2".to_string(),
|
|
Transaction {
|
|
id: "orphan-tx".to_string(),
|
|
tx_type: TransactionType::Write,
|
|
data: b"orphan".to_vec(),
|
|
nonce: 1,
|
|
},
|
|
vec!["nonexistent-parent-id".to_string()],
|
|
HashMap::new(),
|
|
);
|
|
|
|
let result = consensus.add_vertex(orphan_vertex);
|
|
assert!(
|
|
result.is_err(),
|
|
"vertex with missing parent should be rejected"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_finalization_requires_quorum() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 2);
|
|
|
|
// Create vertices only from node-1
|
|
for i in 0..3 {
|
|
consensus
|
|
.submit_transaction(TransactionType::Write, vec![i])
|
|
.unwrap();
|
|
consensus.create_vertex().unwrap();
|
|
}
|
|
|
|
// Without vertices from other nodes, nothing can be finalized
|
|
let finalized = consensus.finalize_vertices().unwrap();
|
|
assert_eq!(finalized.len(), 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_finalization_with_multi_node_confirmations() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 1);
|
|
|
|
// Create vertex from node-1
|
|
consensus
|
|
.submit_transaction(TransactionType::Write, b"v1".to_vec())
|
|
.unwrap();
|
|
let v1 = consensus.create_vertex().unwrap().unwrap();
|
|
|
|
// Simulate a confirmation vertex from node-2 that references v1
|
|
let confirm = DagVertex::new(
|
|
"node-2".to_string(),
|
|
Transaction {
|
|
id: "confirm-tx".to_string(),
|
|
tx_type: TransactionType::Write,
|
|
data: b"confirm".to_vec(),
|
|
nonce: 1,
|
|
},
|
|
vec![v1.id.clone()],
|
|
{
|
|
let mut clock = HashMap::new();
|
|
clock.insert("node-1".to_string(), 1);
|
|
clock.insert("node-2".to_string(), 1);
|
|
clock
|
|
},
|
|
);
|
|
consensus.add_vertex(confirm).unwrap();
|
|
|
|
// Now v1 has a confirmation from node-2, with min_quorum_size=1 it should finalize
|
|
let finalized = consensus.finalize_vertices().unwrap();
|
|
assert!(
|
|
finalized.contains(&v1.id),
|
|
"v1 should be finalized with sufficient confirmations"
|
|
);
|
|
assert!(consensus.is_finalized(&v1.id));
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_conflict_detection() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 2);
|
|
|
|
let write_tx = Transaction {
|
|
id: "w1".to_string(),
|
|
tx_type: TransactionType::Write,
|
|
data: b"write".to_vec(),
|
|
nonce: 1,
|
|
};
|
|
let another_write = Transaction {
|
|
id: "w2".to_string(),
|
|
tx_type: TransactionType::Write,
|
|
data: b"write2".to_vec(),
|
|
nonce: 2,
|
|
};
|
|
let read_tx = Transaction {
|
|
id: "r1".to_string(),
|
|
tx_type: TransactionType::Read,
|
|
data: b"read".to_vec(),
|
|
nonce: 3,
|
|
};
|
|
|
|
// Write-Write conflicts
|
|
assert!(consensus.detect_conflicts(&write_tx, &another_write));
|
|
// Read-Read does not conflict
|
|
assert!(!consensus.detect_conflicts(&read_tx, &read_tx));
|
|
// Read-Write does not conflict (per current implementation)
|
|
assert!(!consensus.detect_conflicts(&read_tx, &write_tx));
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_prune_old_vertices() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 1);
|
|
|
|
// Create vertices from node-1
|
|
let mut vertex_ids = Vec::new();
|
|
for i in 0..5 {
|
|
consensus
|
|
.submit_transaction(TransactionType::Write, vec![i])
|
|
.unwrap();
|
|
let v = consensus.create_vertex().unwrap().unwrap();
|
|
vertex_ids.push(v.id.clone());
|
|
}
|
|
|
|
// Manually finalize by adding confirmations from node-2
|
|
for vid in &vertex_ids {
|
|
let confirm = DagVertex::new(
|
|
"node-2".to_string(),
|
|
Transaction {
|
|
id: format!("confirm-{}", vid),
|
|
tx_type: TransactionType::System,
|
|
data: vec![],
|
|
nonce: 0,
|
|
},
|
|
vec![vid.clone()],
|
|
{
|
|
let mut c = HashMap::new();
|
|
c.insert("node-2".to_string(), 1);
|
|
c
|
|
},
|
|
);
|
|
consensus.add_vertex(confirm).unwrap();
|
|
}
|
|
consensus.finalize_vertices().unwrap();
|
|
|
|
// Prune keeping only 2 finalized vertices
|
|
consensus.prune_old_vertices(2);
|
|
|
|
// Stats should reflect pruning (total_vertices reduced)
|
|
let stats = consensus.get_stats();
|
|
// Some vertices were pruned; the exact count depends on implementation
|
|
// but total should be less than the original 10 (5 originals + 5 confirmations)
|
|
assert!(
|
|
stats.total_vertices < 10,
|
|
"pruning should reduce vertex count, got {}",
|
|
stats.total_vertices
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_dag_get_finalized_order() {
|
|
let consensus = DagConsensus::new("node-1".to_string(), 1);
|
|
|
|
// Submit and create 3 vertices
|
|
for i in 0..3 {
|
|
consensus
|
|
.submit_transaction(TransactionType::Write, vec![i])
|
|
.unwrap();
|
|
consensus.create_vertex().unwrap();
|
|
}
|
|
|
|
// Without finalization, no transactions should be in the finalized order
|
|
let order = consensus.get_finalized_order();
|
|
assert!(order.is_empty());
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 10. Cluster start with discovery
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_start_with_static_discovery() {
|
|
let peers = vec![test_node("peer-1", 9001), test_node("peer-2", 9002)];
|
|
|
|
let config = test_config(4, 2);
|
|
let discovery = Box::new(StaticDiscovery::new(peers));
|
|
let manager = ClusterManager::new(config, "self-node".to_string(), discovery).unwrap();
|
|
|
|
// start() discovers peers and initialises shards
|
|
manager.start().await.unwrap();
|
|
|
|
let stats = manager.get_stats();
|
|
assert_eq!(stats.total_nodes, 2, "discovered peers should be added");
|
|
assert!(stats.total_shards > 0, "shards should be initialised");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_start_excludes_self_from_discovery() {
|
|
let peers = vec![
|
|
test_node("self-node", 9000), // same as manager's own ID
|
|
test_node("other", 9001),
|
|
];
|
|
|
|
let config = test_config(4, 2);
|
|
let discovery = Box::new(StaticDiscovery::new(peers));
|
|
let manager = ClusterManager::new(config, "self-node".to_string(), discovery).unwrap();
|
|
|
|
manager.start().await.unwrap();
|
|
|
|
// self-node should NOT be added to the cluster (it would be the local node)
|
|
let stats = manager.get_stats();
|
|
assert_eq!(stats.total_nodes, 1, "self should be excluded from peers");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 11. Health check marks offline nodes
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn test_health_check_no_panic_on_empty_cluster() {
|
|
let manager = test_manager(4, 2);
|
|
let result = manager.run_health_checks().await;
|
|
assert!(result.is_ok());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_health_check_keeps_fresh_nodes_healthy() {
|
|
let manager = test_manager(4, 2);
|
|
|
|
manager.add_node(test_node("fresh", 9001)).await.unwrap();
|
|
manager.run_health_checks().await.unwrap();
|
|
|
|
// Fresh node should still be healthy (not marked offline)
|
|
let node = manager.get_node("fresh").unwrap();
|
|
assert_ne!(node.status, NodeStatus::Offline);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 12. Router accessor
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_manager_exposes_router() {
|
|
let manager = test_manager(16, 2);
|
|
let router = manager.router();
|
|
|
|
// Router should work independently
|
|
let shard = router.get_shard("test-vector");
|
|
assert!(shard < 16);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 13. End-to-end: full cluster lifecycle
|
|
// ---------------------------------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn test_full_cluster_lifecycle() {
|
|
// 1. Create cluster
|
|
let manager = test_manager(8, 2);
|
|
|
|
// 2. Add nodes
|
|
for i in 0..4 {
|
|
manager
|
|
.add_node(test_node(&format!("lifecycle-{}", i), 9000 + i))
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// 3. Verify cluster state
|
|
assert_eq!(manager.list_nodes().len(), 4);
|
|
assert_eq!(manager.healthy_nodes().len(), 4);
|
|
|
|
// 4. Assign shards manually (in addition to auto-rebalanced ones)
|
|
for sid in 0..8 {
|
|
let _ = manager.assign_shard(sid); // may already exist from rebalancing
|
|
}
|
|
|
|
// 5. All shards should be assigned
|
|
for sid in 0..8 {
|
|
let shard = manager.get_shard(sid);
|
|
assert!(shard.is_some(), "shard {} should be assigned", sid);
|
|
}
|
|
|
|
// 6. Use router to map vectors to shards
|
|
let router = manager.router();
|
|
let shard_id = router.get_shard("my-vector-id");
|
|
assert!(shard_id < 8);
|
|
|
|
// 7. Remove a node
|
|
manager.remove_node("lifecycle-0").await.unwrap();
|
|
assert_eq!(manager.list_nodes().len(), 3);
|
|
|
|
// 8. Verify stats
|
|
let stats = manager.get_stats();
|
|
assert_eq!(stats.total_nodes, 3);
|
|
assert!(stats.total_shards > 0);
|
|
|
|
// 9. Health check
|
|
manager.run_health_checks().await.unwrap();
|
|
}
|