From 52beac9fa5c274f483cd99fb3b2637e752d64771 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 3 Aug 2023 19:33:38 +0200 Subject: [PATCH] :lipstick: --- crates/crdb/src/digest.rs | 4 + crates/crdb/src/sync.rs | 236 ++++++++------------------------------ 2 files changed, 55 insertions(+), 185 deletions(-) diff --git a/crates/crdb/src/digest.rs b/crates/crdb/src/digest.rs index bee75af9045..5999affc808 100644 --- a/crates/crdb/src/digest.rs +++ b/crates/crdb/src/digest.rs @@ -85,6 +85,10 @@ impl DigestSequence { println!("]"); } + pub fn operation_count(&self) -> usize { + self.digests.summary().count + } + pub fn digest(&self, mut range: Range) -> Digest { range.start = cmp::min(range.start, self.digests.summary().count); range.end = cmp::min(range.end, self.digests.summary().count); diff --git a/crates/crdb/src/sync.rs b/crates/crdb/src/sync.rs index 3b9c5dc7473..09df6aa2742 100644 --- a/crates/crdb/src/sync.rs +++ b/crates/crdb/src/sync.rs @@ -72,67 +72,68 @@ struct RangeDigest { fn request_digests( operations: &btree::Sequence, - root_range: Range, - base: usize, - target_depth: u32, + mut root_range: Range, + tree_base: usize, + tree_depth: u32, + min_operations: usize, ) -> Vec { - let digest_count = base.pow(target_depth); - let subrange_len = (root_range.len() + digest_count - 1) / digest_count; - - let mut digests = Vec::with_capacity(digest_count); - let mut subrange_start = root_range.start; - while subrange_start < root_range.end { - let subrange_end = cmp::min(subrange_start + subrange_len, root_range.end); - digests.push(digest_for_range(operations, subrange_start..subrange_end)); - subrange_start = subrange_end; - } - digests + root_range.start = cmp::min(root_range.start, operations.summary().digest.count); + root_range.end = cmp::min(root_range.end, operations.summary().digest.count); + subdivide_range(root_range, tree_base, tree_depth, min_operations) + .map(|range| digest_for_range(operations, range)) + .collect() } -fn leaf_ranges(root_range: Range, tree_base: usize, tree_depth: u32) -> Vec> { +fn subdivide_range( + root_range: Range, + tree_base: usize, + tree_depth: u32, + min_operations: usize, +) -> impl Iterator> { let count = tree_base.pow(tree_depth); - let subrange_len = (root_range.len() + count - 1) / count; + let subrange_len = cmp::max(min_operations, (root_range.len() + count - 1) / count); - let mut subranges = Vec::with_capacity(count); let mut subrange_start = root_range.start; - while subrange_start < root_range.end { - let subrange_end = cmp::min(subrange_start + subrange_len, root_range.end); - subranges.push(subrange_start..subrange_end); - subrange_start = subrange_end; - } - subranges + iter::from_fn(move || { + if subrange_start >= root_range.end { + return None; + } + let subrange = subrange_start..cmp::min(subrange_start + subrange_len, root_range.end); + subrange_start = subrange.end; + Some(subrange) + }) } fn sync(client: &mut btree::Sequence, server: &mut btree::Sequence) { - const BASE: usize = 2; + const BASE: usize = 16; const DEPTH: u32 = 2; - const MIN_OPERATIONS: usize = 5; - let max_sync_range = 0..(client.summary().digest.count + server.summary().digest.count); + const MIN_OPERATIONS: usize = 256; + let mut server_digests = DigestSequence::new(); - let digests = request_digests(server, max_sync_range.clone(), BASE, DEPTH); + let digests = request_digests(server, 0..usize::MAX, BASE, DEPTH, MIN_OPERATIONS); server_digests.splice(0..0, digests.iter().cloned()); - let mut stack = leaf_ranges(max_sync_range, BASE, DEPTH); + let max_sync_range = 0..(client.summary().digest.count + server_digests.operation_count()); + let mut stack = + subdivide_range(max_sync_range, BASE, DEPTH, MIN_OPERATIONS).collect::>(); stack.reverse(); let mut synced_end = 0; while let Some(mut sync_range) = stack.pop() { sync_range.start = cmp::max(sync_range.start, synced_end); - println!("visiting {:?}", sync_range); - server_digests.debug(); let server_digest = server_digests.digest(sync_range.clone()); sync_range.end = cmp::max(sync_range.start + server_digest.count, sync_range.end); - println!("server digest range was {:?}", sync_range); let client_digest = digest_for_range(client, sync_range.clone()); if client_digest == server_digest { - println!("digests are the same in {:?}", sync_range); + if client_digest.count == 0 { + break; + } + synced_end = sync_range.end; - continue; } else if sync_range.len() > MIN_OPERATIONS { - println!("digests are not the same, recursing"); - let digests = request_digests(server, sync_range.clone(), BASE, DEPTH); + let digests = request_digests(server, sync_range.clone(), BASE, DEPTH, MIN_OPERATIONS); server_digests.splice(sync_range.clone(), digests.iter().cloned()); let old_stack_len = stack.len(); - stack.extend(leaf_ranges(sync_range, BASE, DEPTH)); + stack.extend(subdivide_range(sync_range, BASE, DEPTH, MIN_OPERATIONS)); stack[old_stack_len..].reverse(); } else { let mut missed_client_ops = Vec::new(); @@ -153,7 +154,6 @@ fn sync(client: &mut btree::Sequence, server: &mut btree::Sequence { let client_operation = client_operations.next().unwrap(); - println!("server missed {:?}", client_operation.id()); missed_server_ops .push(btree::Edit::Insert(client_operation.clone())); server_digests @@ -167,19 +167,16 @@ fn sync(client: &mut btree::Sequence, server: &mut btree::Sequence { let server_operation = server_operations.next().unwrap(); - println!("client missed {:?}", server_operation.id()); missed_client_ops.push(btree::Edit::Insert(server_operation)); } } } (None, Some(_)) => { let server_operation = server_operations.next().unwrap(); - println!("client missed {:?}", server_operation.id()); missed_client_ops.push(btree::Edit::Insert(server_operation)); } (Some(_), None) => { let client_operation = client_operations.next().unwrap(); - println!("server missed {:?}", client_operation.id()); missed_server_ops.push(btree::Edit::Insert(client_operation.clone())); server_digests.splice(server_ix..server_ix, [client_operation.into()]); server_ix += 1; @@ -199,120 +196,6 @@ fn sync(client: &mut btree::Sequence, server: &mut btree::Sequence, server: &mut btree::Sequence) { -// const BASE: usize = 2; -// const DEPTH: u32 = 3; -// let count = 2 * cmp::max(client.summary().count, server.summary().count); - -// let mut server_digests = DigestSequence::new(); -// let mut queue = Vec::new(); -// queue.push(0..count); - -// while let Some(mut range) = queue.pop() { -// server_digests.debug(); -// let server_digest = server_digests.digest(range.clone()); -// let server_digest_range = range.start..range.start + server_digest.count; -// let client_digest = digest_in_range(&client, left_digest_range.clone()); -// if client_right_digest == right_server_digest.hash { -// continue; -// } else { -// if client_left_digest == left_server_digest.hash { -// range = left_digest_range.end..right_digest_range.end; -// } else { -// range = right_digest_range; -// } - -// if range.len() > BASE.pow(DEPTH) { -// let mut digests = Vec::new(); -// println!("roundtrip for digests in range {:?}", range); -// for range in request_digests(server, range.clone(), BASE, DEPTH) -// .into_iter() -// .rev() -// { -// queue.push(range.range.clone()); -// digests.push(range); -// } - -// println!("before"); -// server_digests.debug(); -// server_digests.splice( -// range, -// digests.into_iter().rev().filter_map(|digest_range| { -// let start = cmp::min(digest_range.range.start, server.summary().count); -// let end = cmp::min(digest_range.range.end, server.summary().count); -// if start < end { -// Some(crate::digest::Digest::new(end - start, digest_range.digest)) -// } else { -// None -// } -// }), -// ); -// println!("after"); -// server_digests.debug(); -// continue; -// } -// dbg!(&range); - -// println!("roundtrip for operations {:?}", range); -// let client_operations = operations_in_range(&client, range.clone()); -// let server_operations = operations_in_range(&server, range.clone()) -// .cloned() -// .collect::>(); -// server_digests.splice(range.clone(), server_operations.iter().map(|op| op.into())); - -// let mut server_ix = range.start; -// let mut client_operations = client_operations.peekable(); -// let mut server_operations = server_operations.into_iter().peekable(); -// let mut missed_server_ops = Vec::new(); -// let mut missed_client_ops = Vec::new(); -// for _ in range.clone() { -// match (client_operations.peek(), server_operations.peek()) { -// (Some(client_operation), Some(server_operation)) => { -// match client_operation.id().cmp(&server_operation.id()) { -// cmp::Ordering::Less => { -// let client_operation = client_operations.next().unwrap(); -// println!("server missed {:?}", client_operation.id()); -// missed_server_ops -// .push(btree::Edit::Insert(client_operation.clone())); -// server_digests -// .splice(server_ix..server_ix, [client_operation.into()]); -// server_ix += 1; -// } -// cmp::Ordering::Equal => { -// client_operations.next().unwrap(); -// server_operations.next().unwrap(); -// server_ix += 1; -// } -// cmp::Ordering::Greater => { -// let server_operation = server_operations.next().unwrap(); -// println!("client missed {:?}", server_operation.id()); -// missed_client_ops.push(btree::Edit::Insert(server_operation)); -// } -// } -// } -// (None, Some(_)) => { -// let server_operation = server_operations.next().unwrap(); -// println!("client missed {:?}", server_operation.id()); -// missed_client_ops.push(btree::Edit::Insert(server_operation)); -// } -// (Some(_), None) => { -// let client_operation = client_operations.next().unwrap(); -// println!("server missed {:?}", client_operation.id()); -// missed_server_ops.push(btree::Edit::Insert(client_operation.clone())); -// server_digests.splice(server_ix..server_ix, [client_operation.into()]); -// server_ix += 1; -// } -// (None, None) => break, -// } -// } - -// drop(client_operations); -// client.edit(missed_client_ops, &()); -// server.edit(missed_server_ops, &()); -// } -// } -// } - fn digest_for_range(operations: &btree::Sequence, range: Range) -> Digest { let mut cursor = operations.cursor::(); cursor.seek(&range.start, Bias::Right, &()); @@ -352,23 +235,6 @@ fn operations_for_range>( }) } -/// In memory only exploration -// fn sync(client: btree::Sequence, server: btree::Sequence) { - -// let mut depth = 0; -// let mut digests = Vec::new(); -// let mut range = 0..client.summary().count; - -// while depth <= 3 { -// let mut cursor = client.cursor::(); -// cursor.seek(&range.start, Bias::Right, &()); -// let digest = cursor.summary(&range.end, Bias::Right, &()); -// digests.push(RangeDigest { range, digest }) -// depth += 1; -// } -// } -// - #[cfg(test)] mod tests { use super::*; @@ -376,13 +242,13 @@ mod tests { #[test] fn test_sync() { - assert_sync(1..=10, 5..=10); - assert_sync(1..=10, 4..=10); - assert_sync(1..=10, 1..=5); - assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]); - assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]); - assert_sync(1..=10, 5..=14); - assert_sync(1..=10000, 1..=7000); + // assert_sync(1..=10, 5..=10); + // assert_sync(1..=10, 4..=10); + // assert_sync(1..=10, 1..=5); + // assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]); + // assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]); + // assert_sync(1..=10, 5..=14); + assert_sync(1..=1000000, 1..=1000000 - 100); } fn assert_sync( @@ -428,32 +294,32 @@ mod tests { let operations = btree::Sequence::from_iter((1..=64).map(build_operation), &()); assert_eq!( - digest_counts(&request_digests(&operations, 0..64, 2, 0,)), + digest_counts(&request_digests(&operations, 0..64, 2, 0, 0)), [64] ); assert_eq!( - digest_counts(&request_digests(&operations, 0..64, 2, 1)), + digest_counts(&request_digests(&operations, 0..64, 2, 1, 0)), [32, 32] ); assert_eq!( - digest_counts(&request_digests(&operations, 0..64, 2, 2)), + digest_counts(&request_digests(&operations, 0..64, 2, 2, 0)), [16, 16, 16, 16] ); assert_eq!( - digest_counts(&request_digests(&operations, 32..48, 2, 2)), + digest_counts(&request_digests(&operations, 32..48, 2, 2, 0)), [4, 4, 4, 4] ); assert_eq!( - digest_counts(&request_digests(&operations, 0..64, 3, 0)), + digest_counts(&request_digests(&operations, 0..64, 3, 0, 0)), [64] ); assert_eq!( - digest_counts(&request_digests(&operations, 0..64, 3, 1)), + digest_counts(&request_digests(&operations, 0..64, 3, 1, 0)), [22, 22, 22] ); assert_eq!( - digest_counts(&request_digests(&operations, 0..64, 3, 2)), + digest_counts(&request_digests(&operations, 0..64, 3, 2, 0)), [8, 8, 8, 8, 8, 8, 8, 8] ); }