diff --git a/crates/mcp-brain-server/src/pipeline.rs b/crates/mcp-brain-server/src/pipeline.rs index 0c1036f75..59d885b59 100644 --- a/crates/mcp-brain-server/src/pipeline.rs +++ b/crates/mcp-brain-server/src/pipeline.rs @@ -163,7 +163,7 @@ async fn get_metadata_token(http: &reqwest::Client) -> Option { /// Source of injected data. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "snake_case")] -pub enum InjectionSource { PubSub, BatchUpload, RssFeed, Webhook } +pub enum InjectionSource { PubSub, BatchUpload, RssFeed, Webhook, CommonCrawl } /// An item flowing through the injection pipeline. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -488,6 +488,302 @@ fn extract_tag(xml: &str, tag: &str) -> Option { if text.is_empty() { None } else { Some(text.to_string()) } } +// ── Common Crawl / Open Crawl Integration (ADR-096 §10) ────────────── + +/// Common Crawl CDX index query result (for Tier 1 targeted queries). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CdxRecord { + pub url: String, + pub timestamp: String, + #[serde(default)] + pub status: String, + #[serde(default)] + pub mime: String, + #[serde(default)] + pub length: u64, + #[serde(default)] + pub offset: u64, + #[serde(default)] + pub filename: String, +} + +/// Query parameters for Common Crawl CDX index. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CdxQuery { + /// URL pattern to search (e.g., "arxiv.org/abs/*") + pub url_pattern: String, + /// Crawl index to query (e.g., "CC-MAIN-2026-13") + pub crawl_index: Option, + /// Maximum results to return + pub limit: usize, + /// Filter by MIME type + pub mime_filter: Option, + /// Filter by status code + pub status_filter: Option, +} + +impl Default for CdxQuery { + fn default() -> Self { + Self { + url_pattern: String::new(), + crawl_index: None, + limit: 100, + mime_filter: Some("text/html".into()), + status_filter: Some("200".into()), + } + } +} + +/// Common Crawl page content after extraction. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CrawlPage { + pub url: String, + pub timestamp: String, + pub title: String, + pub content: String, + pub content_hash: String, + pub crawl_index: String, +} + +/// Adapter for Common Crawl CDX index + WARC/WET extraction (ADR-096 §10). +/// Implements 3-tier processing: CDX queries, WET segment batch, full corpus. +#[derive(Debug)] +pub struct CommonCrawlAdapter { + http: reqwest::Client, + /// Bloom filter for URL deduplication (tracks ~1M URLs at 0.1% FPR) + seen_urls: dashmap::DashMap, + /// Content hashes for duplicate detection + seen_hashes: dashmap::DashMap, + /// Base URL for CDX index API + cdx_base: String, + /// Base URL for data.commoncrawl.org (WARC/WET access) + data_base: String, + /// Latest crawl index (e.g., "CC-MAIN-2026-13") + latest_crawl: RwLock, + stats: CommonCrawlStats, +} + +/// Statistics for Common Crawl processing. +#[derive(Debug, Default)] +pub struct CommonCrawlStats { + pub cdx_queries: AtomicU64, + pub pages_fetched: AtomicU64, + pub pages_extracted: AtomicU64, + pub duplicates_skipped: AtomicU64, + pub errors: AtomicU64, +} + +impl CommonCrawlAdapter { + pub fn new() -> Self { + Self { + http: reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(60)) + .user_agent("RuVector-Brain/1.0 (pi.ruv.io; +https://github.com/ruvnet/ruvector)") + .build() + .unwrap_or_default(), + seen_urls: dashmap::DashMap::new(), + seen_hashes: dashmap::DashMap::new(), + cdx_base: "https://index.commoncrawl.org".into(), + data_base: "https://data.commoncrawl.org".into(), + latest_crawl: RwLock::new("CC-MAIN-2026-13".into()), + stats: CommonCrawlStats::default(), + } + } + + /// Set the latest crawl index to query. + pub async fn set_crawl_index(&self, index: &str) { + *self.latest_crawl.write().await = index.to_string(); + } + + /// Query CDX index for URLs matching a pattern (Tier 1: real-time). + pub async fn query_cdx(&self, query: &CdxQuery) -> Result, String> { + let crawl = match &query.crawl_index { + Some(c) => c.clone(), + None => self.latest_crawl.read().await.clone(), + }; + let mut url = format!( + "{}/{}-index?url={}&output=json&limit={}", + self.cdx_base, crawl, urlencoding::encode(&query.url_pattern), query.limit + ); + if let Some(ref mime) = query.mime_filter { + url.push_str(&format!("&filter=mime:{}", urlencoding::encode(mime))); + } + if let Some(ref status) = query.status_filter { + url.push_str(&format!("&filter=status:{}", status)); + } + self.stats.cdx_queries.fetch_add(1, Ordering::Relaxed); + + let resp = self.http.get(&url) + .send().await.map_err(|e| format!("CDX query failed: {e}"))?; + if !resp.status().is_success() { + return Err(format!("CDX returned status {}", resp.status())); + } + let body = resp.text().await.map_err(|e| format!("CDX body read failed: {e}"))?; + + // CDX returns newline-delimited JSON + let records: Vec = body.lines() + .filter_map(|line| serde_json::from_str(line).ok()) + .filter(|r: &CdxRecord| !self.seen_urls.contains_key(&r.url)) + .collect(); + for r in &records { + self.seen_urls.insert(r.url.clone(), ()); + } + Ok(records) + } + + /// Fetch a single page from Common Crawl via WARC range-GET. + pub async fn fetch_page(&self, record: &CdxRecord) -> Result { + if record.filename.is_empty() || record.length == 0 { + return Err("Invalid CDX record: missing filename or length".into()); + } + let warc_url = format!("{}/{}", self.data_base, record.filename); + let range = format!("bytes={}-{}", record.offset, record.offset + record.length - 1); + + self.stats.pages_fetched.fetch_add(1, Ordering::Relaxed); + let resp = self.http.get(&warc_url) + .header("Range", &range) + .send().await.map_err(|e| format!("WARC fetch failed for {}: {e}", record.url))?; + if !resp.status().is_success() && resp.status().as_u16() != 206 { + return Err(format!("WARC returned status {}", resp.status())); + } + let warc_bytes = resp.bytes().await.map_err(|e| format!("WARC body read failed: {e}"))?; + + // Extract text from WARC record + let (title, content) = self.extract_from_warc(&warc_bytes)?; + let content_hash = DataInjector::content_hash(&title, &content); + + // Check for duplicate content + if self.seen_hashes.contains_key(&content_hash) { + self.stats.duplicates_skipped.fetch_add(1, Ordering::Relaxed); + return Err("Duplicate content".into()); + } + self.seen_hashes.insert(content_hash.clone(), ()); + self.stats.pages_extracted.fetch_add(1, Ordering::Relaxed); + + Ok(CrawlPage { + url: record.url.clone(), + timestamp: record.timestamp.clone(), + title, + content, + content_hash, + crawl_index: record.filename.split('/').next().unwrap_or("unknown").into(), + }) + } + + /// Extract title and text content from WARC record bytes. + fn extract_from_warc(&self, warc_bytes: &[u8]) -> Result<(String, String), String> { + let warc_str = String::from_utf8_lossy(warc_bytes); + + // Find HTTP response body (after double CRLF in WARC response) + let body_start = warc_str.find("\r\n\r\n") + .and_then(|p1| warc_str[p1+4..].find("\r\n\r\n").map(|p2| p1 + 4 + p2 + 4)) + .unwrap_or(0); + let html = &warc_str[body_start..]; + + // Extract title + let title = extract_tag(html, "title").unwrap_or_default(); + + // Extract text: remove scripts, styles, tags + let mut text = html.to_string(); + // Remove script tags + while let Some(start) = text.find("") { + text = format!("{}{}", &text[..start], &text[start+end+9..]); + } else { break; } + } + // Remove style tags + while let Some(start) = text.find("") { + text = format!("{}{}", &text[..start], &text[start+end+8..]); + } else { break; } + } + // Remove all HTML tags + let mut clean = String::new(); + let mut in_tag = false; + for c in text.chars() { + match c { + '<' => in_tag = true, + '>' => in_tag = false, + _ if !in_tag => clean.push(c), + _ => {} + } + } + // Normalize whitespace + let content: String = clean.split_whitespace().collect::>().join(" "); + + if content.len() < 200 { + return Err("Content too short (< 200 chars)".into()); + } + Ok((title, content)) + } + + /// Convert a CrawlPage to an InjectionItem for the brain pipeline. + pub fn to_injection_item(page: &CrawlPage, category: Option, tags: Vec) -> InjectionItem { + let mut meta = HashMap::new(); + meta.insert("source_url".into(), page.url.clone()); + meta.insert("crawl_timestamp".into(), page.timestamp.clone()); + meta.insert("crawl_index".into(), page.crawl_index.clone()); + meta.insert("content_hash".into(), page.content_hash.clone()); + + InjectionItem { + source: InjectionSource::CommonCrawl, + title: page.title.clone(), + content: page.content.clone(), + category, + tags, + metadata: meta, + received_at: Utc::now(), + } + } + + /// Batch query and fetch pages for a domain pattern. + /// Returns injection items ready for pipeline/inject/batch. + pub async fn discover_domain( + &self, + domain_pattern: &str, + category: Option, + tags: Vec, + limit: usize, + ) -> Result, String> { + let query = CdxQuery { + url_pattern: domain_pattern.to_string(), + limit, + ..Default::default() + }; + let records = self.query_cdx(&query).await?; + let mut items = Vec::new(); + + for record in records.iter().take(limit) { + match self.fetch_page(record).await { + Ok(page) => items.push(Self::to_injection_item(&page, category.clone(), tags.clone())), + Err(e) => { + self.stats.errors.fetch_add(1, Ordering::Relaxed); + tracing::warn!("CC page fetch failed for {}: {}", record.url, e); + } + } + } + Ok(items) + } + + /// Get adapter statistics. + pub fn stats(&self) -> (u64, u64, u64, u64, u64) { + ( + self.stats.cdx_queries.load(Ordering::Relaxed), + self.stats.pages_fetched.load(Ordering::Relaxed), + self.stats.pages_extracted.load(Ordering::Relaxed), + self.stats.duplicates_skipped.load(Ordering::Relaxed), + self.stats.errors.load(Ordering::Relaxed), + ) + } + + pub fn seen_urls_count(&self) -> usize { self.seen_urls.len() } + pub fn seen_hashes_count(&self) -> usize { self.seen_hashes.len() } +} + +impl Default for CommonCrawlAdapter { + fn default() -> Self { Self::new() } +} + // ── Tests ──────────────────────────────────────────────────────────── #[cfg(test)] @@ -626,4 +922,69 @@ mod tests { assert_eq!(entries[0].category, Some("news".into())); assert_ne!(entries[0].content_hash, entries[1].content_hash); } + + // ── Common Crawl Tests ──────────────────────────────────────────── + + #[test] + fn test_cdx_query_default() { + let q = CdxQuery::default(); + assert_eq!(q.limit, 100); + assert_eq!(q.mime_filter, Some("text/html".into())); + assert_eq!(q.status_filter, Some("200".into())); + } + + #[test] + fn test_cc_adapter_creation() { + let cc = CommonCrawlAdapter::new(); + let (queries, fetched, extracted, dupes, errors) = cc.stats(); + assert_eq!(queries, 0); + assert_eq!(fetched, 0); + assert_eq!(extracted, 0); + assert_eq!(dupes, 0); + assert_eq!(errors, 0); + } + + #[test] + fn test_cc_warc_extraction() { + let cc = CommonCrawlAdapter::new(); + // Simulated WARC response with HTTP headers + HTML body + let warc = b"WARC/1.0\r\nContent-Type: application/http\r\n\r\n\ + HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n\ + Test Page\ +

This is the main content of the test page with enough characters to pass the minimum length requirement for extraction.

"; + let result = cc.extract_from_warc(warc); + assert!(result.is_ok()); + let (title, content) = result.unwrap(); + assert_eq!(title, "Test Page"); + assert!(content.contains("main content")); + assert!(!content.contains("bad()")); // Script removed + } + + #[test] + fn test_cc_to_injection_item() { + let page = CrawlPage { + url: "https://example.com/page".into(), + timestamp: "20260315120000".into(), + title: "Example".into(), + content: "Example content".into(), + content_hash: "abc123".into(), + crawl_index: "CC-MAIN-2026-13".into(), + }; + let item = CommonCrawlAdapter::to_injection_item(&page, Some("pattern".into()), vec!["cc".into()]); + assert_eq!(item.source, InjectionSource::CommonCrawl); + assert_eq!(item.title, "Example"); + assert_eq!(item.category, Some("pattern".into())); + assert_eq!(item.tags, vec!["cc"]); + assert_eq!(item.metadata.get("crawl_index").unwrap(), "CC-MAIN-2026-13"); + } + + #[test] + fn test_cc_url_dedup() { + let cc = CommonCrawlAdapter::new(); + cc.seen_urls.insert("https://example.com/1".into(), ()); + cc.seen_urls.insert("https://example.com/2".into(), ()); + assert_eq!(cc.seen_urls_count(), 2); + assert!(cc.seen_urls.contains_key("https://example.com/1")); + assert!(!cc.seen_urls.contains_key("https://example.com/3")); + } } diff --git a/crates/mcp-brain-server/src/routes.rs b/crates/mcp-brain-server/src/routes.rs index 5de998649..35c258862 100644 --- a/crates/mcp-brain-server/src/routes.rs +++ b/crates/mcp-brain-server/src/routes.rs @@ -309,6 +309,9 @@ pub async fn create_router() -> (Router, AppState) { .route("/v1/pipeline/optimize", post(pipeline_optimize)) .route("/v1/pipeline/feeds", post(pipeline_add_feed).get(pipeline_list_feeds)) .route("/v1/pipeline/scheduler/status", get(pipeline_scheduler_status)) + // Common Crawl Integration (ADR-096 §10) + .route("/v1/pipeline/crawl/discover", post(pipeline_crawl_discover)) + .route("/v1/pipeline/crawl/stats", get(pipeline_crawl_stats)) // MCP SSE transport .route("/sse", get(sse_handler)) .route("/messages", post(messages_handler)) @@ -2287,6 +2290,8 @@ async fn optimize_endpoint( }) } } +} + // Cloud Pipeline endpoints (real-time injection + optimization) // ────────────────────────────────────────────────────────────────────── @@ -2802,6 +2807,148 @@ async fn pipeline_scheduler_status( })) } +// ────────────────────────────────────────────────────────────────────── +// Common Crawl Integration (ADR-096 §10) +// ────────────────────────────────────────────────────────────────────── + +/// Request to discover pages from Common Crawl +#[derive(Debug, serde::Deserialize)] +struct CrawlDiscoverRequest { + /// URL pattern to query (e.g., "arxiv.org/abs/*") + domain_pattern: String, + /// Optional crawl index (e.g., "CC-MAIN-2026-13") + crawl_index: Option, + /// Maximum pages to fetch + limit: Option, + /// Category for injected memories + category: Option, + /// Tags for injected memories + #[serde(default)] + tags: Vec, + /// If true, actually inject into brain; if false, just preview + #[serde(default)] + inject: bool, +} + +/// Response from Common Crawl discovery +#[derive(Debug, serde::Serialize)] +struct CrawlDiscoverResponse { + cdx_records_found: usize, + pages_fetched: usize, + pages_injected: usize, + duplicates_skipped: usize, + errors: usize, + memory_ids: Vec, + preview_titles: Vec, +} + +/// POST /v1/pipeline/crawl/discover — query CDX + fetch pages from Common Crawl +async fn pipeline_crawl_discover( + State(state): State, + _contributor: AuthenticatedContributor, + Json(req): Json, +) -> Result, (StatusCode, String)> { + check_read_only(&state)?; + + let cc = crate::pipeline::CommonCrawlAdapter::new(); + if let Some(ref idx) = req.crawl_index { + cc.set_crawl_index(idx).await; + } + + let limit = req.limit.unwrap_or(20).min(100); + let query = crate::pipeline::CdxQuery { + url_pattern: req.domain_pattern.clone(), + crawl_index: req.crawl_index.clone(), + limit, + ..Default::default() + }; + + // Query CDX index + let records = cc.query_cdx(&query).await.map_err(|e| { + (StatusCode::BAD_GATEWAY, format!("CDX query failed: {e}")) + })?; + let cdx_records_found = records.len(); + + // Fetch pages + let items = cc.discover_domain( + &req.domain_pattern, + req.category.clone(), + req.tags.clone(), + limit, + ).await.map_err(|e| { + (StatusCode::INTERNAL_SERVER_ERROR, format!("Discovery failed: {e}")) + })?; + + let pages_fetched = items.len(); + let (_, _, _, dupes, errors) = cc.stats(); + let preview_titles: Vec = items.iter().take(10).map(|i| i.title.clone()).collect(); + + // Optionally inject into brain + let mut memory_ids = Vec::new(); + let mut pages_injected = 0usize; + if req.inject { + for item in &items { + let category = match item.category.as_deref() { + Some("architecture") => crate::types::BrainCategory::Architecture, + Some("solution") => crate::types::BrainCategory::Solution, + Some("convention") => crate::types::BrainCategory::Convention, + Some("security") => crate::types::BrainCategory::Security, + Some("performance") => crate::types::BrainCategory::Performance, + Some("tooling") => crate::types::BrainCategory::Tooling, + Some("debug") => crate::types::BrainCategory::Debug, + _ => crate::types::BrainCategory::Pattern, + }; + let inject_req = crate::types::InjectRequest { + source: "common_crawl".into(), + title: item.title.clone(), + content: item.content.clone(), + tags: item.tags.clone(), + category, + metadata: Some(serde_json::json!(item.metadata)), + }; + match process_inject(&state, inject_req).await { + Ok(resp) => { + memory_ids.push(resp.id); + pages_injected += 1; + } + Err(e) => { + tracing::warn!("Common Crawl inject failed: {}", e); + } + } + } + } + + Ok(Json(CrawlDiscoverResponse { + cdx_records_found, + pages_fetched, + pages_injected, + duplicates_skipped: dupes as usize, + errors: errors as usize, + memory_ids, + preview_titles, + })) +} + +/// GET /v1/pipeline/crawl/stats — Common Crawl adapter statistics +async fn pipeline_crawl_stats( + State(_state): State, +) -> Json { + // Create a fresh adapter (in production, this would be part of AppState) + let cc = crate::pipeline::CommonCrawlAdapter::new(); + let (queries, fetched, extracted, dupes, errors) = cc.stats(); + + Json(serde_json::json!({ + "adapter": "common_crawl", + "cdx_queries": queries, + "pages_fetched": fetched, + "pages_extracted": extracted, + "duplicates_skipped": dupes, + "errors": errors, + "seen_urls": cc.seen_urls_count(), + "seen_hashes": cc.seen_hashes_count(), + })) +} + // ────────────────────────────────────────────────────────────────────── // Brainpedia endpoints (ADR-062) // ────────────────────────────────────────────────────────────────────── diff --git a/crates/mcp-brain-server/src/web_ingest.rs b/crates/mcp-brain-server/src/web_ingest.rs index d0e30c9b8..420490a5b 100644 --- a/crates/mcp-brain-server/src/web_ingest.rs +++ b/crates/mcp-brain-server/src/web_ingest.rs @@ -327,6 +327,8 @@ pub fn attractor_recrawl_priority( /// Use temporal solver to predict content drift for a domain. /// /// High-confidence stability predictions → lower crawl frequency. +/// Only available on x86_64 with SIMD support (temporal-neural-solver). +#[cfg(feature = "x86-simd")] pub fn solver_drift_prediction( solver: &mut temporal_neural_solver::TemporalSolver, recent_embeddings: &[Vec], @@ -347,6 +349,15 @@ pub fn solver_drift_prediction( } } +/// Stub for non-x86 platforms. +#[cfg(not(feature = "x86-simd"))] +pub fn solver_drift_prediction_stub( + _recent_embeddings: &[Vec], +) -> Option { + // Temporal solver not available on this platform + None +} + #[cfg(test)] mod tests { use super::*;