ruvector/crates/ruvector-graph/src/executor/pipeline.rs
rUv eef6778839 fix: Resolve CI build failures
- Format all Rust code with cargo fmt
- Generate Cargo.lock for security audit
- Add build:wasm script to graph-wasm package.json
- Update npm/package-lock.json

The CI was failing due to:
1. Rust code formatting check failures
2. Missing Cargo.lock file for cargo audit
3. Missing build:wasm script expected by graph-ci.yml workflow

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-26 15:25:47 +00:00

336 lines
8.3 KiB
Rust

//! Pipeline execution model with Volcano-style iterators
//!
//! Implements pull-based query execution with row batching
use crate::executor::operators::Operator;
use crate::executor::plan::Value;
use crate::executor::plan::{PhysicalPlan, QuerySchema};
use crate::executor::{ExecutionError, Result};
use std::collections::HashMap;
/// Batch size for vectorized execution
const DEFAULT_BATCH_SIZE: usize = 1024;
/// Row batch for vectorized processing
#[derive(Debug, Clone)]
pub struct RowBatch {
pub rows: Vec<HashMap<String, Value>>,
pub schema: QuerySchema,
}
impl RowBatch {
/// Create a new row batch
pub fn new(schema: QuerySchema) -> Self {
Self {
rows: Vec::with_capacity(DEFAULT_BATCH_SIZE),
schema,
}
}
/// Create batch with rows
pub fn with_rows(rows: Vec<HashMap<String, Value>>, schema: QuerySchema) -> Self {
Self { rows, schema }
}
/// Add a row to the batch
pub fn add_row(&mut self, row: HashMap<String, Value>) {
self.rows.push(row);
}
/// Check if batch is full
pub fn is_full(&self) -> bool {
self.rows.len() >= DEFAULT_BATCH_SIZE
}
/// Get number of rows
pub fn len(&self) -> usize {
self.rows.len()
}
/// Check if batch is empty
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
/// Clear the batch
pub fn clear(&mut self) {
self.rows.clear();
}
/// Merge another batch into this one
pub fn merge(&mut self, other: RowBatch) {
self.rows.extend(other.rows);
}
}
/// Execution context for query pipeline
pub struct ExecutionContext {
/// Memory limit for execution
pub memory_limit: usize,
/// Current memory usage
pub memory_used: usize,
/// Batch size
pub batch_size: usize,
/// Enable query profiling
pub enable_profiling: bool,
}
impl ExecutionContext {
/// Create new execution context
pub fn new() -> Self {
Self {
memory_limit: 1024 * 1024 * 1024, // 1GB default
memory_used: 0,
batch_size: DEFAULT_BATCH_SIZE,
enable_profiling: false,
}
}
/// Create with custom memory limit
pub fn with_memory_limit(memory_limit: usize) -> Self {
Self {
memory_limit,
memory_used: 0,
batch_size: DEFAULT_BATCH_SIZE,
enable_profiling: false,
}
}
/// Check if memory limit exceeded
pub fn check_memory(&self) -> Result<()> {
if self.memory_used > self.memory_limit {
Err(ExecutionError::ResourceExhausted(format!(
"Memory limit exceeded: {} > {}",
self.memory_used, self.memory_limit
)))
} else {
Ok(())
}
}
/// Allocate memory
pub fn allocate(&mut self, bytes: usize) -> Result<()> {
self.memory_used += bytes;
self.check_memory()
}
/// Free memory
pub fn free(&mut self, bytes: usize) {
self.memory_used = self.memory_used.saturating_sub(bytes);
}
}
impl Default for ExecutionContext {
fn default() -> Self {
Self::new()
}
}
/// Pipeline executor using Volcano iterator model
pub struct Pipeline {
plan: PhysicalPlan,
operators: Vec<Box<dyn Operator>>,
current_operator: usize,
context: ExecutionContext,
finished: bool,
}
impl Pipeline {
/// Create a new pipeline from physical plan (takes ownership of operators)
pub fn new(mut plan: PhysicalPlan) -> Self {
let operators = std::mem::take(&mut plan.operators);
Self {
operators,
plan,
current_operator: 0,
context: ExecutionContext::new(),
finished: false,
}
}
/// Create pipeline with custom context (takes ownership of operators)
pub fn with_context(mut plan: PhysicalPlan, context: ExecutionContext) -> Self {
let operators = std::mem::take(&mut plan.operators);
Self {
operators,
plan,
current_operator: 0,
context,
finished: false,
}
}
/// Get next batch from pipeline
pub fn next(&mut self) -> Result<Option<RowBatch>> {
if self.finished {
return Ok(None);
}
// Execute pipeline in pull-based fashion
let result = self.execute_pipeline()?;
if result.is_none() {
self.finished = true;
}
Ok(result)
}
/// Execute the full pipeline
fn execute_pipeline(&mut self) -> Result<Option<RowBatch>> {
if self.operators.is_empty() {
return Ok(None);
}
// Start with the first operator (scan)
let mut current_batch = self.operators[0].execute(None)?;
// Pipeline the batch through remaining operators
for operator in &mut self.operators[1..] {
if let Some(batch) = current_batch {
current_batch = operator.execute(Some(batch))?;
} else {
return Ok(None);
}
}
Ok(current_batch)
}
/// Reset pipeline for re-execution
pub fn reset(&mut self) {
self.current_operator = 0;
self.finished = false;
self.context = ExecutionContext::new();
}
/// Get execution context
pub fn context(&self) -> &ExecutionContext {
&self.context
}
/// Get mutable execution context
pub fn context_mut(&mut self) -> &mut ExecutionContext {
&mut self.context
}
}
/// Pipeline builder for constructing execution pipelines
pub struct PipelineBuilder {
operators: Vec<Box<dyn Operator>>,
context: ExecutionContext,
}
impl PipelineBuilder {
/// Create a new pipeline builder
pub fn new() -> Self {
Self {
operators: Vec::new(),
context: ExecutionContext::new(),
}
}
/// Add an operator to the pipeline
pub fn add_operator(mut self, operator: Box<dyn Operator>) -> Self {
self.operators.push(operator);
self
}
/// Set execution context
pub fn with_context(mut self, context: ExecutionContext) -> Self {
self.context = context;
self
}
/// Build the pipeline
pub fn build(self) -> Pipeline {
let plan = PhysicalPlan {
operators: self.operators,
pipeline_breakers: Vec::new(),
parallelism: 1,
};
Pipeline::with_context(plan, self.context)
}
}
impl Default for PipelineBuilder {
fn default() -> Self {
Self::new()
}
}
/// Iterator adapter for pipeline
pub struct PipelineIterator {
pipeline: Pipeline,
}
impl PipelineIterator {
pub fn new(pipeline: Pipeline) -> Self {
Self { pipeline }
}
}
impl Iterator for PipelineIterator {
type Item = Result<RowBatch>;
fn next(&mut self) -> Option<Self::Item> {
match self.pipeline.next() {
Ok(Some(batch)) => Some(Ok(batch)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::executor::plan::ColumnDef;
use crate::executor::plan::DataType;
#[test]
fn test_row_batch() {
let schema = QuerySchema::new(vec![ColumnDef {
name: "id".to_string(),
data_type: DataType::Int64,
nullable: false,
}]);
let mut batch = RowBatch::new(schema);
assert!(batch.is_empty());
let mut row = HashMap::new();
row.insert("id".to_string(), Value::Int64(1));
batch.add_row(row);
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
}
#[test]
fn test_execution_context() {
let mut ctx = ExecutionContext::new();
assert_eq!(ctx.memory_used, 0);
ctx.allocate(1024).unwrap();
assert_eq!(ctx.memory_used, 1024);
ctx.free(512);
assert_eq!(ctx.memory_used, 512);
}
#[test]
fn test_memory_limit() {
let mut ctx = ExecutionContext::with_memory_limit(1000);
assert!(ctx.allocate(500).is_ok());
assert!(ctx.allocate(600).is_err());
}
#[test]
fn test_pipeline_builder() {
let builder = PipelineBuilder::new();
let pipeline = builder.build();
assert_eq!(pipeline.operators.len(), 0);
}
}