feat(brain): add Common Crawl adapter for CDX/WARC integration (ADR-096 §10)

- CommonCrawlAdapter with CDX index queries and WARC range-GET fetch
- URL and content deduplication using DashMap-backed exact sets (the "1M URLs, 0.1% FPR" figures describe a Bloom filter, which this change does not use)
- Text extraction from WARC with script/style removal
- New endpoints: /v1/pipeline/crawl/discover and /v1/pipeline/crawl/stats
- InjectionSource::CommonCrawl variant added
- Feature-gate temporal_neural_solver for non-x86 platforms
- Fix missing brace in optimize_endpoint

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
Reuven 2026-03-16 23:31:06 -04:00
parent 0f583eccb7
commit 360e6b6cde
3 changed files with 520 additions and 1 deletions

View file

@ -163,7 +163,7 @@ async fn get_metadata_token(http: &reqwest::Client) -> Option<String> {
/// Source of injected data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum InjectionSource { PubSub, BatchUpload, RssFeed, Webhook }
/// `CommonCrawl` marks items built by `CommonCrawlAdapter::to_injection_item`.
pub enum InjectionSource { PubSub, BatchUpload, RssFeed, Webhook, CommonCrawl }
/// An item flowing through the injection pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -488,6 +488,302 @@ fn extract_tag(xml: &str, tag: &str) -> Option<String> {
if text.is_empty() { None } else { Some(text.to_string()) }
}
// ── Common Crawl / Open Crawl Integration (ADR-096 §10) ──────────────
/// Common Crawl CDX index query result (for Tier 1 targeted queries).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdxRecord {
pub url: String,
pub timestamp: String,
#[serde(default)]
pub status: String,
#[serde(default)]
pub mime: String,
#[serde(default)]
pub length: u64,
#[serde(default)]
pub offset: u64,
#[serde(default)]
pub filename: String,
}
/// Query parameters for a Common Crawl CDX index lookup.
///
/// `CdxQuery::default()` yields an empty pattern, no explicit crawl index,
/// a limit of 100, and filters for `text/html` captures with status `200`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdxQuery {
/// URL pattern to search (e.g., "arxiv.org/abs/*"); `query_cdx` percent-encodes it.
pub url_pattern: String,
/// Crawl index to query (e.g., "CC-MAIN-2026-13"); `None` makes `query_cdx`
/// fall back to the adapter's current index.
pub crawl_index: Option<String>,
/// Maximum results to return (sent as the `limit` query parameter).
pub limit: usize,
/// Filter by MIME type (sent as `filter=mime:<value>`); `None` omits the filter.
pub mime_filter: Option<String>,
/// Filter by status code (sent as `filter=status:<value>`); `None` omits the filter.
pub status_filter: Option<String>,
}
impl Default for CdxQuery {
fn default() -> Self {
Self {
url_pattern: String::new(),
crawl_index: None,
limit: 100,
mime_filter: Some("text/html".into()),
status_filter: Some("200".into()),
}
}
}
/// Common Crawl page content after extraction (built by `fetch_page`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrawlPage {
/// URL copied from the CDX record.
pub url: String,
/// Capture timestamp copied from the CDX record.
pub timestamp: String,
/// Contents of the page's `<title>` element; empty when none was found.
pub title: String,
/// Tag-stripped, whitespace-normalized page text.
pub content: String,
/// Hash of title + content, used for duplicate detection.
pub content_hash: String,
/// Crawl identifier derived from the record's archive filename.
pub crawl_index: String,
}
/// Adapter for the Common Crawl CDX index and WARC record access (ADR-096 §10).
/// NOTE(review): the original comment mentions 3-tier processing (CDX queries,
/// WET segment batch, full corpus); the methods on this type cover CDX queries
/// and single-record WARC fetches only.
#[derive(Debug)]
pub struct CommonCrawlAdapter {
// Shared HTTP client, configured in `new`.
http: reqwest::Client,
/// URLs already returned by `query_cdx`. This is an exact concurrent set
/// (a map with unit values), not a Bloom filter: there are no false
/// positives, and nothing in this type evicts entries.
seen_urls: dashmap::DashMap<String, ()>,
/// Content hashes for duplicate detection (exact set, same caveats as `seen_urls`)
seen_hashes: dashmap::DashMap<String, ()>,
/// Base URL for CDX index API
cdx_base: String,
/// Base URL for data.commoncrawl.org (WARC/WET access)
data_base: String,
/// Crawl index used when a query does not name one (e.g., "CC-MAIN-2026-13")
latest_crawl: RwLock<String>,
// Processing counters exposed through `stats()`.
stats: CommonCrawlStats,
}
/// Statistics for Common Crawl processing. All counters start at zero.
#[derive(Debug, Default)]
pub struct CommonCrawlStats {
/// Incremented once per `query_cdx` call, before the request is sent.
pub cdx_queries: AtomicU64,
/// Incremented once per `fetch_page` attempt, before the request is sent.
pub pages_fetched: AtomicU64,
/// Pages that were fetched, extracted, and not duplicates.
pub pages_extracted: AtomicU64,
/// Pages rejected because their content hash had already been seen.
pub duplicates_skipped: AtomicU64,
/// Failed page fetches counted by `discover_domain`.
pub errors: AtomicU64,
}
impl CommonCrawlAdapter {
/// Build an adapter pointed at the public Common Crawl hosts.
///
/// The HTTP client uses a 60-second timeout and a project user agent; if the
/// builder fails, a default client is used instead. Dedup sets and counters
/// start empty.
pub fn new() -> Self {
    let request_timeout = std::time::Duration::from_secs(60);
    let http = reqwest::Client::builder()
        .timeout(request_timeout)
        .user_agent("RuVector-Brain/1.0 (pi.ruv.io; +https://github.com/ruvnet/ruvector)")
        .build()
        .unwrap_or_default();

    Self {
        http,
        seen_urls: dashmap::DashMap::new(),
        seen_hashes: dashmap::DashMap::new(),
        cdx_base: String::from("https://index.commoncrawl.org"),
        data_base: String::from("https://data.commoncrawl.org"),
        latest_crawl: RwLock::new(String::from("CC-MAIN-2026-13")),
        stats: CommonCrawlStats::default(),
    }
}
/// Replace the crawl index used by queries that do not name one.
pub async fn set_crawl_index(&self, index: &str) {
    let replacement = index.to_owned();
    let mut current = self.latest_crawl.write().await;
    *current = replacement;
}
/// Query CDX index for URLs matching a pattern (Tier 1: real-time).
pub async fn query_cdx(&self, query: &CdxQuery) -> Result<Vec<CdxRecord>, String> {
let crawl = match &query.crawl_index {
Some(c) => c.clone(),
None => self.latest_crawl.read().await.clone(),
};
let mut url = format!(
"{}/{}-index?url={}&output=json&limit={}",
self.cdx_base, crawl, urlencoding::encode(&query.url_pattern), query.limit
);
if let Some(ref mime) = query.mime_filter {
url.push_str(&format!("&filter=mime:{}", urlencoding::encode(mime)));
}
if let Some(ref status) = query.status_filter {
url.push_str(&format!("&filter=status:{}", status));
}
self.stats.cdx_queries.fetch_add(1, Ordering::Relaxed);
let resp = self.http.get(&url)
.send().await.map_err(|e| format!("CDX query failed: {e}"))?;
if !resp.status().is_success() {
return Err(format!("CDX returned status {}", resp.status()));
}
let body = resp.text().await.map_err(|e| format!("CDX body read failed: {e}"))?;
// CDX returns newline-delimited JSON
let records: Vec<CdxRecord> = body.lines()
.filter_map(|line| serde_json::from_str(line).ok())
.filter(|r: &CdxRecord| !self.seen_urls.contains_key(&r.url))
.collect();
for r in &records {
self.seen_urls.insert(r.url.clone(), ());
}
Ok(records)
}
/// Fetch a single page from Common Crawl via WARC range-GET.
///
/// Requests exactly the record's byte range from the archive file, extracts
/// title and text, and rejects content whose hash was already seen.
///
/// # Errors
/// Returns a message when the record has no filename or length, its byte
/// range overflows, the request fails or returns a non-success status,
/// extraction fails, or the content is a duplicate ("Duplicate content").
pub async fn fetch_page(&self, record: &CdxRecord) -> Result<CrawlPage, String> {
    if record.filename.is_empty() || record.length == 0 {
        return Err("Invalid CDX record: missing filename or length".into());
    }
    // `length` is non-zero here, so `length - 1` cannot underflow; the
    // addition is checked because both values come from the remote index.
    let last_byte = record.offset
        .checked_add(record.length - 1)
        .ok_or_else(|| format!("Invalid CDX record: byte range overflows for {}", record.url))?;
    let warc_url = format!("{}/{}", self.data_base, record.filename);
    let range = format!("bytes={}-{}", record.offset, last_byte);

    self.stats.pages_fetched.fetch_add(1, Ordering::Relaxed);
    let resp = self.http.get(&warc_url)
        .header("Range", &range)
        .send().await.map_err(|e| format!("WARC fetch failed for {}: {e}", record.url))?;
    // 206 Partial Content is a 2xx status, so `is_success` already covers it.
    if !resp.status().is_success() {
        return Err(format!("WARC returned status {}", resp.status()));
    }
    let warc_bytes = resp.bytes().await.map_err(|e| format!("WARC body read failed: {e}"))?;

    // NOTE(review): Common Crawl stores each WARC record as its own gzip
    // member, so these bytes are presumably still compressed. No
    // decompression step is visible in this impl; confirm before relying
    // on the extracted text.
    let (title, content) = self.extract_from_warc(&warc_bytes)?;
    let content_hash = DataInjector::content_hash(&title, &content);

    // Test-and-mark in one step: `insert` returns the previous value.
    if self.seen_hashes.insert(content_hash.clone(), ()).is_some() {
        self.stats.duplicates_skipped.fetch_add(1, Ordering::Relaxed);
        return Err("Duplicate content".into());
    }
    self.stats.pages_extracted.fetch_add(1, Ordering::Relaxed);

    // Archive paths look like "crawl-data/CC-MAIN-2026-13/segments/...", so
    // the crawl id is the first segment starting with "CC-", not the first
    // segment overall.
    let crawl_index = record.filename
        .split('/')
        .find(|segment| segment.starts_with("CC-"))
        .unwrap_or("unknown")
        .to_string();

    Ok(CrawlPage {
        url: record.url.clone(),
        timestamp: record.timestamp.clone(),
        title,
        content,
        content_hash,
        crawl_index,
    })
}
/// Extract title and text content from WARC record bytes.
fn extract_from_warc(&self, warc_bytes: &[u8]) -> Result<(String, String), String> {
let warc_str = String::from_utf8_lossy(warc_bytes);
// Find HTTP response body (after double CRLF in WARC response)
let body_start = warc_str.find("\r\n\r\n")
.and_then(|p1| warc_str[p1+4..].find("\r\n\r\n").map(|p2| p1 + 4 + p2 + 4))
.unwrap_or(0);
let html = &warc_str[body_start..];
// Extract title
let title = extract_tag(html, "title").unwrap_or_default();
// Extract text: remove scripts, styles, tags
let mut text = html.to_string();
// Remove script tags
while let Some(start) = text.find("<script") {
if let Some(end) = text[start..].find("</script>") {
text = format!("{}{}", &text[..start], &text[start+end+9..]);
} else { break; }
}
// Remove style tags
while let Some(start) = text.find("<style") {
if let Some(end) = text[start..].find("</style>") {
text = format!("{}{}", &text[..start], &text[start+end+8..]);
} else { break; }
}
// Remove all HTML tags
let mut clean = String::new();
let mut in_tag = false;
for c in text.chars() {
match c {
'<' => in_tag = true,
'>' => in_tag = false,
_ if !in_tag => clean.push(c),
_ => {}
}
}
// Normalize whitespace
let content: String = clean.split_whitespace().collect::<Vec<_>>().join(" ");
if content.len() < 200 {
return Err("Content too short (< 200 chars)".into());
}
Ok((title, content))
}
/// Wrap a `CrawlPage` as an `InjectionItem` for the brain pipeline.
///
/// The item is tagged `InjectionSource::CommonCrawl`, carries the page's
/// title and content, and records the source URL, crawl timestamp, crawl
/// index and content hash as metadata. `received_at` is the current UTC time.
pub fn to_injection_item(page: &CrawlPage, category: Option<String>, tags: Vec<String>) -> InjectionItem {
    let fields = [
        ("source_url", &page.url),
        ("crawl_timestamp", &page.timestamp),
        ("crawl_index", &page.crawl_index),
        ("content_hash", &page.content_hash),
    ];
    let mut metadata = HashMap::new();
    for (key, value) in fields {
        metadata.insert(key.into(), value.clone());
    }

    InjectionItem {
        source: InjectionSource::CommonCrawl,
        title: page.title.clone(),
        content: page.content.clone(),
        category,
        tags,
        metadata,
        received_at: Utc::now(),
    }
}
/// Query the CDX index for `domain_pattern` and fetch each matching page.
///
/// Returns injection items ready for pipeline/inject/batch. Pages that fail
/// to fetch (including duplicates) are logged, counted in the `errors`
/// statistic, and skipped. Because `query_cdx` filters out URLs this adapter
/// has already returned, repeating a pattern on the same adapter yields
/// only URLs not seen before.
///
/// # Errors
/// Propagates the error from `query_cdx`.
pub async fn discover_domain(
    &self,
    domain_pattern: &str,
    category: Option<String>,
    tags: Vec<String>,
    limit: usize,
) -> Result<Vec<InjectionItem>, String> {
    let query = CdxQuery {
        url_pattern: domain_pattern.to_string(),
        limit,
        ..Default::default()
    };
    let records = self.query_cdx(&query).await?;

    let mut items = Vec::new();
    for record in records.iter().take(limit) {
        let page = match self.fetch_page(record).await {
            Ok(page) => page,
            Err(e) => {
                self.stats.errors.fetch_add(1, Ordering::Relaxed);
                tracing::warn!("CC page fetch failed for {}: {}", record.url, e);
                continue;
            }
        };
        let item = Self::to_injection_item(&page, category.clone(), tags.clone());
        items.push(item);
    }
    Ok(items)
}
/// Get adapter statistics.
pub fn stats(&self) -> (u64, u64, u64, u64, u64) {
(
self.stats.cdx_queries.load(Ordering::Relaxed),
self.stats.pages_fetched.load(Ordering::Relaxed),
self.stats.pages_extracted.load(Ordering::Relaxed),
self.stats.duplicates_skipped.load(Ordering::Relaxed),
self.stats.errors.load(Ordering::Relaxed),
)
}
/// Number of distinct URLs recorded in the seen-URL set.
pub fn seen_urls_count(&self) -> usize { self.seen_urls.len() }
/// Number of distinct content hashes recorded in the seen-hash set.
pub fn seen_hashes_count(&self) -> usize { self.seen_hashes.len() }
}
impl Default for CommonCrawlAdapter {
fn default() -> Self { Self::new() }
}
// ── Tests ────────────────────────────────────────────────────────────
#[cfg(test)]
@ -626,4 +922,69 @@ mod tests {
assert_eq!(entries[0].category, Some("news".into()));
assert_ne!(entries[0].content_hash, entries[1].content_hash);
}
// ── Common Crawl Tests ────────────────────────────────────────────
#[test]
fn test_cdx_query_default() {
    // Defaults: 100 results, HTML only, status 200 only.
    let CdxQuery { limit, mime_filter, status_filter, .. } = CdxQuery::default();
    assert_eq!(limit, 100);
    assert_eq!(mime_filter.as_deref(), Some("text/html"));
    assert_eq!(status_filter.as_deref(), Some("200"));
}
#[test]
fn test_cc_adapter_creation() {
    // A fresh adapter reports zero for every counter:
    // (queries, fetched, extracted, dupes, errors).
    let adapter = CommonCrawlAdapter::new();
    assert_eq!(adapter.stats(), (0, 0, 0, 0, 0));
}
#[test]
fn test_cc_warc_extraction() {
    let cc = CommonCrawlAdapter::new();
    // `extract_from_warc` rejects text shorter than 200 bytes, so the
    // paragraph is repeated to clear that minimum (6 x 43 = 258 bytes).
    let paragraph = "This is the main content of the test page. ".repeat(6);
    // Simulated WARC response: WARC headers, HTTP headers, then the HTML body.
    let warc = format!(
        "WARC/1.0\r\nContent-Type: application/http\r\n\r\n\
         HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n\
         <html><head><title>Test Page</title></head>\
         <body><script>bad();</script><p>{}</p></body></html>",
        paragraph
    );
    let result = cc.extract_from_warc(warc.as_bytes());
    assert!(result.is_ok(), "extraction failed: {:?}", result);
    let (title, content) = result.unwrap();
    assert_eq!(title, "Test Page");
    assert!(content.contains("main content"));
    assert!(!content.contains("bad()")); // Script removed
}
#[test]
fn test_cc_to_injection_item() {
    // Category and tags are passed through; the crawl index lands in metadata.
    let crawl_index = "CC-MAIN-2026-13";
    let page = CrawlPage {
        url: String::from("https://example.com/page"),
        timestamp: String::from("20260315120000"),
        title: String::from("Example"),
        content: String::from("Example content"),
        content_hash: String::from("abc123"),
        crawl_index: String::from(crawl_index),
    };
    let category = Some(String::from("pattern"));
    let tags = vec![String::from("cc")];

    let item = CommonCrawlAdapter::to_injection_item(&page, category, tags);

    assert_eq!(item.source, InjectionSource::CommonCrawl);
    assert_eq!(item.title, "Example");
    assert_eq!(item.category, Some("pattern".into()));
    assert_eq!(item.tags, vec!["cc"]);
    assert_eq!(item.metadata.get("crawl_index").unwrap(), crawl_index);
}
#[test]
fn test_cc_url_dedup() {
    // The seen-URL set reports exactly the URLs inserted into it.
    let adapter = CommonCrawlAdapter::new();
    for url in ["https://example.com/1", "https://example.com/2"] {
        adapter.seen_urls.insert(url.to_string(), ());
    }
    assert_eq!(adapter.seen_urls_count(), 2);

    let is_seen = |url: &str| adapter.seen_urls.contains_key(url);
    assert!(is_seen("https://example.com/1"));
    assert!(!is_seen("https://example.com/3"));
}
}

