mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-24 13:54:31 +00:00
322 lines
9.9 KiB
Rust
322 lines
9.9 KiB
Rust
//! Integration tests for parallel query execution
|
|
|
|
#[cfg(test)]
|
|
mod parallel_tests {
|
|
use ruvector_postgres::index::parallel::*;
|
|
use ruvector_postgres::index::hnsw::{HnswIndex, HnswConfig};
|
|
use ruvector_postgres::distance::DistanceMetric;
|
|
|
|
#[test]
|
|
fn test_parallel_worker_estimation() {
|
|
// Small index - no parallelism
|
|
let workers = ruhnsw_estimate_parallel_workers(50, 5000, 10, 40);
|
|
assert_eq!(workers, 0, "Small indexes should not use parallelism");
|
|
|
|
// Medium index - some workers
|
|
let workers = ruhnsw_estimate_parallel_workers(2000, 100000, 10, 40);
|
|
assert!(workers > 0 && workers <= 4, "Medium indexes should use 1-4 workers");
|
|
|
|
// Large index - more workers
|
|
let workers = ruhnsw_estimate_parallel_workers(10000, 1000000, 10, 40);
|
|
assert!(workers >= 2, "Large indexes should use multiple workers");
|
|
|
|
// Complex query - more workers
|
|
let workers_simple = ruhnsw_estimate_parallel_workers(5000, 500000, 10, 40);
|
|
let workers_complex = ruhnsw_estimate_parallel_workers(5000, 500000, 200, 200);
|
|
assert!(
|
|
workers_complex >= workers_simple,
|
|
"Complex queries should use more workers"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_partition_estimation() {
|
|
// Should create more partitions than workers for load balancing
|
|
let partitions = estimate_partitions(4, 100000);
|
|
assert!(partitions >= 4, "Should have at least as many partitions as workers");
|
|
assert!(partitions <= 50, "Should not create too many partitions");
|
|
|
|
// Large dataset should create more partitions
|
|
let partitions_large = estimate_partitions(4, 1000000);
|
|
let partitions_small = estimate_partitions(4, 50000);
|
|
assert!(
|
|
partitions_large >= partitions_small,
|
|
"Larger datasets should have more partitions"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_shared_state_work_stealing() {
|
|
let state = RuHnswSharedState::new(
|
|
4, // 4 workers
|
|
16, // 16 partitions
|
|
128, // 128 dimensions
|
|
10, // k=10
|
|
40, // ef_search=40
|
|
DistanceMetric::Euclidean,
|
|
);
|
|
|
|
// Workers should be able to claim partitions
|
|
let mut claimed = Vec::new();
|
|
for _ in 0..16 {
|
|
if let Some(partition) = state.get_next_partition() {
|
|
claimed.push(partition);
|
|
}
|
|
}
|
|
|
|
assert_eq!(claimed.len(), 16, "All partitions should be claimed");
|
|
|
|
// Should return None after all partitions claimed
|
|
assert_eq!(state.get_next_partition(), None);
|
|
|
|
// Verify no duplicates
|
|
let mut sorted = claimed.clone();
|
|
sorted.sort();
|
|
sorted.dedup();
|
|
assert_eq!(sorted.len(), claimed.len(), "No duplicate partitions");
|
|
}
|
|
|
|
#[test]
|
|
fn test_parallel_result_merging() {
|
|
// Create results from 3 workers
|
|
let worker1 = vec![
|
|
(0.1, ItemPointer::new(1, 1)),
|
|
(0.4, ItemPointer::new(1, 4)),
|
|
(0.7, ItemPointer::new(1, 7)),
|
|
];
|
|
|
|
let worker2 = vec![
|
|
(0.2, ItemPointer::new(2, 2)),
|
|
(0.5, ItemPointer::new(2, 5)),
|
|
(0.8, ItemPointer::new(2, 8)),
|
|
];
|
|
|
|
let worker3 = vec![
|
|
(0.3, ItemPointer::new(3, 3)),
|
|
(0.6, ItemPointer::new(3, 6)),
|
|
(0.9, ItemPointer::new(3, 9)),
|
|
];
|
|
|
|
// Merge top 5 results
|
|
let merged = merge_knn_results(&[worker1, worker2, worker3], 5);
|
|
|
|
assert_eq!(merged.len(), 5, "Should return exactly k results");
|
|
|
|
// Verify sorted order
|
|
for i in 1..merged.len() {
|
|
assert!(
|
|
merged[i - 1].0 <= merged[i].0,
|
|
"Results should be sorted by distance"
|
|
);
|
|
}
|
|
|
|
// Verify we got the actual top 5
|
|
assert_eq!(merged[0].0, 0.1);
|
|
assert_eq!(merged[1].0, 0.2);
|
|
assert_eq!(merged[2].0, 0.3);
|
|
assert_eq!(merged[3].0, 0.4);
|
|
assert_eq!(merged[4].0, 0.5);
|
|
}
|
|
|
|
#[test]
|
|
fn test_tournament_merge() {
|
|
// Test tournament tree merge with sorted inputs
|
|
let worker1 = vec![
|
|
(0.1, ItemPointer::new(1, 1)),
|
|
(0.5, ItemPointer::new(1, 5)),
|
|
(0.9, ItemPointer::new(1, 9)),
|
|
];
|
|
|
|
let worker2 = vec![
|
|
(0.2, ItemPointer::new(2, 2)),
|
|
(0.6, ItemPointer::new(2, 6)),
|
|
];
|
|
|
|
let worker3 = vec![
|
|
(0.3, ItemPointer::new(3, 3)),
|
|
(0.4, ItemPointer::new(3, 4)),
|
|
(0.7, ItemPointer::new(3, 7)),
|
|
];
|
|
|
|
let merged = merge_knn_results_tournament(&[worker1, worker2, worker3], 6);
|
|
|
|
assert_eq!(merged.len(), 6);
|
|
|
|
// Verify sorted order
|
|
let distances: Vec<f32> = merged.iter().map(|(d, _)| *d).collect();
|
|
assert_eq!(distances, vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6]);
|
|
}
|
|
|
|
#[test]
|
|
fn test_parallel_coordinator() {
|
|
// Create a small HNSW index for testing
|
|
let config = HnswConfig {
|
|
m: 8,
|
|
m0: 16,
|
|
ef_construction: 32,
|
|
ef_search: 20,
|
|
max_elements: 1000,
|
|
metric: DistanceMetric::Euclidean,
|
|
seed: 42,
|
|
};
|
|
|
|
let index = HnswIndex::new(3, config);
|
|
|
|
// Insert some test vectors
|
|
for i in 0..100 {
|
|
let vector = vec![
|
|
(i as f32) * 0.1,
|
|
(i as f32) * 0.2,
|
|
(i as f32) * 0.3,
|
|
];
|
|
index.insert(vector);
|
|
}
|
|
|
|
// Create parallel coordinator
|
|
let mut coordinator = ParallelScanCoordinator::new(
|
|
2, // 2 workers
|
|
4, // 4 partitions
|
|
3, // 3 dimensions
|
|
10, // k=10
|
|
20, // ef_search=20
|
|
DistanceMetric::Euclidean,
|
|
);
|
|
|
|
// Execute parallel scan
|
|
let query = vec![0.5, 0.5, 0.5];
|
|
let results = coordinator.execute_parallel_scan(&index, query);
|
|
|
|
// Verify results
|
|
assert!(results.len() <= 10, "Should return at most k results");
|
|
|
|
// Check that results are sorted
|
|
for i in 1..results.len() {
|
|
assert!(
|
|
results[i - 1].0 <= results[i].0,
|
|
"Results should be sorted by distance"
|
|
);
|
|
}
|
|
|
|
// Get statistics
|
|
let stats = coordinator.get_stats();
|
|
assert_eq!(stats.num_workers, 2);
|
|
assert_eq!(stats.total_partitions, 4);
|
|
assert_eq!(stats.completed_workers, 2);
|
|
}
|
|
|
|
#[test]
|
|
fn test_item_pointer_mapping() {
|
|
// Test node ID to ItemPointer mapping
|
|
let ip1 = create_item_pointer(0);
|
|
assert_eq!(ip1.block_number, 0);
|
|
assert_eq!(ip1.offset_number, 1);
|
|
|
|
let ip2 = create_item_pointer(100);
|
|
assert_eq!(ip2.block_number, 0);
|
|
assert_eq!(ip2.offset_number, 101);
|
|
|
|
// Test block boundary (8191 tuples per page)
|
|
let ip3 = create_item_pointer(8191);
|
|
assert_eq!(ip3.block_number, 1);
|
|
assert_eq!(ip3.offset_number, 1);
|
|
|
|
let ip4 = create_item_pointer(16382);
|
|
assert_eq!(ip4.block_number, 2);
|
|
assert_eq!(ip4.offset_number, 1);
|
|
}
|
|
|
|
#[test]
|
|
fn test_empty_worker_results() {
|
|
// Test merging when some workers have no results
|
|
let worker1 = vec![(0.1, ItemPointer::new(1, 1))];
|
|
let worker2 = vec![];
|
|
let worker3 = vec![(0.2, ItemPointer::new(3, 2))];
|
|
|
|
let merged = merge_knn_results(&[worker1, worker2, worker3], 5);
|
|
|
|
assert_eq!(merged.len(), 2);
|
|
assert_eq!(merged[0].0, 0.1);
|
|
assert_eq!(merged[1].0, 0.2);
|
|
}
|
|
|
|
#[test]
|
|
fn test_merge_with_duplicates() {
|
|
// Test that merging handles duplicate ItemPointers correctly
|
|
let worker1 = vec![
|
|
(0.1, ItemPointer::new(1, 1)),
|
|
(0.3, ItemPointer::new(1, 3)),
|
|
];
|
|
|
|
let worker2 = vec![
|
|
(0.1, ItemPointer::new(1, 1)), // Duplicate
|
|
(0.2, ItemPointer::new(2, 2)),
|
|
];
|
|
|
|
let merged = merge_knn_results(&[worker1, worker2], 3);
|
|
|
|
// Should include both instances (heap-based merge doesn't deduplicate)
|
|
assert!(merged.len() >= 3);
|
|
}
|
|
|
|
#[test]
|
|
fn test_large_k_merge() {
|
|
// Test merging with k larger than available results
|
|
let worker1 = vec![
|
|
(0.1, ItemPointer::new(1, 1)),
|
|
(0.2, ItemPointer::new(1, 2)),
|
|
];
|
|
|
|
let worker2 = vec![
|
|
(0.3, ItemPointer::new(2, 3)),
|
|
];
|
|
|
|
let merged = merge_knn_results(&[worker1, worker2], 100);
|
|
|
|
// Should return all available results
|
|
assert_eq!(merged.len(), 3);
|
|
}
|
|
|
|
#[test]
|
|
fn test_parallel_scan_descriptor() {
|
|
use std::sync::Arc;
|
|
use parking_lot::RwLock;
|
|
|
|
let shared_state = Arc::new(RwLock::new(RuHnswSharedState::new(
|
|
2, 4, 128, 10, 40,
|
|
DistanceMetric::Euclidean,
|
|
)));
|
|
|
|
let query = vec![0.5; 128];
|
|
let desc = RuHnswParallelScanDesc::new(shared_state, 0, query.clone());
|
|
|
|
assert_eq!(desc.worker_id, 0);
|
|
assert_eq!(desc.query, query);
|
|
assert_eq!(desc.local_results.len(), 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_metrics_in_parallel_state() {
|
|
let state = RuHnswSharedState::new(
|
|
3, 9, 256, 50, 100,
|
|
DistanceMetric::Cosine,
|
|
);
|
|
|
|
assert_eq!(state.num_workers, 3);
|
|
assert_eq!(state.total_partitions, 9);
|
|
assert_eq!(state.dimensions, 256);
|
|
assert_eq!(state.k, 50);
|
|
assert_eq!(state.ef_search, 100);
|
|
assert_eq!(state.metric, DistanceMetric::Cosine);
|
|
|
|
// Test completion tracking
|
|
assert_eq!(state.completed_workers.load(std::sync::atomic::Ordering::SeqCst), 0);
|
|
assert!(!state.all_completed());
|
|
|
|
state.mark_completed();
|
|
state.mark_completed();
|
|
assert!(!state.all_completed());
|
|
|
|
state.mark_completed();
|
|
assert!(state.all_completed());
|
|
}
|
|
}
|