mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-24 13:54:31 +00:00
- Remove invalid feature flags (hybrid-search, filtered-search) that don't exist - Replace with valid all-features flag for comprehensive testing - Add PostgreSQL apt repository for older versions on Ubuntu 24.04 - Apply cargo fmt formatting to all crates This fixes CI failures caused by: - Feature flags that were planned but not implemented - PostgreSQL 14 packages not available on Ubuntu 24.04 default repos 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
386 lines
12 KiB
Rust
386 lines
12 KiB
Rust
//! Stress tests for concurrent operations and memory pressure
|
|
//!
|
|
//! These tests verify that the extension handles:
|
|
//! - Concurrent insertions and queries
|
|
//! - High memory pressure
|
|
//! - Large batches of operations
|
|
//! - Thread safety and race conditions
|
|
|
|
use ruvector_postgres::types::RuVector;
|
|
use std::sync::{Arc, Barrier};
|
|
use std::thread;
|
|
|
|
#[cfg(test)]
|
|
mod stress_tests {
|
|
use super::*;
|
|
|
|
// ========================================================================
|
|
// Concurrent Operations Tests
|
|
// ========================================================================
|
|
|
|
#[test]
|
|
fn test_concurrent_vector_creation() {
|
|
let num_threads = 8;
|
|
let vectors_per_thread = 100;
|
|
let barrier = Arc::new(Barrier::new(num_threads));
|
|
|
|
let handles: Vec<_> = (0..num_threads)
|
|
.map(|thread_id| {
|
|
let barrier = Arc::clone(&barrier);
|
|
|
|
thread::spawn(move || {
|
|
barrier.wait();
|
|
|
|
for i in 0..vectors_per_thread {
|
|
let data: Vec<f32> = (0..128)
|
|
.map(|j| ((thread_id * 1000 + i * 10 + j) as f32) * 0.01)
|
|
.collect();
|
|
|
|
let v = RuVector::from_slice(&data);
|
|
assert_eq!(v.dimensions(), 128);
|
|
assert_eq!(v.as_slice().len(), 128);
|
|
}
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
for handle in handles {
|
|
handle.join().expect("Thread panicked");
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_concurrent_distance_calculations() {
|
|
let num_threads = 16;
|
|
let calculations_per_thread = 1000;
|
|
|
|
// Prepare shared test vectors
|
|
let v1 = Arc::new(RuVector::from_slice(&[1.0, 2.0, 3.0, 4.0, 5.0]));
|
|
let v2 = Arc::new(RuVector::from_slice(&[5.0, 4.0, 3.0, 2.0, 1.0]));
|
|
|
|
let handles: Vec<_> = (0..num_threads)
|
|
.map(|_| {
|
|
let v1 = Arc::clone(&v1);
|
|
let v2 = Arc::clone(&v2);
|
|
|
|
thread::spawn(move || {
|
|
for _ in 0..calculations_per_thread {
|
|
let norm1 = v1.norm();
|
|
let norm2 = v2.norm();
|
|
let dot = v1.dot(&*v2);
|
|
|
|
assert!(norm1.is_finite());
|
|
assert!(norm2.is_finite());
|
|
assert!(dot.is_finite());
|
|
}
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
for handle in handles {
|
|
handle.join().expect("Thread panicked");
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_concurrent_normalization() {
|
|
let num_threads = 8;
|
|
let operations_per_thread = 500;
|
|
|
|
let handles: Vec<_> = (0..num_threads)
|
|
.map(|thread_id| {
|
|
thread::spawn(move || {
|
|
for i in 0..operations_per_thread {
|
|
let data: Vec<f32> = (0..64)
|
|
.map(|j| ((thread_id * 100 + i + j) as f32) * 0.1)
|
|
.collect();
|
|
|
|
let v = RuVector::from_slice(&data);
|
|
let normalized = v.normalize();
|
|
|
|
let norm = normalized.norm();
|
|
if !data.iter().all(|&x| x == 0.0) {
|
|
assert!(
|
|
(norm - 1.0).abs() < 1e-5,
|
|
"Normalized vector should have unit norm"
|
|
);
|
|
}
|
|
}
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
for handle in handles {
|
|
handle.join().expect("Thread panicked");
|
|
}
|
|
}
|
|
|
|
// ========================================================================
|
|
// Memory Pressure Tests
|
|
// ========================================================================
|
|
|
|
#[test]
|
|
fn test_large_batch_allocation() {
|
|
let num_vectors = 10_000;
|
|
let dimensions = 128;
|
|
|
|
let mut vectors = Vec::with_capacity(num_vectors);
|
|
|
|
for i in 0..num_vectors {
|
|
let data: Vec<f32> = (0..dimensions)
|
|
.map(|j| ((i * dimensions + j) as f32) * 0.001)
|
|
.collect();
|
|
|
|
vectors.push(RuVector::from_slice(&data));
|
|
}
|
|
|
|
// Verify all vectors are intact
|
|
for (i, v) in vectors.iter().enumerate() {
|
|
assert_eq!(v.dimensions(), dimensions);
|
|
assert!(v.as_slice()[0] == (i * dimensions) as f32 * 0.001 || v.as_slice()[0] == 0.0);
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_large_vector_dimensions() {
|
|
// Test with maximum supported dimensions
|
|
let max_dims = 10_000;
|
|
|
|
let data: Vec<f32> = (0..max_dims).map(|i| (i as f32) * 0.0001).collect();
|
|
|
|
let v = RuVector::from_slice(&data);
|
|
assert_eq!(v.dimensions(), max_dims);
|
|
|
|
let norm = v.norm();
|
|
assert!(norm.is_finite() && norm > 0.0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_memory_reuse_pattern() {
|
|
// Simulate a pattern of allocation and deallocation
|
|
let iterations = 1000;
|
|
let dimensions = 256;
|
|
|
|
for _ in 0..iterations {
|
|
let data: Vec<f32> = (0..dimensions).map(|i| i as f32).collect();
|
|
let v = RuVector::from_slice(&data);
|
|
|
|
assert_eq!(v.dimensions(), dimensions);
|
|
|
|
// Do some operations
|
|
let _ = v.norm();
|
|
let _ = v.normalize();
|
|
|
|
// Vector drops here, memory should be freed
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_concurrent_allocation_deallocation() {
|
|
let num_threads = 8;
|
|
let iterations_per_thread = 500;
|
|
|
|
let handles: Vec<_> = (0..num_threads)
|
|
.map(|_| {
|
|
thread::spawn(move || {
|
|
for _ in 0..iterations_per_thread {
|
|
let data: Vec<f32> = (0..128).map(|i| i as f32).collect();
|
|
let v = RuVector::from_slice(&data);
|
|
|
|
// Perform operations
|
|
let _ = v.norm();
|
|
let _ = v.add(&v);
|
|
let _ = v.normalize();
|
|
|
|
// Implicit drop here
|
|
}
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
for handle in handles {
|
|
handle.join().expect("Thread panicked");
|
|
}
|
|
}
|
|
|
|
// ========================================================================
|
|
// Batch Operations Tests
|
|
// ========================================================================
|
|
|
|
#[test]
|
|
fn test_batch_distance_calculations() {
|
|
let query = RuVector::from_slice(&[1.0, 2.0, 3.0, 4.0, 5.0]);
|
|
let num_candidates = 10_000;
|
|
|
|
let candidates: Vec<_> = (0..num_candidates)
|
|
.map(|i| {
|
|
let data: Vec<f32> = (0..5).map(|j| ((i * 5 + j) as f32) * 0.01).collect();
|
|
RuVector::from_slice(&data)
|
|
})
|
|
.collect();
|
|
|
|
let distances: Vec<_> = candidates
|
|
.iter()
|
|
.map(|c| {
|
|
use ruvector_postgres::distance::euclidean_distance;
|
|
euclidean_distance(query.as_slice(), c.as_slice())
|
|
})
|
|
.collect();
|
|
|
|
assert_eq!(distances.len(), num_candidates);
|
|
assert!(distances.iter().all(|&d| d.is_finite()));
|
|
}
|
|
|
|
#[test]
|
|
fn test_batch_normalization() {
|
|
let num_vectors = 5000;
|
|
let dimensions = 64;
|
|
|
|
let vectors: Vec<_> = (0..num_vectors)
|
|
.map(|i| {
|
|
let data: Vec<f32> = (0..dimensions).map(|j| ((i + j) as f32) * 0.1).collect();
|
|
RuVector::from_slice(&data)
|
|
})
|
|
.collect();
|
|
|
|
let normalized: Vec<_> = vectors.iter().map(|v| v.normalize()).collect();
|
|
|
|
for n in &normalized {
|
|
let norm = n.norm();
|
|
assert!((norm - 1.0).abs() < 1e-4 || n.as_slice().iter().all(|&x| x == 0.0));
|
|
}
|
|
}
|
|
|
|
// ========================================================================
|
|
// Stress Tests with Random Data
|
|
// ========================================================================
|
|
|
|
#[test]
|
|
fn test_random_operations_single_threaded() {
|
|
use rand::Rng;
|
|
let mut rng = rand::thread_rng();
|
|
|
|
for _ in 0..1000 {
|
|
let dim = rng.gen_range(1..256);
|
|
let data1: Vec<f32> = (0..dim).map(|_| rng.gen_range(-100.0..100.0)).collect();
|
|
let data2: Vec<f32> = (0..dim).map(|_| rng.gen_range(-100.0..100.0)).collect();
|
|
|
|
let v1 = RuVector::from_slice(&data1);
|
|
let v2 = RuVector::from_slice(&data2);
|
|
|
|
// Random operations
|
|
let _ = v1.add(&v2);
|
|
let _ = v1.sub(&v2);
|
|
let _ = v1.dot(&v2);
|
|
let _ = v1.norm();
|
|
let _ = v1.normalize();
|
|
|
|
use ruvector_postgres::distance::{
|
|
cosine_distance, euclidean_distance, manhattan_distance,
|
|
};
|
|
|
|
let d1 = euclidean_distance(&data1, &data2);
|
|
let d2 = manhattan_distance(&data1, &data2);
|
|
|
|
assert!(d1.is_finite());
|
|
assert!(d2.is_finite());
|
|
|
|
if data1.iter().any(|&x| x != 0.0) && data2.iter().any(|&x| x != 0.0) {
|
|
let d3 = cosine_distance(&data1, &data2);
|
|
assert!(d3.is_finite());
|
|
}
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_extreme_values_handling() {
|
|
// Test with very small values
|
|
let small = RuVector::from_slice(&[1e-10, 1e-10, 1e-10]);
|
|
assert!(small.norm().is_finite());
|
|
|
|
// Test with large values
|
|
let large = RuVector::from_slice(&[1e6, 1e6, 1e6]);
|
|
assert!(large.norm().is_finite());
|
|
|
|
// Test with mixed scales
|
|
let mixed = RuVector::from_slice(&[1e-10, 1.0, 1e10]);
|
|
assert!(mixed.norm().is_finite());
|
|
|
|
// Operations should not overflow/underflow
|
|
let result = small.add(&large);
|
|
assert!(result.as_slice().iter().all(|&x| x.is_finite()));
|
|
}
|
|
|
|
#[test]
|
|
fn test_alternating_pattern_stress() {
|
|
// Create a pattern that might trigger SIMD edge cases
|
|
for size in [63, 64, 65, 127, 128, 129, 255, 256, 257] {
|
|
let data: Vec<f32> = (0..size)
|
|
.map(|i| if i % 2 == 0 { 1.0 } else { -1.0 })
|
|
.collect();
|
|
|
|
let v = RuVector::from_slice(&data);
|
|
let norm = v.norm();
|
|
|
|
let expected = (size as f32).sqrt();
|
|
assert!(
|
|
(norm - expected).abs() < 0.01,
|
|
"Size {}: expected {}, got {}",
|
|
size,
|
|
expected,
|
|
norm
|
|
);
|
|
}
|
|
}
|
|
|
|
// ========================================================================
|
|
// Thread Safety Tests
|
|
// ========================================================================
|
|
|
|
#[test]
|
|
fn test_shared_vector_read_only() {
|
|
let v = Arc::new(RuVector::from_slice(&[1.0, 2.0, 3.0, 4.0, 5.0]));
|
|
let num_threads = 16;
|
|
|
|
let handles: Vec<_> = (0..num_threads)
|
|
.map(|_| {
|
|
let v = Arc::clone(&v);
|
|
|
|
thread::spawn(move || {
|
|
for _ in 0..10000 {
|
|
assert_eq!(v.dimensions(), 5);
|
|
let _ = v.norm();
|
|
let _ = v.as_slice();
|
|
}
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
for handle in handles {
|
|
handle.join().expect("Thread panicked");
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_varlena_roundtrip_stress() {
|
|
let iterations = 10000;
|
|
|
|
for i in 0..iterations {
|
|
let size = (i % 100) + 1;
|
|
let data: Vec<f32> = (0..size).map(|j| (i * 100 + j) as f32 * 0.01).collect();
|
|
|
|
unsafe {
|
|
let v1 = RuVector::from_slice(&data);
|
|
let varlena = v1.to_varlena();
|
|
let v2 = RuVector::from_varlena(varlena);
|
|
|
|
assert_eq!(v1.dimensions(), v2.dimensions());
|
|
for (a, b) in v1.as_slice().iter().zip(v2.as_slice()) {
|
|
assert!((a - b).abs() < 1e-6);
|
|
}
|
|
|
|
pgrx::pg_sys::pfree(varlena as *mut std::ffi::c_void);
|
|
}
|
|
}
|
|
}
|
|
}
|