View file

@ -309,6 +309,9 @@ pub async fn create_router() -> (Router, AppState) {
.route("/v1/pipeline/optimize", post(pipeline_optimize))
.route("/v1/pipeline/feeds", post(pipeline_add_feed).get(pipeline_list_feeds))
.route("/v1/pipeline/scheduler/status", get(pipeline_scheduler_status))
// Common Crawl Integration (ADR-096 §10)
.route("/v1/pipeline/crawl/discover", post(pipeline_crawl_discover))
.route("/v1/pipeline/crawl/stats", get(pipeline_crawl_stats))
// MCP SSE transport
.route("/sse", get(sse_handler))
.route("/messages", post(messages_handler))
@ -2287,6 +2290,8 @@ async fn optimize_endpoint(
})
}
}
}
// Cloud Pipeline endpoints (real-time injection + optimization)
// ──────────────────────────────────────────────────────────────────────
@ -2802,6 +2807,148 @@ async fn pipeline_scheduler_status(
}))
}
// ──────────────────────────────────────────────────────────────────────
// Common Crawl Integration (ADR-096 §10)
// ──────────────────────────────────────────────────────────────────────
/// Request body for `POST /v1/pipeline/crawl/discover`.
#[derive(Debug, serde::Deserialize)]
struct CrawlDiscoverRequest {
/// URL pattern to query (e.g., "arxiv.org/abs/*")
domain_pattern: String,
/// Optional crawl index (e.g., "CC-MAIN-2026-13"); the adapter's default is used when absent
crawl_index: Option<String>,
/// Maximum pages to fetch; the handler defaults this to 20 and caps it at 100
limit: Option<usize>,
/// Category for injected memories; values the handler does not recognize map to `Pattern`
category: Option<String>,
/// Tags for injected memories (empty when omitted)
#[serde(default)]
tags: Vec<String>,
/// If true, actually inject into brain; if false (the default), just preview
#[serde(default)]
inject: bool,
}
/// Response body for `POST /v1/pipeline/crawl/discover`.
#[derive(Debug, serde::Serialize)]
struct CrawlDiscoverResponse {
// Records returned by the CDX query (after the adapter's URL dedup).
cdx_records_found: usize,
// Pages that were fetched and extracted successfully.
pages_fetched: usize,
// Pages stored in the brain; stays 0 unless the request set `inject`.
pages_injected: usize,
// Pages rejected because their content hash had already been seen.
duplicates_skipped: usize,
// Page fetches that failed.
errors: usize,
// Ids of the memories created by injection, in injection order.
memory_ids: Vec<uuid::Uuid>,
// Titles of up to the first 10 fetched pages.
preview_titles: Vec<String>,
}
/// POST /v1/pipeline/crawl/discover — query CDX + fetch pages from Common Crawl
///
/// Runs one CDX query, fetches up to `limit` of the returned records, and,
/// when `inject` is set, stores each fetched page in the brain. The limit
/// defaults to 20 and is capped at 100. A fresh adapter is created per
/// request, so dedup state and counters cover this request only.
///
/// # Errors
/// Returns the `check_read_only` rejection unchanged, or `502 Bad Gateway`
/// when the CDX query fails. Individual page fetch and inject failures are
/// logged and counted, not returned.
async fn pipeline_crawl_discover(
    State(state): State<AppState>,
    _contributor: AuthenticatedContributor,
    Json(req): Json<CrawlDiscoverRequest>,
) -> Result<Json<CrawlDiscoverResponse>, (StatusCode, String)> {
    check_read_only(&state)?;
    let cc = crate::pipeline::CommonCrawlAdapter::new();
    let limit = req.limit.unwrap_or(20).min(100);
    // The query carries the requested crawl index itself; `query_cdx` falls
    // back to the adapter default only when it is `None`.
    let query = crate::pipeline::CdxQuery {
        url_pattern: req.domain_pattern.clone(),
        crawl_index: req.crawl_index.clone(),
        limit,
        ..Default::default()
    };

    // Query the CDX index exactly once. `query_cdx` marks every URL it
    // returns as seen, so a second query on the same adapter (which
    // `discover_domain` would issue) filters all of them out and nothing
    // gets fetched.
    let records = cc.query_cdx(&query).await.map_err(|e| {
        (StatusCode::BAD_GATEWAY, format!("CDX query failed: {e}"))
    })?;
    let cdx_records_found = records.len();

    // Fetch the pages for the records just returned.
    let mut items = Vec::with_capacity(cdx_records_found.min(limit));
    let mut failed_fetches = 0usize;
    for record in records.iter().take(limit) {
        match cc.fetch_page(record).await {
            Ok(page) => items.push(crate::pipeline::CommonCrawlAdapter::to_injection_item(
                &page,
                req.category.clone(),
                req.tags.clone(),
            )),
            Err(e) => {
                // Same accounting as `discover_domain`: every failed fetch,
                // duplicates included, counts as an error.
                failed_fetches += 1;
                tracing::warn!("CC page fetch failed for {}: {}", record.url, e);
            }
        }
    }
    let pages_fetched = items.len();
    let (_, _, _, dupes, _) = cc.stats();
    let preview_titles: Vec<String> = items.iter().take(10).map(|i| i.title.clone()).collect();

    // Optionally inject into brain
    let mut memory_ids = Vec::new();
    let mut pages_injected = 0usize;
    if req.inject {
        for item in &items {
            let category = match item.category.as_deref() {
                Some("architecture") => crate::types::BrainCategory::Architecture,
                Some("solution") => crate::types::BrainCategory::Solution,
                Some("convention") => crate::types::BrainCategory::Convention,
                Some("security") => crate::types::BrainCategory::Security,
                Some("performance") => crate::types::BrainCategory::Performance,
                Some("tooling") => crate::types::BrainCategory::Tooling,
                Some("debug") => crate::types::BrainCategory::Debug,
                _ => crate::types::BrainCategory::Pattern,
            };
            let inject_req = crate::types::InjectRequest {
                source: "common_crawl".into(),
                title: item.title.clone(),
                content: item.content.clone(),
                tags: item.tags.clone(),
                category,
                metadata: Some(serde_json::json!(item.metadata)),
            };
            match process_inject(&state, inject_req).await {
                Ok(resp) => {
                    memory_ids.push(resp.id);
                    pages_injected += 1;
                }
                Err(e) => {
                    tracing::warn!("Common Crawl inject failed: {}", e);
                }
            }
        }
    }

    Ok(Json(CrawlDiscoverResponse {
        cdx_records_found,
        pages_fetched,
        pages_injected,
        // At most `limit` (<= 100) pages are fetched, so the cast cannot truncate.
        duplicates_skipped: dupes as usize,
        errors: failed_fetches,
        memory_ids,
        preview_titles,
    }))
}
/// GET /v1/pipeline/crawl/stats — Common Crawl adapter statistics
///
/// Builds a fresh adapter on every call, so every reported counter and set
/// size is that of a new adapter (in production the adapter would live in
/// `AppState`).
async fn pipeline_crawl_stats(
    State(_state): State<AppState>,
) -> Json<serde_json::Value> {
    let adapter = crate::pipeline::CommonCrawlAdapter::new();
    let (cdx_queries, pages_fetched, pages_extracted, duplicates_skipped, errors) = adapter.stats();
    let seen_urls = adapter.seen_urls_count();
    let seen_hashes = adapter.seen_hashes_count();

    let report = serde_json::json!({
        "adapter": "common_crawl",
        "cdx_queries": cdx_queries,
        "pages_fetched": pages_fetched,
        "pages_extracted": pages_extracted,
        "duplicates_skipped": duplicates_skipped,
        "errors": errors,
        "seen_urls": seen_urls,
        "seen_hashes": seen_hashes,
    });
    Json(report)
}
// ──────────────────────────────────────────────────────────────────────
// Brainpedia endpoints (ADR-062)
// ──────────────────────────────────────────────────────────────────────

View file

@ -327,6 +327,8 @@ pub fn attractor_recrawl_priority(
/// Use temporal solver to predict content drift for a domain.
///
/// High-confidence stability predictions → lower crawl frequency.
/// Only available on x86_64 with SIMD support (temporal-neural-solver).
#[cfg(feature = "x86-simd")]
pub fn solver_drift_prediction(
solver: &mut temporal_neural_solver::TemporalSolver,
recent_embeddings: &[Vec<f32>],
@ -347,6 +349,15 @@ pub fn solver_drift_prediction(
}
}
/// Placeholder compiled when the `x86-simd` feature is disabled.
///
/// Ignores its input and always returns `None`, because the temporal solver
/// is not built into such configurations.
#[cfg(not(feature = "x86-simd"))]
pub fn solver_drift_prediction_stub(_recent_embeddings: &[Vec<f32>]) -> Option<f32> {
    const NO_PREDICTION: Option<f32> = None;
    NO_PREDICTION
}
#[cfg(test)]
mod tests {
use super::*;