diff --git a/.gitignore b/.gitignore index de0c163c..cde9884b 100644 --- a/.gitignore +++ b/.gitignore @@ -135,3 +135,5 @@ examples/real-eeg-analysis/data/*.edf examples/real-eeg-multi-seizure/data/*.edf agentdb.rvf agentdb.rvf.lock +.kalshi + diff --git a/Cargo.lock b/Cargo.lock index ed6bc0b3..1baaa4ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5921,6 +5921,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "neural-trader-strategies" +version = "0.1.0" +dependencies = [ + "anyhow", + "neural-trader-core", + "serde", + "serde_json", + "thiserror 2.0.18", +] + [[package]] name = "neural-trader-wasm" version = "0.1.1" @@ -8541,6 +8552,7 @@ dependencies = [ "pkcs1", "pkcs8", "rand_core 0.6.4", + "sha2 0.10.9", "signature", "spki", "subtle", @@ -9784,6 +9796,25 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "ruvector-kalshi" +version = "0.1.0" +dependencies = [ + "anyhow", + "base64 0.22.1", + "chrono", + "neural-trader-core", + "rand 0.8.5", + "reqwest 0.12.28", + "rsa", + "serde", + "serde_json", + "sha2 0.10.9", + "thiserror 2.0.18", + "tokio", + "tracing", +] + [[package]] name = "ruvector-learning-wasm" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 90dd9cec..b2670e89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,9 @@ members = [ "crates/neural-trader-coherence", "crates/neural-trader-replay", "crates/neural-trader-wasm", + # Kalshi integration (ADR-151) + "crates/ruvector-kalshi", + "crates/neural-trader-strategies", # RuVix Cognition Kernel (organized under crates/ruvix/) "crates/ruvix/crates/types", "crates/ruvix/crates/region", diff --git a/crates/neural-trader-strategies/Cargo.toml b/crates/neural-trader-strategies/Cargo.toml new file mode 100644 index 00000000..7f0b4a54 --- /dev/null +++ b/crates/neural-trader-strategies/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "neural-trader-strategies" +version = "0.1.0" +edition = "2021" +description = "Venue-agnostic strategy + risk-gate runtime for the RuVector Neural Trader (ADR-151)" +license = "MIT OR Apache-2.0" +publish = false + +[dependencies] +neural-trader-core = { path = "../neural-trader-core" } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +anyhow = { workspace = true } + +[dev-dependencies] +serde_json = { workspace = true } diff --git a/crates/neural-trader-strategies/src/ev_kelly.rs b/crates/neural-trader-strategies/src/ev_kelly.rs new file mode 100644 index 00000000..05230f11 --- /dev/null +++ b/crates/neural-trader-strategies/src/ev_kelly.rs @@ -0,0 +1,237 @@ +//! Expected-Value Kelly strategy. +//! +//! Given a model probability `p` for the YES side of a binary contract +//! and a current market price `m` in cents (0..=100), the canonical +//! fractional Kelly sizing for a YES bet is: +//! +//! edge = p - m / 100 +//! kelly = edge / (1 - m / 100) +//! +//! The strategy multiplies `kelly` by a conservative `kelly_fraction` +//! (default 0.25) and by `intent.confidence`, then converts the resulting +//! fraction of cash into a contract count at the current ask. +//! +//! The priors are supplied externally (e.g. by a ruvllm-based news model); +//! this crate simply consumes them via [`ExpectedValueKelly::set_prior`]. +//! +//! Strategies are venue-agnostic — they emit [`Intent`]s; the RiskGate +//! plus a venue adapter decide whether to send an order. + +use std::collections::HashMap; + +use neural_trader_core::{EventType, MarketEvent}; + +use crate::intent::{Action, Intent, Side}; +use crate::Strategy; + +/// Per-symbol state tracked by the strategy. +#[derive(Debug, Clone)] +struct SymbolState { + /// Latest YES-side mid in cents. + latest_mid_cents: Option, + /// External YES-side probability prior, `[0, 1]`. + prior: Option, + /// Last emission sequence — throttle to at most one intent per event kind. + last_emit_seq: u64, +} + +impl Default for SymbolState { + fn default() -> Self { + Self { latest_mid_cents: None, prior: None, last_emit_seq: 0 } + } +} + +#[derive(Debug, Clone)] +pub struct ExpectedValueKellyConfig { + /// Kelly fraction (0, 1]. Quarter-Kelly (0.25) is a common default. + pub kelly_fraction: f64, + /// Total bankroll in cents used for sizing. + pub bankroll_cents: i64, + /// Minimum edge in basis points to emit an intent. Defensive — the + /// RiskGate enforces the final bar, but we avoid spamming intents we + /// know will be rejected. + pub min_edge_bps: i64, + /// Attribution name. + pub strategy_name: &'static str, +} + +impl Default for ExpectedValueKellyConfig { + fn default() -> Self { + Self { + kelly_fraction: 0.25, + bankroll_cents: 100_000, + min_edge_bps: 100, + strategy_name: "ev-kelly", + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct ExpectedValueKelly { + pub config: ExpectedValueKellyConfig, + symbols: HashMap, +} + +impl ExpectedValueKelly { + pub fn new(config: ExpectedValueKellyConfig) -> Self { + Self { config, symbols: HashMap::new() } + } + + /// Install or update the probability prior for a symbol. Priors are + /// clamped into `[0.01, 0.99]` to avoid divide-by-zero at the limits. + pub fn set_prior(&mut self, symbol_id: u32, prior: f64) { + let clamped = prior.clamp(0.01, 0.99); + self.symbols.entry(symbol_id).or_default().prior = Some(clamped); + } + + fn handle_snapshot(&mut self, event: &MarketEvent) -> Option { + // Use BookSnapshot YES-bid as the mid proxy. The normalizer already + // emits separate events per level; we track the most recent bid. + let cents = fp_to_cents(event.price_fp); + let entry = self.symbols.entry(event.symbol_id).or_default(); + entry.latest_mid_cents = Some(cents); + self.maybe_emit(event.symbol_id, event.seq) + } + + fn handle_ticker_or_trade(&mut self, event: &MarketEvent) -> Option { + let cents = fp_to_cents(event.price_fp); + if cents <= 0 { + return None; + } + let entry = self.symbols.entry(event.symbol_id).or_default(); + entry.latest_mid_cents = Some(cents); + self.maybe_emit(event.symbol_id, event.seq) + } + + fn maybe_emit(&mut self, symbol_id: u32, seq: u64) -> Option { + let state = self.symbols.get(&symbol_id)?; + let mid = state.latest_mid_cents?; + let prior = state.prior?; + if mid <= 0 || mid >= 100 { + return None; + } + let m = mid as f64 / 100.0; + let edge = prior - m; + let edge_bps = (edge * 10_000.0).round() as i64; + if edge_bps < self.config.min_edge_bps { + return None; + } + // Quarter-Kelly for YES side: fraction of bankroll = k × edge / (1 - m). + let kelly = (self.config.kelly_fraction * edge / (1.0 - m)).clamp(0.0, 1.0); + let alloc_cents = (self.config.bankroll_cents as f64 * kelly) as i64; + let contracts = alloc_cents / mid.max(1); + if contracts <= 0 { + return None; + } + let entry = self.symbols.get_mut(&symbol_id)?; + entry.last_emit_seq = seq; + Some(Intent { + symbol_id, + side: Side::Yes, + action: Action::Buy, + limit_price_cents: mid, + quantity: contracts, + edge_bps, + confidence: prior, + strategy: self.config.strategy_name, + }) + } +} + +impl Strategy for ExpectedValueKelly { + fn name(&self) -> &'static str { + self.config.strategy_name + } + + fn on_event(&mut self, event: &MarketEvent) -> Option { + match event.event_type { + EventType::Trade | EventType::VenueStatus => self.handle_ticker_or_trade(event), + EventType::BookSnapshot => self.handle_snapshot(event), + _ => None, + } + } +} + +/// The canonical price-fp scale used by `ruvector-kalshi` is `cents × 1e6`. +/// Reverse that here. If other venues use a different scale and want to +/// share this strategy, they should normalize into the same scale first. +fn fp_to_cents(fp: i64) -> i64 { + fp / 1_000_000 +} + +#[cfg(test)] +mod tests { + use super::*; + use neural_trader_core::{EventType, MarketEvent, Side as NtSide}; + + fn snapshot(sym: u32, price_cents: i64, seq: u64) -> MarketEvent { + MarketEvent { + event_id: [0u8; 16], + ts_exchange_ns: 0, + ts_ingest_ns: 0, + venue_id: 1001, + symbol_id: sym, + event_type: EventType::BookSnapshot, + side: Some(NtSide::Bid), + price_fp: price_cents * 1_000_000, + qty_fp: 1_000_000, + order_id_hash: None, + participant_id_hash: None, + flags: 0, + seq, + } + } + + #[test] + fn no_prior_no_intent() { + let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig::default()); + assert!(s.on_event(&snapshot(1, 24, 0)).is_none()); + } + + #[test] + fn fair_price_no_edge() { + let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig::default()); + s.set_prior(1, 0.25); + // Mid exactly at prior → edge ~ 0 bps → no intent. + let intent = s.on_event(&snapshot(1, 25, 0)); + assert!(intent.is_none(), "mid==prior should produce no intent"); + } + + #[test] + fn positive_edge_sizes_under_bankroll() { + let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig { + kelly_fraction: 0.25, + bankroll_cents: 100_000, + min_edge_bps: 100, + strategy_name: "test", + }); + s.set_prior(1, 0.40); + let intent = s.on_event(&snapshot(1, 24, 0)).expect("should emit"); + assert_eq!(intent.symbol_id, 1); + assert!(matches!(intent.side, Side::Yes)); + assert_eq!(intent.limit_price_cents, 24); + assert!(intent.quantity > 0); + // Notional must not exceed bankroll. + assert!(intent.notional_cents() <= 100_000); + // Edge = (0.40 - 0.24) × 10_000 = 1600 bps. + assert_eq!(intent.edge_bps, 1600); + } + + #[test] + fn extreme_prior_is_clamped() { + let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig::default()); + s.set_prior(1, 1.5); // nonsensical; clamp to 0.99 + let intent = s.on_event(&snapshot(1, 50, 0)).expect("should still emit"); + assert!((intent.confidence - 0.99).abs() < 1e-9); + } + + #[test] + fn mid_at_boundary_rejected() { + let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig::default()); + s.set_prior(1, 0.80); + // mid = 0 → boundary + assert!(s.on_event(&snapshot(1, 0, 0)).is_none()); + // mid = 100 → boundary + assert!(s.on_event(&snapshot(1, 100, 1)).is_none()); + } +} diff --git a/crates/neural-trader-strategies/src/intent.rs b/crates/neural-trader-strategies/src/intent.rs new file mode 100644 index 00000000..10c218f6 --- /dev/null +++ b/crates/neural-trader-strategies/src/intent.rs @@ -0,0 +1,48 @@ +//! Canonical strategy intent — venue-agnostic. + +use serde::{Deserialize, Serialize}; + +/// Buy (open) or sell (close) side of an action. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum Action { + Buy, + Sell, +} + +/// For binary event markets like Kalshi: which side of the contract. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum Side { + Yes, + No, +} + +/// A strategy's desired trade, before the risk gate. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Intent { + /// Canonical symbol id (`neural_trader_core::MarketEvent::symbol_id`). + pub symbol_id: u32, + /// Which side of the binary contract. + pub side: Side, + /// Buy to open, sell to close. + pub action: Action, + /// Limit price in Kalshi cents (0..=100). Strategies quote in cents + /// because event markets settle in cents; the adapter upscales into + /// `price_fp` if needed downstream. + pub limit_price_cents: i64, + /// Number of contracts. + pub quantity: i64, + /// Expected edge in basis points over the mid (1 bp = 0.01%). + pub edge_bps: i64, + /// Model confidence in `[0, 1]`. Used by the risk gate as a multiplier + /// on position sizing (higher conviction → larger fraction). + pub confidence: f64, + /// Strategy name for attribution / audit. + pub strategy: &'static str, +} + +impl Intent { + /// Notional in Kalshi cents (`price × quantity`). + pub fn notional_cents(&self) -> i64 { + self.limit_price_cents.saturating_mul(self.quantity) + } +} diff --git a/crates/neural-trader-strategies/src/lib.rs b/crates/neural-trader-strategies/src/lib.rs new file mode 100644 index 00000000..ce8c8838 --- /dev/null +++ b/crates/neural-trader-strategies/src/lib.rs @@ -0,0 +1,35 @@ +//! # neural-trader-strategies +//! +//! Venue-agnostic strategy runtime for the RuVector Neural Trader. +//! +//! Key types: +//! - [`Intent`]: what a strategy wants to do (canonical, venue-agnostic). +//! - [`RiskGate`]: the mandatory wrapper around every `Intent` that enforces +//! position cap, daily-loss kill, concentration, min-edge, and the +//! live-trade env flag. +//! - [`Strategy`]: the trait strategies implement. +//! - [`ev_kelly::ExpectedValueKelly`]: first concrete strategy. +//! +//! Strategies consume [`neural_trader_core::MarketEvent`] and hold no +//! venue-specific state, so the same strategy runs in paper replay and +//! against live Kalshi (or any future venue that normalizes to +//! `MarketEvent`). + +pub mod ev_kelly; +pub mod intent; +pub mod risk; + +pub use ev_kelly::ExpectedValueKelly; +pub use intent::{Action, Intent, Side}; +pub use risk::{PortfolioState, Position, RiskConfig, RiskDecision, RiskGate}; + +use neural_trader_core::MarketEvent; + +/// Strategy trait. Stateless strategies can be `&self`; stateful ones use +/// `&mut self`. Implementers return at most one [`Intent`] per call — if a +/// composite decision is needed, emit the primary intent and let the next +/// event drive follow-ups. +pub trait Strategy { + fn name(&self) -> &'static str; + fn on_event(&mut self, event: &MarketEvent) -> Option; +} diff --git a/crates/neural-trader-strategies/src/risk.rs b/crates/neural-trader-strategies/src/risk.rs new file mode 100644 index 00000000..1987b0bc --- /dev/null +++ b/crates/neural-trader-strategies/src/risk.rs @@ -0,0 +1,333 @@ +//! Risk gate — mandatory wrapper around every [`Intent`]. +//! +//! All checks are in Kalshi cents so we never cross scale boundaries on +//! the hot path. The gate is pure: it never mutates portfolio state, only +//! returns a decision. Updating the portfolio happens after a fill is +//! observed. + +use std::collections::HashMap; + +use crate::intent::{Action, Intent, Side}; + +/// Per-ticker open position. +#[derive(Debug, Clone, Default)] +pub struct Position { + pub symbol_id: u32, + /// Signed contract quantity (YES = +, NO = -). For Kalshi we store + /// YES/NO on separate symbol ids using the same ticker so this stays + /// unsigned in practice, but we keep i64 for future flexibility. + pub quantity: i64, + /// Average fill price in cents. + pub avg_price_cents: i64, +} + +impl Position { + pub fn notional_cents(&self) -> i64 { + self.quantity.saturating_mul(self.avg_price_cents) + } +} + +/// Cluster identifier — a group of correlated contracts (e.g. "all Fed +/// December strikes"). Strategy authors assign cluster ids; the neural- +/// trader-coherence crate produces them in production but tests can use +/// any `u32`. +pub type ClusterId = u32; + +/// Portfolio state observed by the gate. Pure snapshot — the gate never +/// mutates it. +#[derive(Debug, Clone, Default)] +pub struct PortfolioState { + pub cash_cents: i64, + pub starting_cash_cents: i64, + /// Realized P&L for the current UTC day, in cents. Negative = loss. + pub day_pnl_cents: i64, + pub positions: HashMap, + /// Optional symbol→cluster mapping. When populated, concentration is + /// enforced by cluster; otherwise by symbol. + pub clusters: HashMap, +} + +impl PortfolioState { + pub fn total_notional_cents(&self) -> i64 { + self.positions + .values() + .map(|p| p.notional_cents().abs()) + .fold(0i64, |a, b| a.saturating_add(b)) + } + + fn cluster_notional_cents(&self, cluster: ClusterId) -> i64 { + self.positions + .values() + .filter(|p| self.clusters.get(&p.symbol_id).copied() == Some(cluster)) + .map(|p| p.notional_cents().abs()) + .fold(0i64, |a, b| a.saturating_add(b)) + } +} + +/// Gate configuration. All limits are hard — breach → reject. +#[derive(Debug, Clone)] +pub struct RiskConfig { + /// Max notional for a single new position, as a fraction of cash. + /// Default: 0.10 (10%). + pub max_position_frac: f64, + /// Stop trading when realized day P&L ≤ `-max_daily_loss_frac × starting_cash`. + /// Default: 0.03 (3%). + pub max_daily_loss_frac: f64, + /// Minimum edge to open in basis points. Default: 300 bps. + pub min_edge_bps: i64, + /// Max fraction of cash allocated to one cluster. Default: 0.40 (40%). + pub max_cluster_frac: f64, + /// If true, live orders require `KALSHI_ENABLE_LIVE=1`. If false, the + /// gate allows orders regardless (paper mode). + pub require_live_flag: bool, +} + +impl Default for RiskConfig { + fn default() -> Self { + Self { + max_position_frac: 0.10, + max_daily_loss_frac: 0.03, + min_edge_bps: 300, + max_cluster_frac: 0.40, + require_live_flag: true, + } + } +} + +/// Outcome of the gate for a single intent. +#[derive(Debug, Clone, PartialEq)] +pub enum RiskDecision { + Approve(Intent), + Reject { reason: RejectReason, intent: Intent }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RejectReason { + EdgeTooThin, + PositionTooLarge, + DailyLossKill, + ClusterConcentration, + LiveTradingDisabled, + InsufficientCash, + NonPositiveQuantity, + PriceOutOfRange, +} + +#[derive(Debug, Clone, Default)] +pub struct RiskGate { + pub config: RiskConfig, +} + +impl RiskGate { + pub fn new(config: RiskConfig) -> Self { + Self { config } + } + + /// Evaluate a single intent against the portfolio snapshot and env. + pub fn evaluate(&self, intent: Intent, portfolio: &PortfolioState) -> RiskDecision { + // 1. Basic sanity (cheapest checks first — failures are immediate). + if intent.quantity <= 0 { + return RiskDecision::Reject { + reason: RejectReason::NonPositiveQuantity, + intent, + }; + } + if intent.limit_price_cents <= 0 || intent.limit_price_cents >= 100 { + return RiskDecision::Reject { + reason: RejectReason::PriceOutOfRange, + intent, + }; + } + + // 2. Daily loss kill — stop *all* opening trades after breach. + let max_loss = (portfolio.starting_cash_cents as f64 + * self.config.max_daily_loss_frac) + .round() as i64; + if intent.action == Action::Buy && portfolio.day_pnl_cents <= -max_loss { + return RiskDecision::Reject { + reason: RejectReason::DailyLossKill, + intent, + }; + } + + // 3. Edge threshold. + if intent.edge_bps < self.config.min_edge_bps { + return RiskDecision::Reject { + reason: RejectReason::EdgeTooThin, + intent, + }; + } + + // 4. Hard cash check — can we actually pay for the contracts? + // Checked before policy caps so the "not enough money" reason is + // distinguishable from "over the configured cap." + let notional = intent.notional_cents(); + if intent.action == Action::Buy && notional > portfolio.cash_cents { + return RiskDecision::Reject { + reason: RejectReason::InsufficientCash, + intent, + }; + } + + // 5. Single-position notional cap (policy). + let position_cap = (portfolio.cash_cents as f64 + * self.config.max_position_frac) as i64; + if intent.action == Action::Buy && notional > position_cap { + return RiskDecision::Reject { + reason: RejectReason::PositionTooLarge, + intent, + }; + } + + // 6. Cluster concentration (policy). + if let Some(cluster) = portfolio.clusters.get(&intent.symbol_id).copied() { + let cluster_cap = (portfolio.cash_cents as f64 + * self.config.max_cluster_frac) as i64; + let existing = portfolio.cluster_notional_cents(cluster); + let projected = existing.saturating_add(notional); + if intent.action == Action::Buy && projected > cluster_cap { + return RiskDecision::Reject { + reason: RejectReason::ClusterConcentration, + intent, + }; + } + } + + // 7. Live-trade env flag (last; cheap but most commonly hit in dev). + if self.config.require_live_flag && is_opening_live_order(&intent) { + if std::env::var("KALSHI_ENABLE_LIVE").ok().as_deref() != Some("1") { + return RiskDecision::Reject { + reason: RejectReason::LiveTradingDisabled, + intent, + }; + } + } + + RiskDecision::Approve(intent) + } +} + +fn is_opening_live_order(intent: &Intent) -> bool { + // Opening buys require live; closing sells can always run (we want to + // be able to exit positions even after the kill switch trips). + intent.action == Action::Buy && matches!(intent.side, Side::Yes | Side::No) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::intent::{Action, Side}; + + fn sample_intent(edge: i64, price: i64, qty: i64) -> Intent { + Intent { + symbol_id: 1, + side: Side::Yes, + action: Action::Buy, + limit_price_cents: price, + quantity: qty, + edge_bps: edge, + confidence: 0.9, + strategy: "test", + } + } + + fn portfolio(cash: i64) -> PortfolioState { + PortfolioState { + cash_cents: cash, + starting_cash_cents: cash, + ..Default::default() + } + } + + #[test] + fn thin_edge_rejected() { + let gate = RiskGate::default(); + let d = gate.evaluate(sample_intent(100, 24, 10), &portfolio(100_000)); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::EdgeTooThin, .. })); + } + + #[test] + fn oversize_position_rejected() { + let gate = RiskGate::default(); + // 24¢ × 600 = 14_400¢ vs cap 10% × 100_000 = 10_000¢ + let d = gate.evaluate(sample_intent(500, 24, 600), &portfolio(100_000)); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::PositionTooLarge, .. })); + } + + #[test] + fn daily_loss_kill_triggers() { + let gate = RiskGate::default(); + let mut p = portfolio(100_000); + p.day_pnl_cents = -3_001; // just past 3% + let d = gate.evaluate(sample_intent(500, 24, 10), &p); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::DailyLossKill, .. })); + } + + #[test] + fn concentration_rejected() { + // Raise the per-position cap so we're testing concentration, not + // position size. Disable live flag so we don't trip on env. + let gate = RiskGate::new(RiskConfig { + require_live_flag: false, + max_position_frac: 0.20, + max_cluster_frac: 0.40, + ..Default::default() + }); + let mut p = portfolio(100_000); + p.clusters.insert(1, 42); + p.clusters.insert(2, 42); + p.positions.insert( + 2, + Position { + symbol_id: 2, + quantity: 1000, + avg_price_cents: 35, // 35_000¢ already in cluster 42 + }, + ); + // 24¢ × 500 = 12_000 < 20% cap (20_000) but existing 35_000 + 12_000 + // = 47_000 > 40% cap (40_000) → concentration rejection. + let d = gate.evaluate(sample_intent(500, 24, 500), &p); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::ClusterConcentration, .. })); + } + + #[test] + fn approves_under_paper_config() { + let gate = RiskGate::new(RiskConfig { require_live_flag: false, ..Default::default() }); + let d = gate.evaluate(sample_intent(500, 24, 10), &portfolio(100_000)); + assert!(matches!(d, RiskDecision::Approve(_))); + } + + #[test] + fn live_gate_rejects_without_env() { + // Ensure env flag is off. + std::env::remove_var("KALSHI_ENABLE_LIVE"); + let gate = RiskGate::new(RiskConfig { require_live_flag: true, ..Default::default() }); + let d = gate.evaluate(sample_intent(500, 24, 10), &portfolio(100_000)); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::LiveTradingDisabled, .. })); + } + + #[test] + fn rejects_non_positive_qty_and_bad_price() { + let gate = RiskGate::new(RiskConfig { require_live_flag: false, ..Default::default() }); + let d = gate.evaluate(sample_intent(500, 24, 0), &portfolio(100_000)); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::NonPositiveQuantity, .. })); + let d = gate.evaluate(sample_intent(500, 0, 10), &portfolio(100_000)); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::PriceOutOfRange, .. })); + let d = gate.evaluate(sample_intent(500, 100, 10), &portfolio(100_000)); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::PriceOutOfRange, .. })); + } + + #[test] + fn insufficient_cash_rejected_even_under_cap() { + // Oversized cap so the position-size check does not mask the cash + // check. + let gate = RiskGate::new(RiskConfig { + require_live_flag: false, + max_position_frac: 5.0, + ..Default::default() + }); + // 50¢ × 200 = 10_000; cash only 5_000. + let d = gate.evaluate(sample_intent(500, 50, 200), &portfolio(5_000)); + assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::InsufficientCash, .. })); + } +} diff --git a/crates/ruvector-kalshi/Cargo.toml b/crates/ruvector-kalshi/Cargo.toml new file mode 100644 index 00000000..10c744bf --- /dev/null +++ b/crates/ruvector-kalshi/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "ruvector-kalshi" +version = "0.1.0" +edition = "2021" +description = "Kalshi exchange integration for the RuVector Neural Trader (ADR-151)" +license = "MIT OR Apache-2.0" +publish = false + +[dependencies] +neural-trader-core = { path = "../neural-trader-core" } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +anyhow = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "process", "time", "fs"] } +chrono = { workspace = true } +tracing = { workspace = true } + +# Crypto — Kalshi requires RSA-PSS-SHA256 +rsa = { version = "0.9", features = ["sha2"] } +sha2 = "0.10" +rand = { workspace = true } +base64 = "0.22" + +# HTTP — match existing graph (mcp-brain uses reqwest 0.12 + rustls) +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } + +[dev-dependencies] +serde_json = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/crates/ruvector-kalshi/examples/validate.rs b/crates/ruvector-kalshi/examples/validate.rs new file mode 100644 index 00000000..091eb221 --- /dev/null +++ b/crates/ruvector-kalshi/examples/validate.rs @@ -0,0 +1,70 @@ +//! Offline validator: loads the real Kalshi credentials via `SecretLoader`, +//! signs a sample request, verifies the signature round-trips. +//! +//! No network calls are made. This proves the actual PEM on disk (or in +//! GCS) is loadable by this crate and produces valid RSA-PSS-SHA256 +//! signatures. +//! +//! Run: +//! # Offline, local PEM: +//! KALSHI_SECRET_SOURCE=local cargo run -p ruvector-kalshi --example validate +//! +//! # From Google Cloud Secret Manager (needs `gcloud` auth): +//! cargo run -p ruvector-kalshi --example validate + +use ruvector_kalshi::{ + auth::{self, Signer}, + secrets::SecretLoader, +}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let project = std::env::var("KALSHI_GCP_PROJECT").unwrap_or_else(|_| "ruv-dev".into()); + let loader = SecretLoader::new(project); + let creds = loader.load().await?; + println!("loaded credentials: {creds:?}"); + + let signer = Signer::from_pem(&creds.api_key, &creds.private_key_pem)?; + println!("signer ready: {signer:?}"); + + let ts_ms = 1_700_000_000_000u64; + let method = "GET"; + let path = "/trade-api/v2/exchange/status"; + + let headers = signer.sign_with_ts(ts_ms, method, path); + println!( + "signed (ts={}, method={method}, path={path}):\n \ + KALSHI-ACCESS-KEY: \n \ + KALSHI-ACCESS-TIMESTAMP: {}\n \ + KALSHI-ACCESS-SIGNATURE: ", + ts_ms, + headers.access_key.len(), + headers.timestamp_ms, + headers.signature_b64.len(), + ); + + auth::verify( + &signer.verifying_key(), + ts_ms, + method, + path, + &headers.signature_b64, + )?; + println!("signature verified against derived public key — OK"); + + // Tampered verification must fail. + let tamper = auth::verify( + &signer.verifying_key(), + ts_ms, + method, + "/trade-api/v2/portfolio", + &headers.signature_b64, + ); + if tamper.is_ok() { + anyhow::bail!("tampered path unexpectedly verified — signing is broken"); + } + println!("tamper check rejected — OK"); + + println!("\nall offline checks passed."); + Ok(()) +} diff --git a/crates/ruvector-kalshi/src/auth.rs b/crates/ruvector-kalshi/src/auth.rs new file mode 100644 index 00000000..43363796 --- /dev/null +++ b/crates/ruvector-kalshi/src/auth.rs @@ -0,0 +1,212 @@ +//! Kalshi RSA-PSS-SHA256 request signing. +//! +//! Kalshi REST endpoints require three headers on every authenticated call: +//! - `KALSHI-ACCESS-KEY`: the API key UUID +//! - `KALSHI-ACCESS-TIMESTAMP`: unix time in milliseconds +//! - `KALSHI-ACCESS-SIGNATURE`: base64(RSA-PSS-SHA256(pem, ts || method || path)) +//! +//! The canonical string is `format!("{ts}{method}{path}")` with no separators. + +use base64::{engine::general_purpose::STANDARD, Engine}; +use rsa::pkcs1::DecodeRsaPrivateKey; +use rsa::pkcs8::DecodePrivateKey; +use rsa::pss::{SigningKey, VerifyingKey}; +use rsa::signature::{Keypair, RandomizedSigner, SignatureEncoding, Verifier}; +use rsa::traits::PublicKeyParts; +use rsa::RsaPrivateKey; +use sha2::Sha256; + +use crate::{KalshiError, Result}; + +/// Signed headers for a single Kalshi REST request. +#[derive(Debug, Clone)] +pub struct SignedHeaders { + pub access_key: String, + pub timestamp_ms: String, + pub signature_b64: String, +} + +impl SignedHeaders { + /// Apply this signature set onto a [`reqwest::RequestBuilder`]. + pub fn apply(self, rb: reqwest::RequestBuilder) -> reqwest::RequestBuilder { + rb.header("KALSHI-ACCESS-KEY", self.access_key) + .header("KALSHI-ACCESS-TIMESTAMP", self.timestamp_ms) + .header("KALSHI-ACCESS-SIGNATURE", self.signature_b64) + } +} + +/// Kalshi request signer. Holds a parsed RSA private key and the caller's +/// API key string. Cheap to clone (both fields clone trivially). +#[derive(Clone)] +pub struct Signer { + api_key: String, + signing_key: SigningKey, +} + +impl std::fmt::Debug for Signer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Never leak the API key or key material in Debug output. + f.debug_struct("Signer") + .field("api_key_len", &self.api_key.len()) + .field("key_size_bits", &self.signing_key.as_ref().size()) + .finish() + } +} + +impl Signer { + /// Build a signer from a PEM string (accepts PKCS#1 or PKCS#8) and API key. + pub fn from_pem(api_key: impl Into, pem: &str) -> Result { + let private_key = parse_rsa_pem(pem)?; + Ok(Self { + api_key: api_key.into(), + signing_key: SigningKey::::new(private_key), + }) + } + + /// Sign for the given method + path using the current wall-clock time. + pub fn sign_now(&self, method: &str, path: &str) -> SignedHeaders { + let ts = current_ts_ms(); + self.sign_with_ts(ts, method, path) + } + + /// Sign with an explicit timestamp (deterministic — for tests/replay). + pub fn sign_with_ts(&self, ts_ms: u64, method: &str, path: &str) -> SignedHeaders { + let msg = canonical_string(ts_ms, method, path); + let mut rng = rand::thread_rng(); + let sig = self.signing_key.sign_with_rng(&mut rng, msg.as_bytes()); + SignedHeaders { + access_key: self.api_key.clone(), + timestamp_ms: ts_ms.to_string(), + signature_b64: STANDARD.encode(sig.to_bytes()), + } + } + + /// Derive the matching verifying key (useful for round-trip tests). + pub fn verifying_key(&self) -> VerifyingKey { + self.signing_key.verifying_key() + } +} + +fn canonical_string(ts_ms: u64, method: &str, path: &str) -> String { + // Kalshi spec: concat ts, method (upper-case), path exactly as sent. + format!("{ts_ms}{}{path}", method.to_uppercase()) +} + +fn current_ts_ms() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system clock before UNIX epoch") + .as_millis() as u64 +} + +fn parse_rsa_pem(pem: &str) -> Result { + // Accept either `-----BEGIN RSA PRIVATE KEY-----` (PKCS#1) or + // `-----BEGIN PRIVATE KEY-----` (PKCS#8). + if pem.contains("BEGIN RSA PRIVATE KEY") { + RsaPrivateKey::from_pkcs1_pem(pem) + .map_err(|e| KalshiError::InvalidPem(format!("pkcs1: {e}"))) + } else { + RsaPrivateKey::from_pkcs8_pem(pem) + .map_err(|e| KalshiError::InvalidPem(format!("pkcs8: {e}"))) + } +} + +/// Verify a signature against the public half. Test-only helper. +pub fn verify( + vk: &VerifyingKey, + ts_ms: u64, + method: &str, + path: &str, + sig_b64: &str, +) -> Result<()> { + use rsa::pss::Signature; + let sig_bytes = STANDARD + .decode(sig_b64) + .map_err(|e| KalshiError::Signing(format!("base64: {e}")))?; + let sig = Signature::try_from(sig_bytes.as_slice()) + .map_err(|e| KalshiError::Signing(format!("sig decode: {e}")))?; + let msg = canonical_string(ts_ms, method, path); + vk.verify(msg.as_bytes(), &sig) + .map_err(|e| KalshiError::Signing(format!("verify: {e}"))) +} + +#[cfg(test)] +mod tests { + use super::*; + use rsa::pkcs1::EncodeRsaPrivateKey; + + /// Generate a fresh 2048-bit RSA key in PKCS#1 PEM form for tests. + fn gen_pkcs1_pem() -> String { + let mut rng = rand::thread_rng(); + let key = RsaPrivateKey::new(&mut rng, 2048).expect("keygen"); + key.to_pkcs1_pem(rsa::pkcs1::LineEnding::LF) + .expect("encode pkcs1") + .to_string() + } + + #[test] + fn signer_roundtrip_pkcs1() { + let pem = gen_pkcs1_pem(); + let signer = Signer::from_pem("test-key-uuid", &pem).unwrap(); + + let headers = signer.sign_with_ts(1_700_000_000_000, "GET", "/trade-api/v2/portfolio"); + assert_eq!(headers.access_key, "test-key-uuid"); + assert_eq!(headers.timestamp_ms, "1700000000000"); + assert!(!headers.signature_b64.is_empty()); + + // Round-trip through the verifying key proves the signature is well-formed. + verify( + &signer.verifying_key(), + 1_700_000_000_000, + "GET", + "/trade-api/v2/portfolio", + &headers.signature_b64, + ) + .unwrap(); + } + + #[test] + fn signer_rejects_tampered_message() { + let pem = gen_pkcs1_pem(); + let signer = Signer::from_pem("k", &pem).unwrap(); + let headers = signer.sign_with_ts(42, "POST", "/orders"); + + // Verifying against a different path must fail. + let err = verify( + &signer.verifying_key(), + 42, + "POST", + "/orders/tampered", + &headers.signature_b64, + ); + assert!(err.is_err(), "tampered path must not verify"); + } + + #[test] + fn canonical_string_is_stable() { + assert_eq!( + canonical_string(1, "get", "/foo"), + "1GET/foo", + "method must be upper-cased", + ); + } + + #[test] + fn invalid_pem_is_rejected() { + let err = Signer::from_pem("k", "not a pem").unwrap_err(); + match err { + KalshiError::InvalidPem(_) => {} + other => panic!("expected InvalidPem, got {other:?}"), + } + } + + #[test] + fn debug_does_not_leak_key() { + let pem = gen_pkcs1_pem(); + let signer = Signer::from_pem("super-secret-api-key-1234567890", &pem).unwrap(); + let dbg = format!("{signer:?}"); + assert!(!dbg.contains("super-secret-api-key")); + assert!(!dbg.contains("BEGIN RSA")); + } +} diff --git a/crates/ruvector-kalshi/src/lib.rs b/crates/ruvector-kalshi/src/lib.rs new file mode 100644 index 00000000..ca935284 --- /dev/null +++ b/crates/ruvector-kalshi/src/lib.rs @@ -0,0 +1,62 @@ +//! # ruvector-kalshi +//! +//! Kalshi exchange integration for the RuVector Neural Trader. +//! +//! See ADR-151 for design. This crate provides: +//! - RSA-PSS-SHA256 request signing against Kalshi's REST API +//! - Typed DTOs for Kalshi market/event/order/fill payloads +//! - Normalization from Kalshi payloads into +//! [`neural_trader_core::MarketEvent`] so downstream coherence, attention, +//! and replay pipelines work unchanged. +//! - A REST client scaffold (live calls are gated behind a runtime flag) +//! - Secret loading from Google Cloud Secret Manager or a local PEM file + +pub mod auth; +pub mod models; +pub mod normalize; +pub mod rest; +pub mod secrets; +pub mod ws; + +use thiserror::Error; + +/// Venue identifier for Kalshi in [`neural_trader_core::MarketEvent::venue_id`]. +pub const KALSHI_VENUE_ID: u16 = 1001; + +/// Default Kalshi REST base URL (production). +pub const KALSHI_API_URL: &str = "https://trading-api.kalshi.com/trade-api/v2"; + +/// Default Kalshi WebSocket URL (production). +pub const KALSHI_WS_URL: &str = "wss://trading-api.kalshi.com/trade-api/ws/v2"; + +/// Kalshi fixed-point scale for price. Kalshi quotes in cents 0..=100; +/// we upscale into the canonical `price_fp` scale (value × 1e8) so +/// that a 24¢ YES becomes 24_000_000. +pub const KALSHI_PRICE_FP_SCALE: i64 = 1_000_000; + +/// Crate-wide error type. +#[derive(Debug, Error)] +pub enum KalshiError { + #[error("invalid PEM key material: {0}")] + InvalidPem(String), + + #[error("signing failed: {0}")] + Signing(String), + + #[error("secret source error: {0}")] + Secret(String), + + #[error("HTTP error: {0}")] + Http(#[from] reqwest::Error), + + #[error("decode error: {0}")] + Decode(#[from] serde_json::Error), + + #[error("api error {status}: {body}")] + Api { status: u16, body: String }, + + #[error("normalization error: {0}")] + Normalize(String), +} + +pub type Result = std::result::Result; diff --git a/crates/ruvector-kalshi/src/models.rs b/crates/ruvector-kalshi/src/models.rs new file mode 100644 index 00000000..0160c859 --- /dev/null +++ b/crates/ruvector-kalshi/src/models.rs @@ -0,0 +1,252 @@ +//! Kalshi REST + WebSocket DTOs. Only the fields we actually consume are +//! declared; everything else is ignored by `#[serde(deny_unknown_fields)]` +//! being *off* — we intentionally accept forward-compatible payloads. + +use serde::{Deserialize, Serialize}; + +/// Market metadata (GET /markets, /markets/{ticker}). +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Market { + pub ticker: String, + pub event_ticker: Option, + pub title: Option, + pub status: Option, + pub yes_bid: Option, + pub yes_ask: Option, + pub no_bid: Option, + pub no_ask: Option, + pub last_price: Option, + pub volume: Option, + pub open_interest: Option, + pub close_time: Option, + pub expiration_time: Option, +} + +/// Envelope for list-markets response. +#[derive(Debug, Clone, Deserialize)] +pub struct MarketsResponse { + pub markets: Vec, + pub cursor: Option, +} + +/// Single trade print (GET /markets/trades). +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct KalshiTrade { + pub ticker: String, + pub trade_id: String, + pub yes_price: Option, + pub no_price: Option, + pub count: i64, + pub taker_side: Option, // "yes" | "no" + pub created_time: String, // ISO-8601 +} + +/// Orderbook snapshot (GET /markets/{ticker}/orderbook). +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct OrderbookSnapshot { + /// Each entry is `[price_cents, contracts]`. + pub yes: Vec<[i64; 2]>, + pub no: Vec<[i64; 2]>, +} + +/// Wrapper for orderbook GET envelope. +#[derive(Debug, Clone, Deserialize)] +pub struct OrderbookResponse { + pub orderbook: OrderbookSnapshot, +} + +/// Order placement payload (POST /portfolio/orders). +#[derive(Debug, Clone, Serialize)] +pub struct NewOrder { + pub ticker: String, + pub action: OrderAction, + pub side: OrderSide, + #[serde(rename = "type")] + pub order_type: OrderType, + pub count: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub yes_price: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub no_price: Option, + pub client_order_id: String, +} + +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum OrderAction { + Buy, + Sell, +} + +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum OrderSide { + Yes, + No, +} + +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum OrderType { + Limit, + Market, +} + +/// Response from POST /portfolio/orders. +#[derive(Debug, Clone, Deserialize)] +pub struct OrderAck { + pub order: OrderRecord, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct OrderRecord { + pub order_id: String, + pub client_order_id: Option, + pub status: String, + pub ticker: String, + pub count: i64, + pub filled_count: Option, + pub remaining_count: Option, +} + +/// Raw Kalshi WS envelope. `{"type": "...", "msg": {...}}`. `msg` is +/// kept as a `Value` so unknown `type` tags don't fail the parse — the +/// decoder routes on `msg_type`. +#[derive(Debug, Clone, Deserialize)] +pub struct WsEnvelope { + #[serde(rename = "type")] + pub msg_type: String, + pub msg: serde_json::Value, +} + +/// Typed WS messages routed from [`WsEnvelope`] by the decoder. +#[derive(Debug, Clone)] +pub enum WsMessage { + Ticker(WsTicker), + Trade(WsTrade), + OrderbookSnapshot(WsOrderbook), + OrderbookDelta(WsOrderbookDelta), + Fill(WsFill), + /// Any non-data frame (heartbeat, ack, etc.) or unknown `type` tag. + Other, +} + +impl WsMessage { + /// Decode an envelope into a typed message. Unknown type tags produce + /// [`WsMessage::Other`] rather than an error so forward-compatible + /// payloads don't kill the feed. + pub fn from_envelope(env: WsEnvelope) -> serde_json::Result { + Ok(match env.msg_type.as_str() { + "ticker" => Self::Ticker(serde_json::from_value(env.msg)?), + "trade" => Self::Trade(serde_json::from_value(env.msg)?), + "orderbook_snapshot" => Self::OrderbookSnapshot(serde_json::from_value(env.msg)?), + "orderbook_delta" => Self::OrderbookDelta(serde_json::from_value(env.msg)?), + "fill" => Self::Fill(serde_json::from_value(env.msg)?), + _ => Self::Other, + }) + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct WsTicker { + pub market_ticker: String, + pub yes_bid: Option, + pub yes_ask: Option, + pub price: Option, + pub ts: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct WsTrade { + pub market_ticker: String, + pub yes_price: Option, + pub no_price: Option, + pub count: i64, + pub taker_side: Option, + pub ts: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct WsOrderbook { + pub market_ticker: String, + pub yes: Vec<[i64; 2]>, + pub no: Vec<[i64; 2]>, + pub ts: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct WsOrderbookDelta { + pub market_ticker: String, + pub side: String, // "yes" | "no" + pub price: i64, + pub delta: i64, + pub ts: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct WsFill { + pub market_ticker: String, + pub order_id: String, + pub yes_price: Option, + pub no_price: Option, + pub count: i64, + pub side: String, + pub ts: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn market_deserializes_with_optional_fields() { + let json = r#"{ + "ticker": "FED-23DEC-T3.00", + "title": "Fed raises rates", + "status": "active", + "yes_bid": 24, + "yes_ask": 26, + "volume": 1200 + }"#; + let m: Market = serde_json::from_str(json).unwrap(); + assert_eq!(m.ticker, "FED-23DEC-T3.00"); + assert_eq!(m.yes_bid, Some(24)); + assert!(m.no_bid.is_none()); + } + + #[test] + fn ws_message_dispatch() { + let json = r#"{"type":"ticker","msg":{"market_ticker":"X","yes_bid":10,"yes_ask":12}}"#; + let env: WsEnvelope = serde_json::from_str(json).unwrap(); + let msg = WsMessage::from_envelope(env).unwrap(); + assert!(matches!(msg, WsMessage::Ticker(ref t) if t.market_ticker == "X")); + } + + #[test] + fn ws_message_unknown_kind_does_not_error() { + let json = r#"{"type":"heartbeat","msg":{}}"#; + let env: WsEnvelope = serde_json::from_str(json).unwrap(); + let msg = WsMessage::from_envelope(env).unwrap(); + assert!(matches!(msg, WsMessage::Other)); + } + + #[test] + fn new_order_limit_serializes() { + let o = NewOrder { + ticker: "X".into(), + action: OrderAction::Buy, + side: OrderSide::Yes, + order_type: OrderType::Limit, + count: 10, + yes_price: Some(24), + no_price: None, + client_order_id: "abc".into(), + }; + let s = serde_json::to_string(&o).unwrap(); + assert!(s.contains("\"type\":\"limit\"")); + assert!(s.contains("\"action\":\"buy\"")); + assert!(s.contains("\"yes_price\":24")); + // None field must be omitted. + assert!(!s.contains("no_price")); + } +} diff --git a/crates/ruvector-kalshi/src/normalize.rs b/crates/ruvector-kalshi/src/normalize.rs new file mode 100644 index 00000000..a25675a9 --- /dev/null +++ b/crates/ruvector-kalshi/src/normalize.rs @@ -0,0 +1,344 @@ +//! Normalize Kalshi payloads into [`neural_trader_core::MarketEvent`] so the +//! existing coherence, attention, CNN, and replay pipelines can consume +//! Kalshi as just another venue. +//! +//! Symbol-id assignment is deterministic via FNV-1a over the Kalshi ticker +//! so the same contract always maps to the same `symbol_id` across runs +//! without needing an external registry. + +use chrono::DateTime; +use neural_trader_core::{EventType, MarketEvent, Side}; + +use crate::models::{ + KalshiTrade, OrderbookSnapshot, WsFill, WsOrderbook, WsOrderbookDelta, WsTicker, WsTrade, +}; +use crate::{KalshiError, Result, KALSHI_PRICE_FP_SCALE, KALSHI_VENUE_ID}; + +/// Stable, deterministic symbol id from a Kalshi ticker. FNV-1a 32-bit, +/// ignoring case so `"FED-DEC23"` and `"fed-dec23"` collide (desired). +pub fn symbol_id_for(ticker: &str) -> u32 { + let mut hash: u32 = 0x811c_9dc5; + for b in ticker.as_bytes() { + let c = b.to_ascii_lowercase(); + hash ^= c as u32; + hash = hash.wrapping_mul(0x0100_0193); + } + hash +} + +/// Convert a Kalshi cent price (0..=100) into canonical fixed-point. +pub fn cents_to_fp(cents: i64) -> i64 { + cents.saturating_mul(KALSHI_PRICE_FP_SCALE) +} + +fn parse_iso_ns(ts: &str) -> Result { + let dt = DateTime::parse_from_rfc3339(ts) + .map_err(|e| KalshiError::Normalize(format!("parse ts {ts}: {e}")))?; + let ns = dt.timestamp_nanos_opt().ok_or_else(|| { + KalshiError::Normalize(format!("ts {ts} out of range for i64 ns")) + })?; + Ok(ns.max(0) as u64) +} + +fn ms_to_ns(ts_ms: Option) -> u64 { + ts_ms.map(|t| (t.max(0) as u64).saturating_mul(1_000_000)).unwrap_or_else(now_ns) +} + +fn now_ns() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0) +} + +fn empty_event_id() -> [u8; 16] { + [0u8; 16] +} + +fn event_id_from_str(key: &str) -> [u8; 16] { + // 128-bit FNV-1a. Produces a stable id from trade_id / order_id strings. + let mut hash: u128 = 0x6c62_272e_07bb_0142_62b8_2175_6295_c58d; + for b in key.as_bytes() { + hash ^= *b as u128; + hash = hash.wrapping_mul(0x0000_0000_0100_0000_0000_0000_0000_013B); + } + hash.to_le_bytes() +} + +// --------------------------------------------------------------------------- +// REST normalizers +// --------------------------------------------------------------------------- + +/// Normalize a REST `KalshiTrade` into a canonical Trade event. +pub fn trade_to_event(t: &KalshiTrade, seq: u64) -> Result { + let ts_ns = parse_iso_ns(&t.created_time)?; + let price_cents = t.yes_price.unwrap_or(0); + let side = match t.taker_side.as_deref() { + Some("yes") => Some(Side::Bid), + Some("no") => Some(Side::Ask), + _ => None, + }; + Ok(MarketEvent { + event_id: event_id_from_str(&t.trade_id), + ts_exchange_ns: ts_ns, + ts_ingest_ns: now_ns(), + venue_id: KALSHI_VENUE_ID, + symbol_id: symbol_id_for(&t.ticker), + event_type: EventType::Trade, + side, + price_fp: cents_to_fp(price_cents), + qty_fp: t.count.saturating_mul(KALSHI_PRICE_FP_SCALE), + order_id_hash: None, + participant_id_hash: None, + flags: 0, + seq, + }) +} + +/// Expand an orderbook snapshot into per-level MarketEvents. Each Kalshi +/// level becomes one `BookSnapshot` event with the level's price/size. +pub fn orderbook_to_events( + ticker: &str, + ts_ns: u64, + ob: &OrderbookSnapshot, + starting_seq: u64, +) -> Vec { + let sym = symbol_id_for(ticker); + let mut out = Vec::with_capacity(ob.yes.len() + ob.no.len()); + let mut seq = starting_seq; + for [price, size] in &ob.yes { + out.push(MarketEvent { + event_id: empty_event_id(), + ts_exchange_ns: ts_ns, + ts_ingest_ns: now_ns(), + venue_id: KALSHI_VENUE_ID, + symbol_id: sym, + event_type: EventType::BookSnapshot, + side: Some(Side::Bid), + price_fp: cents_to_fp(*price), + qty_fp: size.saturating_mul(KALSHI_PRICE_FP_SCALE), + order_id_hash: None, + participant_id_hash: None, + flags: 0, + seq, + }); + seq = seq.wrapping_add(1); + } + for [price, size] in &ob.no { + out.push(MarketEvent { + event_id: empty_event_id(), + ts_exchange_ns: ts_ns, + ts_ingest_ns: now_ns(), + venue_id: KALSHI_VENUE_ID, + symbol_id: sym, + event_type: EventType::BookSnapshot, + side: Some(Side::Ask), + price_fp: cents_to_fp(*price), + qty_fp: size.saturating_mul(KALSHI_PRICE_FP_SCALE), + order_id_hash: None, + participant_id_hash: None, + flags: 0, + seq, + }); + seq = seq.wrapping_add(1); + } + out +} + +// --------------------------------------------------------------------------- +// WebSocket normalizers +// --------------------------------------------------------------------------- + +pub fn ws_ticker_to_event(t: &WsTicker, seq: u64) -> MarketEvent { + // Use the mid as the canonical price. Fallback to `price` if bids/asks + // absent (end-of-day tickers). + let price = match (t.yes_bid, t.yes_ask, t.price) { + (Some(b), Some(a), _) => (b + a) / 2, + (_, _, Some(p)) => p, + _ => 0, + }; + MarketEvent { + event_id: empty_event_id(), + ts_exchange_ns: ms_to_ns(t.ts), + ts_ingest_ns: now_ns(), + venue_id: KALSHI_VENUE_ID, + symbol_id: symbol_id_for(&t.market_ticker), + event_type: EventType::VenueStatus, + side: None, + price_fp: cents_to_fp(price), + qty_fp: 0, + order_id_hash: None, + participant_id_hash: None, + flags: 0, + seq, + } +} + +pub fn ws_trade_to_event(t: &WsTrade, seq: u64) -> MarketEvent { + let side = match t.taker_side.as_deref() { + Some("yes") => Some(Side::Bid), + Some("no") => Some(Side::Ask), + _ => None, + }; + MarketEvent { + event_id: empty_event_id(), + ts_exchange_ns: ms_to_ns(t.ts), + ts_ingest_ns: now_ns(), + venue_id: KALSHI_VENUE_ID, + symbol_id: symbol_id_for(&t.market_ticker), + event_type: EventType::Trade, + side, + price_fp: cents_to_fp(t.yes_price.unwrap_or(0)), + qty_fp: t.count.saturating_mul(KALSHI_PRICE_FP_SCALE), + order_id_hash: None, + participant_id_hash: None, + flags: 0, + seq, + } +} + +pub fn ws_orderbook_to_events(ob: &WsOrderbook, starting_seq: u64) -> Vec { + let ts_ns = ms_to_ns(ob.ts); + orderbook_to_events( + &ob.market_ticker, + ts_ns, + &OrderbookSnapshot { + yes: ob.yes.clone(), + no: ob.no.clone(), + }, + starting_seq, + ) +} + +pub fn ws_orderbook_delta_to_event(d: &WsOrderbookDelta, seq: u64) -> MarketEvent { + let side = match d.side.as_str() { + "yes" => Some(Side::Bid), + "no" => Some(Side::Ask), + _ => None, + }; + let event_type = if d.delta > 0 { + EventType::NewOrder + } else { + EventType::CancelOrder + }; + MarketEvent { + event_id: empty_event_id(), + ts_exchange_ns: ms_to_ns(d.ts), + ts_ingest_ns: now_ns(), + venue_id: KALSHI_VENUE_ID, + symbol_id: symbol_id_for(&d.market_ticker), + event_type, + side, + price_fp: cents_to_fp(d.price), + qty_fp: d.delta.abs().saturating_mul(KALSHI_PRICE_FP_SCALE), + order_id_hash: None, + participant_id_hash: None, + flags: 0, + seq, + } +} + +pub fn ws_fill_to_event(f: &WsFill, seq: u64) -> MarketEvent { + let side = match f.side.as_str() { + "yes" => Some(Side::Bid), + "no" => Some(Side::Ask), + _ => None, + }; + MarketEvent { + event_id: event_id_from_str(&f.order_id), + ts_exchange_ns: ms_to_ns(f.ts), + ts_ingest_ns: now_ns(), + venue_id: KALSHI_VENUE_ID, + symbol_id: symbol_id_for(&f.market_ticker), + event_type: EventType::Trade, + side, + price_fp: cents_to_fp(f.yes_price.unwrap_or(0)), + qty_fp: f.count.saturating_mul(KALSHI_PRICE_FP_SCALE), + order_id_hash: Some(event_id_from_str(&f.order_id)), + participant_id_hash: None, + flags: 1, // flag bit 0 = own-fill + seq, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{KalshiTrade, OrderbookSnapshot, WsOrderbookDelta}; + + #[test] + fn symbol_id_is_deterministic_and_case_insensitive() { + assert_eq!(symbol_id_for("FED-DEC23"), symbol_id_for("fed-dec23")); + assert_ne!(symbol_id_for("FED-DEC23"), symbol_id_for("FED-DEC24")); + } + + #[test] + fn cents_to_fp_scales() { + assert_eq!(cents_to_fp(24), 24_000_000); + assert_eq!(cents_to_fp(0), 0); + } + + #[test] + fn trade_to_event_maps_side_and_price() { + let t = KalshiTrade { + ticker: "FED-DEC23".into(), + trade_id: "trade-abc".into(), + yes_price: Some(34), + no_price: Some(66), + count: 5, + taker_side: Some("yes".into()), + created_time: "2026-04-20T18:00:00Z".into(), + }; + let e = trade_to_event(&t, 7).unwrap(); + assert_eq!(e.event_type, EventType::Trade); + assert_eq!(e.side, Some(Side::Bid)); + assert_eq!(e.venue_id, KALSHI_VENUE_ID); + assert_eq!(e.price_fp, cents_to_fp(34)); + assert_eq!(e.qty_fp, 5 * KALSHI_PRICE_FP_SCALE); + assert_eq!(e.seq, 7); + assert_ne!(e.event_id, [0u8; 16], "trade id must seed event_id"); + } + + #[test] + fn orderbook_levels_expanded() { + let ob = OrderbookSnapshot { + yes: vec![[24, 100], [23, 200]], + no: vec![[76, 150]], + }; + let events = orderbook_to_events("FED-DEC23", 1_000_000_000, &ob, 0); + assert_eq!(events.len(), 3); + assert_eq!(events[0].side, Some(Side::Bid)); + assert_eq!(events[0].price_fp, cents_to_fp(24)); + assert_eq!(events[0].qty_fp, 100 * KALSHI_PRICE_FP_SCALE); + assert_eq!(events[2].side, Some(Side::Ask)); + // Seq monotonic. + assert_eq!(events[0].seq, 0); + assert_eq!(events[2].seq, 2); + } + + #[test] + fn ws_delta_positive_is_new_order_negative_is_cancel() { + let add = WsOrderbookDelta { + market_ticker: "FED-DEC23".into(), + side: "yes".into(), + price: 24, + delta: 5, + ts: Some(1_700_000_000_000), + }; + let remove = WsOrderbookDelta { + market_ticker: "FED-DEC23".into(), + side: "no".into(), + price: 76, + delta: -3, + ts: None, + }; + assert_eq!(ws_orderbook_delta_to_event(&add, 0).event_type, EventType::NewOrder); + assert_eq!( + ws_orderbook_delta_to_event(&remove, 1).event_type, + EventType::CancelOrder + ); + // qty always positive. + assert_eq!(ws_orderbook_delta_to_event(&remove, 1).qty_fp, 3 * KALSHI_PRICE_FP_SCALE); + } +} diff --git a/crates/ruvector-kalshi/src/rest.rs b/crates/ruvector-kalshi/src/rest.rs new file mode 100644 index 00000000..b55121f4 --- /dev/null +++ b/crates/ruvector-kalshi/src/rest.rs @@ -0,0 +1,182 @@ +//! Kalshi REST client. All authenticated endpoints sign the request using +//! [`crate::auth::Signer`] and propagate typed errors. +//! +//! # Live-trade gate +//! +//! [`RestClient::post_order`] is the only method that can move money and it +//! refuses to run unless the `KALSHI_ENABLE_LIVE` environment variable is +//! set to `1`. Any other value (including unset) returns an error without +//! making the HTTP call. This is a belt-and-braces backstop on top of any +//! strategy-level `RiskGate`. + +use crate::auth::Signer; +use crate::models::{ + Market, MarketsResponse, NewOrder, OrderAck, OrderbookResponse, OrderbookSnapshot, +}; +use crate::{KalshiError, Result}; + +#[derive(Clone)] +pub struct RestClient { + base_url: String, + signer: Signer, + http: reqwest::Client, +} + +impl RestClient { + pub fn new(base_url: impl Into, signer: Signer) -> Result { + let http = reqwest::Client::builder() + .user_agent("ruvector-kalshi/0.1") + .timeout(std::time::Duration::from_secs(10)) + .build()?; + Ok(Self { + base_url: base_url.into(), + signer, + http, + }) + } + + /// Path used in the signature must be the full `/trade-api/v2/...` path, + /// not the host-relative fragment, per Kalshi's spec. + fn sig_path_for(&self, path: &str) -> String { + // `base_url` already ends with `/trade-api/v2`; the sig path is the + // full URL path component. + let url = url_join(&self.base_url, path); + match reqwest::Url::parse(&url) { + Ok(u) => u.path().to_string(), + Err(_) => path.to_string(), + } + } + + async fn send serde::Deserialize<'de>>( + &self, + method: reqwest::Method, + path: &str, + body: Option<&impl serde::Serialize>, + ) -> Result { + let url = url_join(&self.base_url, path); + let sig_path = self.sig_path_for(path); + let headers = self.signer.sign_now(method.as_str(), &sig_path); + + let mut rb = self.http.request(method, &url); + rb = headers.apply(rb); + if let Some(b) = body { + rb = rb.json(b); + } + + let resp = rb.send().await?; + let status = resp.status(); + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + return Err(KalshiError::Api { + status: status.as_u16(), + body, + }); + } + let parsed = resp.json::().await?; + Ok(parsed) + } + + pub async fn list_markets(&self, status: Option<&str>) -> Result> { + let path = match status { + Some(s) => format!("/markets?status={s}"), + None => "/markets".into(), + }; + let resp: MarketsResponse = self.send(reqwest::Method::GET, &path, NO_BODY).await?; + Ok(resp.markets) + } + + pub async fn orderbook(&self, ticker: &str) -> Result { + let path = format!("/markets/{ticker}/orderbook"); + let resp: OrderbookResponse = self.send(reqwest::Method::GET, &path, NO_BODY).await?; + Ok(resp.orderbook) + } + + /// Place a new order. Refuses to run unless `KALSHI_ENABLE_LIVE=1`. + pub async fn post_order(&self, order: &NewOrder) -> Result { + if std::env::var("KALSHI_ENABLE_LIVE").ok().as_deref() != Some("1") { + return Err(KalshiError::Api { + status: 0, + body: "live trading disabled (set KALSHI_ENABLE_LIVE=1 to enable)".into(), + }); + } + self.send(reqwest::Method::POST, "/portfolio/orders", Some(order)) + .await + } +} + +const NO_BODY: Option<&()> = None; + +fn url_join(base: &str, path: &str) -> String { + if path.starts_with("http://") || path.starts_with("https://") { + return path.to_string(); + } + let b = base.trim_end_matches('/'); + let p = if path.starts_with('/') { path.to_string() } else { format!("/{path}") }; + format!("{b}{p}") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::auth::Signer; + use rsa::pkcs1::EncodeRsaPrivateKey; + use rsa::RsaPrivateKey; + + fn test_signer() -> Signer { + let mut rng = rand::thread_rng(); + let key = RsaPrivateKey::new(&mut rng, 2048).unwrap(); + let pem = key.to_pkcs1_pem(rsa::pkcs1::LineEnding::LF).unwrap().to_string(); + Signer::from_pem("test-key", &pem).unwrap() + } + + #[test] + fn url_join_handles_trailing_and_leading_slashes() { + assert_eq!( + url_join("https://example.com/trade-api/v2/", "/markets"), + "https://example.com/trade-api/v2/markets" + ); + assert_eq!( + url_join("https://example.com/trade-api/v2", "markets"), + "https://example.com/trade-api/v2/markets" + ); + } + + #[test] + fn sig_path_uses_full_url_path() { + let client = RestClient::new( + "https://trading-api.kalshi.com/trade-api/v2", + test_signer(), + ) + .unwrap(); + let p = client.sig_path_for("/markets"); + assert_eq!(p, "/trade-api/v2/markets"); + } + + #[tokio::test] + async fn post_order_refuses_without_live_flag() { + // Ensure the flag is not set. + std::env::remove_var("KALSHI_ENABLE_LIVE"); + let client = RestClient::new( + "https://trading-api.kalshi.com/trade-api/v2", + test_signer(), + ) + .unwrap(); + let order = NewOrder { + ticker: "X".into(), + action: crate::models::OrderAction::Buy, + side: crate::models::OrderSide::Yes, + order_type: crate::models::OrderType::Limit, + count: 1, + yes_price: Some(24), + no_price: None, + client_order_id: "t-1".into(), + }; + let err = client.post_order(&order).await.unwrap_err(); + match err { + KalshiError::Api { status: 0, body } => { + assert!(body.contains("live trading disabled")); + } + other => panic!("expected Api status=0 error, got {other:?}"), + } + } +} diff --git a/crates/ruvector-kalshi/src/secrets.rs b/crates/ruvector-kalshi/src/secrets.rs new file mode 100644 index 00000000..9c5fbcf3 --- /dev/null +++ b/crates/ruvector-kalshi/src/secrets.rs @@ -0,0 +1,219 @@ +//! Secret loading for Kalshi credentials. +//! +//! Three sources, checked in order: +//! 1. `KALSHI_SECRET_SOURCE=local` → read PEM + API key from local files +//! (dev only). Paths default to `.kalshi/kalshi.pem` and +//! `.kalshi/api-key.txt` under the current working directory. +//! 2. `KALSHI_SECRET_SOURCE=env` → read from `KALSHI_API_KEY` and +//! `KALSHI_PRIVATE_KEY_PEM` env vars. +//! 3. Default: shell out to `gcloud secrets versions access latest`. +//! +//! The result is cached in memory for 5 minutes so hot REST paths do not +//! re-invoke `gcloud` per request. + +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use tokio::sync::Mutex; + +use crate::{KalshiError, Result}; + +/// Loaded Kalshi credentials. The PEM is kept as a `String` rather than a +/// parsed key so the [`crate::auth::Signer`] stays the single owner of the +/// private key material. +#[derive(Clone)] +pub struct Credentials { + pub api_key: String, + pub private_key_pem: String, + pub api_url: String, +} + +impl std::fmt::Debug for Credentials { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Credentials") + .field("api_key_len", &self.api_key.len()) + .field("pem_len", &self.private_key_pem.len()) + .field("api_url", &self.api_url) + .finish() + } +} + +#[derive(Clone)] +pub struct SecretLoader { + project: String, + ttl: Duration, + inner: Arc>>, +} + +impl SecretLoader { + pub fn new(project: impl Into) -> Self { + Self { + project: project.into(), + ttl: Duration::from_secs(300), + inner: Arc::new(Mutex::new(None)), + } + } + + /// Override the cache TTL. Primarily for tests. + pub fn with_ttl(mut self, ttl: Duration) -> Self { + self.ttl = ttl; + self + } + + /// Load credentials, using the in-memory cache if fresh. + pub async fn load(&self) -> Result { + { + let guard = self.inner.lock().await; + if let Some((creds, loaded_at)) = guard.as_ref() { + if loaded_at.elapsed() < self.ttl { + return Ok(creds.clone()); + } + } + } + let creds = self.load_fresh().await?; + let mut guard = self.inner.lock().await; + *guard = Some((creds.clone(), Instant::now())); + Ok(creds) + } + + async fn load_fresh(&self) -> Result { + match std::env::var("KALSHI_SECRET_SOURCE").as_deref() { + Ok("local") => self.load_local().await, + Ok("env") => self.load_env(), + _ => self.load_gcloud().await, + } + } + + fn load_env(&self) -> Result { + let api_key = std::env::var("KALSHI_API_KEY") + .map_err(|_| KalshiError::Secret("KALSHI_API_KEY not set".into()))?; + let private_key_pem = std::env::var("KALSHI_PRIVATE_KEY_PEM") + .map_err(|_| KalshiError::Secret("KALSHI_PRIVATE_KEY_PEM not set".into()))?; + let api_url = std::env::var("KALSHI_API_URL") + .unwrap_or_else(|_| crate::KALSHI_API_URL.to_string()); + Ok(Credentials { + api_key, + private_key_pem, + api_url, + }) + } + + async fn load_local(&self) -> Result { + let pem_path = std::env::var("KALSHI_PEM_PATH") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(".kalshi/kalshi.pem")); + let api_key_path = std::env::var("KALSHI_API_KEY_PATH") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(".kalshi/api-key.txt")); + + let private_key_pem = tokio::fs::read_to_string(&pem_path) + .await + .map_err(|e| KalshiError::Secret(format!("reading {pem_path:?}: {e}")))?; + + let api_key = if let Ok(key) = std::env::var("KALSHI_API_KEY") { + key + } else { + tokio::fs::read_to_string(&api_key_path) + .await + .map(|s| s.trim().to_string()) + .map_err(|e| { + KalshiError::Secret(format!( + "reading {api_key_path:?} (or set KALSHI_API_KEY): {e}" + )) + })? + }; + + let api_url = std::env::var("KALSHI_API_URL") + .unwrap_or_else(|_| crate::KALSHI_API_URL.to_string()); + + Ok(Credentials { + api_key, + private_key_pem, + api_url, + }) + } + + async fn load_gcloud(&self) -> Result { + let api_key = gcloud_secret(&self.project, "KALSHI_API_KEY").await?; + let private_key_pem = gcloud_secret(&self.project, "KALSHI_PRIVATE_KEY_PEM").await?; + let api_url = match gcloud_secret(&self.project, "KALSHI_API_URL").await { + Ok(s) => s, + Err(_) => crate::KALSHI_API_URL.to_string(), + }; + Ok(Credentials { + api_key: api_key.trim().to_string(), + private_key_pem, + api_url: api_url.trim().to_string(), + }) + } +} + +async fn gcloud_secret(project: &str, name: &str) -> Result { + let output = tokio::process::Command::new("gcloud") + .args([ + "secrets", + "versions", + "access", + "latest", + "--secret", + name, + "--project", + project, + ]) + .output() + .await + .map_err(|e| KalshiError::Secret(format!("spawning gcloud for {name}: {e}")))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); + return Err(KalshiError::Secret(format!( + "gcloud secrets access {name} failed: {stderr}" + ))); + } + let val = String::from_utf8(output.stdout).map_err(|e| { + KalshiError::Secret(format!("gcloud output for {name} is not utf-8: {e}")) + })?; + Ok(val) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn env_source_reads_vars() { + // SAFETY: env mutation in tests is serialized by running this test + // in isolation from other env-touching tests in this module (only one). + std::env::set_var("KALSHI_SECRET_SOURCE", "env"); + std::env::set_var("KALSHI_API_KEY", "k-env-1"); + std::env::set_var("KALSHI_PRIVATE_KEY_PEM", "PEM-PLACEHOLDER"); + std::env::remove_var("KALSHI_API_URL"); + + let loader = SecretLoader::new("test").with_ttl(Duration::from_millis(10)); + let creds = loader.load().await.unwrap(); + assert_eq!(creds.api_key, "k-env-1"); + assert_eq!(creds.private_key_pem, "PEM-PLACEHOLDER"); + assert_eq!(creds.api_url, crate::KALSHI_API_URL); + + // Cache hit returns the same value immediately. + let creds2 = loader.load().await.unwrap(); + assert_eq!(creds.api_key, creds2.api_key); + + std::env::remove_var("KALSHI_SECRET_SOURCE"); + std::env::remove_var("KALSHI_API_KEY"); + std::env::remove_var("KALSHI_PRIVATE_KEY_PEM"); + } + + #[test] + fn debug_does_not_leak_pem() { + let c = Credentials { + api_key: "super-secret-1234567890".into(), + private_key_pem: "-----BEGIN RSA PRIVATE KEY----- secret".into(), + api_url: crate::KALSHI_API_URL.into(), + }; + let s = format!("{c:?}"); + assert!(!s.contains("super-secret")); + assert!(!s.contains("BEGIN RSA")); + } +} diff --git a/crates/ruvector-kalshi/src/ws.rs b/crates/ruvector-kalshi/src/ws.rs new file mode 100644 index 00000000..9926a075 --- /dev/null +++ b/crates/ruvector-kalshi/src/ws.rs @@ -0,0 +1,129 @@ +//! WebSocket ingest scaffolding for Kalshi's market-data stream. +//! +//! This module intentionally keeps the network layer *pluggable*. The +//! default build does not pull in `tokio-tungstenite` — callers feed raw +//! JSON frames into [`FeedDecoder::decode`], which produces +//! [`neural_trader_core::MarketEvent`] records ready for the coherence / +//! replay pipelines. +//! +//! When a tungstenite-based transport is needed, add +//! `tokio-tungstenite` to this crate and wire `connect` + `pump` inside a +//! `#[cfg(feature = "tungstenite")]` module. Keeping the decoder feature- +//! free means unit tests cover the parsing path without requiring a +//! network runtime. + +use neural_trader_core::MarketEvent; + +use crate::models::{WsEnvelope, WsMessage}; +use crate::normalize::{ + ws_fill_to_event, ws_orderbook_delta_to_event, ws_orderbook_to_events, ws_ticker_to_event, + ws_trade_to_event, +}; +use crate::Result; + +/// Decodes raw JSON WebSocket frames into canonical `MarketEvent`s. +/// +/// Stateful only in its monotonically increasing sequence counter, so it +/// is cheap to construct per connection. +#[derive(Debug, Default)] +pub struct FeedDecoder { + next_seq: u64, +} + +impl FeedDecoder { + pub fn new() -> Self { + Self::default() + } + + /// Decode one JSON frame. Unknown message kinds yield an empty vector + /// (not an error) so forward-compatible payloads don't kill the feed. + pub fn decode(&mut self, frame: &str) -> Result> { + let env: WsEnvelope = serde_json::from_str(frame)?; + let msg = WsMessage::from_envelope(env)?; + Ok(self.decode_msg(&msg)) + } + + fn decode_msg(&mut self, msg: &WsMessage) -> Vec { + match msg { + WsMessage::Ticker(t) => vec![self.tick(|s| ws_ticker_to_event(t, s))], + WsMessage::Trade(t) => vec![self.tick(|s| ws_trade_to_event(t, s))], + WsMessage::OrderbookSnapshot(ob) => { + let events = ws_orderbook_to_events(ob, self.next_seq); + self.next_seq = self + .next_seq + .wrapping_add(events.len() as u64); + events + } + WsMessage::OrderbookDelta(d) => { + vec![self.tick(|s| ws_orderbook_delta_to_event(d, s))] + } + WsMessage::Fill(f) => vec![self.tick(|s| ws_fill_to_event(f, s))], + WsMessage::Other => Vec::new(), + } + } + + fn tick MarketEvent>(&mut self, f: F) -> MarketEvent { + let seq = self.next_seq; + self.next_seq = self.next_seq.wrapping_add(1); + f(seq) + } + + pub fn next_seq(&self) -> u64 { + self.next_seq + } +} + +#[cfg(test)] +mod tests { + use super::*; + use neural_trader_core::EventType; + + #[test] + fn decodes_ticker_then_trade_monotonically() { + let mut dec = FeedDecoder::new(); + let events = dec + .decode(r#"{"type":"ticker","msg":{"market_ticker":"X","yes_bid":10,"yes_ask":12}}"#) + .unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].seq, 0); + assert_eq!(events[0].event_type, EventType::VenueStatus); + + let events = dec + .decode(r#"{"type":"trade","msg":{"market_ticker":"X","yes_price":11,"count":3}}"#) + .unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].seq, 1); + assert_eq!(events[0].event_type, EventType::Trade); + } + + #[test] + fn decodes_snapshot_with_multiple_levels() { + let mut dec = FeedDecoder::new(); + let frame = r#"{ + "type":"orderbook_snapshot", + "msg":{ + "market_ticker":"X", + "yes":[[24,100],[23,200]], + "no":[[76,150]] + } + }"#; + let events = dec.decode(frame).unwrap(); + assert_eq!(events.len(), 3); + assert_eq!(dec.next_seq(), 3); + } + + #[test] + fn unknown_message_is_silent() { + let mut dec = FeedDecoder::new(); + let events = dec.decode(r#"{"type":"heartbeat","msg":{}}"#).unwrap(); + assert!(events.is_empty()); + assert_eq!(dec.next_seq(), 0); + } + + #[test] + fn malformed_json_returns_error() { + let mut dec = FeedDecoder::new(); + let err = dec.decode("not json").unwrap_err(); + assert!(matches!(err, crate::KalshiError::Decode(_))); + } +} diff --git a/docs/adr/ADR-151-kalshi-neural-trader-integration.md b/docs/adr/ADR-151-kalshi-neural-trader-integration.md new file mode 100644 index 00000000..bdcbd543 --- /dev/null +++ b/docs/adr/ADR-151-kalshi-neural-trader-integration.md @@ -0,0 +1,253 @@ +# ADR-151: Kalshi Integration via RuVector Neural Trader + +## Status + +Proposed + +## Date + +2026-04-20 + +## Context + +[Kalshi](https://kalshi.com) is a CFTC-regulated US event-contract exchange exposing a signed REST + WebSocket API at `https://trading-api.kalshi.com/trade-api/v2`. Contracts are binary (YES/NO) with prices in cents (0–100), settling to $1 on resolution. This is a natural target for the **RuVector Neural Trader** stack, which already models probabilistic event markets in `examples/neural-trader/specialized/prediction-markets.js` (Polymarket/Kalshi/PredictIt) but lacks a live Rust execution path against the real Kalshi API. + +Existing building blocks in the ruvector workspace we will reuse verbatim: + +| Crate / Module | Role | Reuse | +|---|---|---| +| `crates/neural-trader-core` | Canonical `MarketEvent`, `EventType`, `Side`, ingest traits | Normalize Kalshi ticker/trade/orderbook into `MarketEvent` | +| `crates/neural-trader-coherence` | Coherence / regime detection across markets | Cross-market arbitrage and regime-gated sizing | +| `crates/neural-trader-replay` | Deterministic replay for backtests | Replay captured Kalshi streams against strategies | +| `crates/ruvector-attention` / `ruvector-attn-mincut` | Attention + MinCut for market graph clustering | Group correlated Kalshi series (e.g., all Fed-rate strikes) | +| `crates/ruvector-cnn` | Temporal convnet over order-book imbalance | Short-horizon tick prediction on Kalshi trades feed | +| `crates/ruvllm` / RuvLtra | Local inference for news/event summarization → probability prior | Provide `modelProbability` input analogous to the JS example | +| `crates/mcp-brain-server` + pi.ruv.io brain | Persistent knowledge store for contracts, resolutions, past edges | Long-memory of market behavior and strategy outcomes | + +Kalshi authentication is **RSA-PSS-SHA256** signed requests, *not* HMAC. Each request requires: + +``` +KALSHI-ACCESS-KEY: +KALSHI-ACCESS-TIMESTAMP: +KALSHI-ACCESS-SIGNATURE: base64( RSA-PSS-SHA256( PEM_PRIVATE_KEY, timestamp + method + path ) ) +``` + +Because of this, secrets must be held server-side in GCS and never serialized into logs, traces, or the brain. + +### Credential state (already provisioned) + +Created in Google Cloud project `ruv-dev` (ADR-150 context): + +| Secret | Source | Access | +|---|---|---| +| `KALSHI_API_KEY` | UUID, 16 bytes | `gcloud secrets versions access latest --secret=KALSHI_API_KEY --project=ruv-dev` | +| `KALSHI_PRIVATE_KEY_PEM` | RSA private key (1679 bytes) | same pattern | +| `KALSHI_API_URL` | `https://trading-api.kalshi.com/trade-api/v2` | same pattern | + +Local development copy is at `/home/ruvultra/projects/ruvector/.kalshi/kalshi.pem`. Both `.kalshi/` and `*.pem` are already in `.gitignore`; no key material is ever committed. + +## Decision + +Introduce a new Rust crate **`ruvector-kalshi`** that (a) authenticates against the live Kalshi API using RSA-PSS-SHA256, (b) normalizes Kalshi events into `neural_trader_core::MarketEvent`, and (c) exposes a strategy runtime that composes attention/mincut/cnn signals with a ruvllm-supplied prior to quote and execute on binary contracts — with pi.ruv.io brain as the long-term memory. + +No Python. No JS shelling. All three layers (auth, stream, strategy) are pure Rust; WASM exposure is optional via a thin `ruvector-kalshi-wasm` sibling for dashboards. + +### Workspace layout + +``` +crates/ +├── ruvector-kalshi/ # new — auth, REST, WS, secrets +│ ├── src/ +│ │ ├── lib.rs +│ │ ├── auth.rs # RSA-PSS signer +│ │ ├── rest.rs # GET/POST wrappers +│ │ ├── ws.rs # tokio-tungstenite feed +│ │ ├── models.rs # Kalshi DTOs (Market, Event, Order, Fill) +│ │ ├── normalize.rs # Kalshi → neural_trader_core::MarketEvent +│ │ └── secrets.rs # gcloud + local-PEM loader +│ └── Cargo.toml +├── ruvector-kalshi-wasm/ # optional — browser-safe subset (read-only) +└── neural-trader-strategies/ # new — reusable strategy trait + Kalshi strats + └── src/ + ├── lib.rs # Strategy trait, PositionSizer, RiskGate + ├── expected_value.rs # EV+Kelly fractional sizing + ├── coherence_arb.rs # cross-market arb via neural-trader-coherence + └── attn_scalper.rs # attention+cnn short-horizon scalper + +examples/ +└── kalshi/ # new — runnable demos + ├── list-markets.rs + ├── stream-orderbook.rs + ├── paper-trade.rs + └── live-trade.rs # gated behind KALSHI_ENABLE_LIVE=1 +``` + +### Architecture + +``` +┌────────────────────────────────────────────────────────────────────┐ +│ ruvector-kalshi (Rust) │ +│ │ +│ secrets::load() ──► gcloud secrets versions access (cached 5min) │ +│ │ │ +│ ▼ │ +│ auth::Signer { api_key, rsa_private_key } │ +│ │ sign(ts, method, path) → KALSHI-ACCESS-SIGNATURE │ +│ ▼ │ +│ rest::Client ──► GET /markets, /portfolio, /events │ +│ POST /orders │ +│ ws::Feed ──► wss: ticker/orderbook/trade/fill │ +│ │ │ +│ ▼ │ +│ normalize::to_market_event() ──► neural_trader_core::MarketEvent │ +└────────────────────────────────┬───────────────────────────────────┘ + │ + ▼ +┌────────────────────────────────────────────────────────────────────┐ +│ neural-trader-strategies │ +│ │ +│ ┌─────────────┐ ┌────────────────┐ ┌──────────────┐ │ +│ │ EV + Kelly │ │ Coherence Arb │ │ Attn Scalper │ │ +│ │ (prior from │ │ (mincut groups │ │ (cnn over │ │ +│ │ ruvllm) │◄──┤ via attention)│◄──┤ imbalance) │ │ +│ └─────┬───────┘ └───────┬────────┘ └──────┬───────┘ │ +│ │ │ │ │ +│ └──► RiskGate ◄─────┴───────────────────┘ │ +│ (position cap, daily-loss kill, vol regime) │ +└────────────────────────────────┬───────────────────────────────────┘ + │ + ▼ +┌────────────────────────────────────────────────────────────────────┐ +│ Execution + Memory │ +│ │ +│ Paper mode → neural-trader-replay ledger │ +│ Live mode → ruvector-kalshi::rest::post_order │ +│ │ +│ pi.ruv.io brain: store contract outcomes, strategy P&L, market │ +│ resolutions as MarketEvent-derived memories (category: pattern). │ +└────────────────────────────────────────────────────────────────────┘ +``` + +### Authentication detail + +```rust +// crates/ruvector-kalshi/src/auth.rs (sketch) +use rsa::{RsaPrivateKey, pkcs1::DecodeRsaPrivateKey, + pss::SigningKey, signature::RandomizedSigner}; +use sha2::Sha256; +use base64::{engine::general_purpose::STANDARD, Engine}; + +pub struct Signer { + api_key: String, + signing_key: SigningKey, +} + +impl Signer { + pub fn sign(&self, ts_ms: u64, method: &str, path: &str) -> (String, String, String) { + let msg = format!("{ts_ms}{method}{path}"); + let mut rng = rand::thread_rng(); + let sig = self.signing_key.sign_with_rng(&mut rng, msg.as_bytes()); + let sig_b64 = STANDARD.encode(sig.to_bytes()); + (self.api_key.clone(), ts_ms.to_string(), sig_b64) + } +} +``` + +Dependencies: `rsa = "0.9"`, `sha2 = "0.10"`, `base64 = "0.22"`, `reqwest = { version = "0.12", features = ["json", "rustls-tls"] }`, `tokio-tungstenite`, `tokio`. + +### Secret loading (no hardcoding, no .env commits) + +```rust +// crates/ruvector-kalshi/src/secrets.rs (sketch) +pub enum SecretSource { Gcloud, LocalPem(PathBuf), Env } + +pub async fn load() -> Result { + match env::var("KALSHI_SECRET_SOURCE").as_deref() { + Ok("local") => load_local(), // dev only + _ => load_gcloud("ruv-dev").await, // default + } +} +``` + +`load_gcloud` shells out to `gcloud secrets versions access latest --secret=...` with a 5-minute in-process cache. Production deploys will use Workload Identity on Cloud Run so no gcloud CLI is needed — Secret Manager SDK direct read. + +### Risk gate (cheap, mandatory) + +The `RiskGate` wraps every strategy and enforces, before any `post_order`: + +1. **Max single-position notional**: default 10% of cash. +2. **Daily loss kill**: hard stop at −3% of starting balance (configurable). +3. **Concentration cap**: ≤ 40% across one neural-trader-coherence cluster. +4. **Min edge**: ≥ 300 bps over mid after fees (Kalshi takes 2%). +5. **Live gate**: `KALSHI_ENABLE_LIVE=1` required; absent → paper ledger only. + +## Consequences + +### Positive + +- First production path from ruvector neural capabilities to a regulated live venue. +- Reuses existing Rust crates — no duplication, no Python sidecars. +- Kalshi events land in the canonical `MarketEvent` shape so they immediately flow through existing coherence, replay, and attention modules. +- Secrets are centrally managed in GCS; no key material in the repo, logs, or the brain. +- pi.ruv.io accumulates a long-memory of event-market behavior, feeding back into future strategies (ADR-149/150 synergy). + +### Negative / risks + +- **Regulatory**: Kalshi is US-only; strategy code must remain paper-only unless the operator has verified a Kalshi account. Live trading is gated behind an env flag and manual credential load. +- **Key custody**: RSA private key in GCS means any party with `ruv-dev` Secret Accessor can trade. Mitigation: restrict IAM to a single service account; rotate PEM quarterly; consider KMS-wrapped storage later. +- **Latency**: gcloud CLI shell-out is ~200 ms — acceptable at startup, unacceptable per-request. We cache and never re-read inside the hot path. +- **Rate limits**: Kalshi enforces per-endpoint limits. The `rest::Client` must include a token-bucket limiter. Replay tests catch 429 regressions. +- **Model drift**: ruvllm priors can degrade silently. The `neural-trader-replay` suite must run nightly in CI with the previous week's captured feed and compare P&L against a baseline. + +### Neutral + +- Adds ~2 crates to the workspace — consistent with existing `neural-trader-*` pattern. +- Optional wasm sibling is not blocking; it can come later when a dashboard needs read-only market access. + +## Implementation Plan + +**Phase 1 — Auth & REST (week 1)** +- `ruvector-kalshi` crate skeleton +- `auth::Signer` + round-trip test against Kalshi `/exchange/status` +- `rest::Client` with GET `/markets`, `/events`, `/portfolio` +- `examples/kalshi/list-markets.rs` + +**Phase 2 — Stream & normalize (week 2)** +- `ws::Feed` subscribing to ticker, trade, orderbook channels +- `normalize::to_market_event` with unit tests covering all Kalshi event kinds +- `examples/kalshi/stream-orderbook.rs` dumping `MarketEvent` JSONL + +**Phase 3 — Strategy runtime & paper trade (week 3)** +- `neural-trader-strategies` crate with `Strategy` trait + `RiskGate` +- `ExpectedValueKelly` strategy wired to ruvllm prior +- `CoherenceArb` strategy wired to `neural-trader-coherence` +- `AttentionScalper` wired to `ruvector-cnn` +- `examples/kalshi/paper-trade.rs` replays captured feed → ledger + +**Phase 4 — Live & memory (week 4)** +- `rest::post_order`, cancel, amend +- Brain integration: every resolution and P&L event → `brain_share` +- `examples/kalshi/live-trade.rs` behind `KALSHI_ENABLE_LIVE=1` +- Nightly replay benchmark in CI + +## Testing Strategy + +- **Unit**: auth signature matches Kalshi reference vectors; normalize round-trips against fixture feeds. +- **Integration**: hit Kalshi `/exchange/status` in a gated CI job (needs a sandbox API key). +- **Replay**: `neural-trader-replay` replays captured `.jsonl` against each strategy; asserts P&L envelope. +- **Chaos**: drop WS connection, 429 storms, expired timestamps — all must fail closed (no orders). +- **Security**: `aidefence_scan` over any fields we send to the brain to strip accidental PII. + +## Alternatives Considered + +1. **Extend the JS example directly** — rejected; CLAUDE.md mandates Rust for all ruOS components, and we lose access to the Rust neural-trader crates. +2. **Generic "exchange" crate with Kalshi as a driver** — deferred; premature abstraction. Ship Kalshi first, extract the trait once a second venue (Polymarket) is scoped. +3. **Store PEM in the repo encrypted with age/sops** — rejected; GCS Secret Manager is the existing pattern (ADR-150, `CRATES_API_KEY`, etc.) and plays well with Cloud Run Workload Identity. + +## References + +- ADR-084: Neural Trader foundation (referenced by `neural-trader-core::lib.rs`) +- ADR-149: Brain performance optimizations +- ADR-150: π Brain + RuvLtra via Tailscale +- Kalshi API docs: https://trading-api.readme.io/reference/getting-started +- Existing example: `examples/neural-trader/specialized/prediction-markets.js`