feat(kalshi): ruvector-kalshi + neural-trader-strategies (ADR-151)

New crate ruvector-kalshi: RSA-PSS-SHA256 signer (PKCS#1/#8), GCS/local/env
secret loader with 5-min cache, typed REST + WS DTOs, Kalshi→MarketEvent
normalizer (reuses neural-trader-core), transport-free FeedDecoder,
reqwest-backed REST client with live-trade env gate, and an offline
sign+verify example that validates against the real PEM.

New crate neural-trader-strategies: venue-agnostic Strategy trait, Intent
type, RiskGate (position cap, daily-loss kill, concentration, min-edge,
live gate, cash check), and ExpectedValueKelly prior-driven strategy.

36 unit tests pass across both crates. End-to-end offline validation
confirmed against the real Kalshi PEM via both local and GCS sources.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruvnet 2026-04-20 15:23:46 -04:00
parent b02635cd25
commit ff0f5bc4fa
18 changed files with 2459 additions and 0 deletions

2
.gitignore vendored
View file

@ -135,3 +135,5 @@ examples/real-eeg-analysis/data/*.edf
examples/real-eeg-multi-seizure/data/*.edf
agentdb.rvf
agentdb.rvf.lock
.kalshi

31
Cargo.lock generated
View file

@ -5921,6 +5921,17 @@ dependencies = [
"serde_json",
]
[[package]]
name = "neural-trader-strategies"
version = "0.1.0"
dependencies = [
"anyhow",
"neural-trader-core",
"serde",
"serde_json",
"thiserror 2.0.18",
]
[[package]]
name = "neural-trader-wasm"
version = "0.1.1"
@ -8541,6 +8552,7 @@ dependencies = [
"pkcs1",
"pkcs8",
"rand_core 0.6.4",
"sha2 0.10.9",
"signature",
"spki",
"subtle",
@ -9784,6 +9796,25 @@ dependencies = [
"thiserror 2.0.18",
]
[[package]]
name = "ruvector-kalshi"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.22.1",
"chrono",
"neural-trader-core",
"rand 0.8.5",
"reqwest 0.12.28",
"rsa",
"serde",
"serde_json",
"sha2 0.10.9",
"thiserror 2.0.18",
"tokio",
"tracing",
]
[[package]]
name = "ruvector-learning-wasm"
version = "0.1.0"

View file

@ -106,6 +106,9 @@ members = [
"crates/neural-trader-coherence",
"crates/neural-trader-replay",
"crates/neural-trader-wasm",
# Kalshi integration (ADR-151)
"crates/ruvector-kalshi",
"crates/neural-trader-strategies",
# RuVix Cognition Kernel (organized under crates/ruvix/)
"crates/ruvix/crates/types",
"crates/ruvix/crates/region",

View file

@ -0,0 +1,17 @@
[package]
name = "neural-trader-strategies"
version = "0.1.0"
edition = "2021"
description = "Venue-agnostic strategy + risk-gate runtime for the RuVector Neural Trader (ADR-151)"
license = "MIT OR Apache-2.0"
publish = false
[dependencies]
neural-trader-core = { path = "../neural-trader-core" }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
[dev-dependencies]
serde_json = { workspace = true }

View file

@ -0,0 +1,237 @@
//! Expected-Value Kelly strategy.
//!
//! Given a model probability `p` for the YES side of a binary contract
//! and a current market price `m` in cents (0..=100), the canonical
//! fractional Kelly sizing for a YES bet is:
//!
//! edge = p - m / 100
//! kelly = edge / (1 - m / 100)
//!
//! The strategy multiplies `kelly` by a conservative `kelly_fraction`
//! (default 0.25) and by `intent.confidence`, then converts the resulting
//! fraction of cash into a contract count at the current ask.
//!
//! The priors are supplied externally (e.g. by a ruvllm-based news model);
//! this crate simply consumes them via [`ExpectedValueKelly::set_prior`].
//!
//! Strategies are venue-agnostic — they emit [`Intent`]s; the RiskGate
//! plus a venue adapter decide whether to send an order.
use std::collections::HashMap;
use neural_trader_core::{EventType, MarketEvent};
use crate::intent::{Action, Intent, Side};
use crate::Strategy;
/// Per-symbol state tracked by the strategy.
#[derive(Debug, Clone)]
struct SymbolState {
/// Latest YES-side mid in cents.
latest_mid_cents: Option<i64>,
/// External YES-side probability prior, `[0, 1]`.
prior: Option<f64>,
/// Last emission sequence — throttle to at most one intent per event kind.
last_emit_seq: u64,
}
impl Default for SymbolState {
fn default() -> Self {
Self { latest_mid_cents: None, prior: None, last_emit_seq: 0 }
}
}
#[derive(Debug, Clone)]
pub struct ExpectedValueKellyConfig {
/// Kelly fraction (0, 1]. Quarter-Kelly (0.25) is a common default.
pub kelly_fraction: f64,
/// Total bankroll in cents used for sizing.
pub bankroll_cents: i64,
/// Minimum edge in basis points to emit an intent. Defensive — the
/// RiskGate enforces the final bar, but we avoid spamming intents we
/// know will be rejected.
pub min_edge_bps: i64,
/// Attribution name.
pub strategy_name: &'static str,
}
impl Default for ExpectedValueKellyConfig {
fn default() -> Self {
Self {
kelly_fraction: 0.25,
bankroll_cents: 100_000,
min_edge_bps: 100,
strategy_name: "ev-kelly",
}
}
}
#[derive(Debug, Clone, Default)]
pub struct ExpectedValueKelly {
pub config: ExpectedValueKellyConfig,
symbols: HashMap<u32, SymbolState>,
}
impl ExpectedValueKelly {
pub fn new(config: ExpectedValueKellyConfig) -> Self {
Self { config, symbols: HashMap::new() }
}
/// Install or update the probability prior for a symbol. Priors are
/// clamped into `[0.01, 0.99]` to avoid divide-by-zero at the limits.
pub fn set_prior(&mut self, symbol_id: u32, prior: f64) {
let clamped = prior.clamp(0.01, 0.99);
self.symbols.entry(symbol_id).or_default().prior = Some(clamped);
}
fn handle_snapshot(&mut self, event: &MarketEvent) -> Option<Intent> {
// Use BookSnapshot YES-bid as the mid proxy. The normalizer already
// emits separate events per level; we track the most recent bid.
let cents = fp_to_cents(event.price_fp);
let entry = self.symbols.entry(event.symbol_id).or_default();
entry.latest_mid_cents = Some(cents);
self.maybe_emit(event.symbol_id, event.seq)
}
fn handle_ticker_or_trade(&mut self, event: &MarketEvent) -> Option<Intent> {
let cents = fp_to_cents(event.price_fp);
if cents <= 0 {
return None;
}
let entry = self.symbols.entry(event.symbol_id).or_default();
entry.latest_mid_cents = Some(cents);
self.maybe_emit(event.symbol_id, event.seq)
}
fn maybe_emit(&mut self, symbol_id: u32, seq: u64) -> Option<Intent> {
let state = self.symbols.get(&symbol_id)?;
let mid = state.latest_mid_cents?;
let prior = state.prior?;
if mid <= 0 || mid >= 100 {
return None;
}
let m = mid as f64 / 100.0;
let edge = prior - m;
let edge_bps = (edge * 10_000.0).round() as i64;
if edge_bps < self.config.min_edge_bps {
return None;
}
// Quarter-Kelly for YES side: fraction of bankroll = k × edge / (1 - m).
let kelly = (self.config.kelly_fraction * edge / (1.0 - m)).clamp(0.0, 1.0);
let alloc_cents = (self.config.bankroll_cents as f64 * kelly) as i64;
let contracts = alloc_cents / mid.max(1);
if contracts <= 0 {
return None;
}
let entry = self.symbols.get_mut(&symbol_id)?;
entry.last_emit_seq = seq;
Some(Intent {
symbol_id,
side: Side::Yes,
action: Action::Buy,
limit_price_cents: mid,
quantity: contracts,
edge_bps,
confidence: prior,
strategy: self.config.strategy_name,
})
}
}
impl Strategy for ExpectedValueKelly {
fn name(&self) -> &'static str {
self.config.strategy_name
}
fn on_event(&mut self, event: &MarketEvent) -> Option<Intent> {
match event.event_type {
EventType::Trade | EventType::VenueStatus => self.handle_ticker_or_trade(event),
EventType::BookSnapshot => self.handle_snapshot(event),
_ => None,
}
}
}
/// The canonical price-fp scale used by `ruvector-kalshi` is `cents × 1e6`.
/// Reverse that here. If other venues use a different scale and want to
/// share this strategy, they should normalize into the same scale first.
fn fp_to_cents(fp: i64) -> i64 {
fp / 1_000_000
}
#[cfg(test)]
mod tests {
use super::*;
use neural_trader_core::{EventType, MarketEvent, Side as NtSide};
fn snapshot(sym: u32, price_cents: i64, seq: u64) -> MarketEvent {
MarketEvent {
event_id: [0u8; 16],
ts_exchange_ns: 0,
ts_ingest_ns: 0,
venue_id: 1001,
symbol_id: sym,
event_type: EventType::BookSnapshot,
side: Some(NtSide::Bid),
price_fp: price_cents * 1_000_000,
qty_fp: 1_000_000,
order_id_hash: None,
participant_id_hash: None,
flags: 0,
seq,
}
}
#[test]
fn no_prior_no_intent() {
let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig::default());
assert!(s.on_event(&snapshot(1, 24, 0)).is_none());
}
#[test]
fn fair_price_no_edge() {
let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig::default());
s.set_prior(1, 0.25);
// Mid exactly at prior → edge ~ 0 bps → no intent.
let intent = s.on_event(&snapshot(1, 25, 0));
assert!(intent.is_none(), "mid==prior should produce no intent");
}
#[test]
fn positive_edge_sizes_under_bankroll() {
let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig {
kelly_fraction: 0.25,
bankroll_cents: 100_000,
min_edge_bps: 100,
strategy_name: "test",
});
s.set_prior(1, 0.40);
let intent = s.on_event(&snapshot(1, 24, 0)).expect("should emit");
assert_eq!(intent.symbol_id, 1);
assert!(matches!(intent.side, Side::Yes));
assert_eq!(intent.limit_price_cents, 24);
assert!(intent.quantity > 0);
// Notional must not exceed bankroll.
assert!(intent.notional_cents() <= 100_000);
// Edge = (0.40 - 0.24) × 10_000 = 1600 bps.
assert_eq!(intent.edge_bps, 1600);
}
#[test]
fn extreme_prior_is_clamped() {
let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig::default());
s.set_prior(1, 1.5); // nonsensical; clamp to 0.99
let intent = s.on_event(&snapshot(1, 50, 0)).expect("should still emit");
assert!((intent.confidence - 0.99).abs() < 1e-9);
}
#[test]
fn mid_at_boundary_rejected() {
let mut s = ExpectedValueKelly::new(ExpectedValueKellyConfig::default());
s.set_prior(1, 0.80);
// mid = 0 → boundary
assert!(s.on_event(&snapshot(1, 0, 0)).is_none());
// mid = 100 → boundary
assert!(s.on_event(&snapshot(1, 100, 1)).is_none());
}
}

View file

@ -0,0 +1,48 @@
//! Canonical strategy intent — venue-agnostic.
use serde::{Deserialize, Serialize};
/// Buy (open) or sell (close) side of an action.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Action {
Buy,
Sell,
}
/// For binary event markets like Kalshi: which side of the contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Side {
Yes,
No,
}
/// A strategy's desired trade, before the risk gate.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Intent {
/// Canonical symbol id (`neural_trader_core::MarketEvent::symbol_id`).
pub symbol_id: u32,
/// Which side of the binary contract.
pub side: Side,
/// Buy to open, sell to close.
pub action: Action,
/// Limit price in Kalshi cents (0..=100). Strategies quote in cents
/// because event markets settle in cents; the adapter upscales into
/// `price_fp` if needed downstream.
pub limit_price_cents: i64,
/// Number of contracts.
pub quantity: i64,
/// Expected edge in basis points over the mid (1 bp = 0.01%).
pub edge_bps: i64,
/// Model confidence in `[0, 1]`. Used by the risk gate as a multiplier
/// on position sizing (higher conviction → larger fraction).
pub confidence: f64,
/// Strategy name for attribution / audit.
pub strategy: &'static str,
}
impl Intent {
/// Notional in Kalshi cents (`price × quantity`).
pub fn notional_cents(&self) -> i64 {
self.limit_price_cents.saturating_mul(self.quantity)
}
}

View file

@ -0,0 +1,35 @@
//! # neural-trader-strategies
//!
//! Venue-agnostic strategy runtime for the RuVector Neural Trader.
//!
//! Key types:
//! - [`Intent`]: what a strategy wants to do (canonical, venue-agnostic).
//! - [`RiskGate`]: the mandatory wrapper around every `Intent` that enforces
//! position cap, daily-loss kill, concentration, min-edge, and the
//! live-trade env flag.
//! - [`Strategy`]: the trait strategies implement.
//! - [`ev_kelly::ExpectedValueKelly`]: first concrete strategy.
//!
//! Strategies consume [`neural_trader_core::MarketEvent`] and hold no
//! venue-specific state, so the same strategy runs in paper replay and
//! against live Kalshi (or any future venue that normalizes to
//! `MarketEvent`).
pub mod ev_kelly;
pub mod intent;
pub mod risk;
pub use ev_kelly::ExpectedValueKelly;
pub use intent::{Action, Intent, Side};
pub use risk::{PortfolioState, Position, RiskConfig, RiskDecision, RiskGate};
use neural_trader_core::MarketEvent;
/// Strategy trait. Stateless strategies can be `&self`; stateful ones use
/// `&mut self`. Implementers return at most one [`Intent`] per call — if a
/// composite decision is needed, emit the primary intent and let the next
/// event drive follow-ups.
pub trait Strategy {
fn name(&self) -> &'static str;
fn on_event(&mut self, event: &MarketEvent) -> Option<Intent>;
}

View file

@ -0,0 +1,333 @@
//! Risk gate — mandatory wrapper around every [`Intent`].
//!
//! All checks are in Kalshi cents so we never cross scale boundaries on
//! the hot path. The gate is pure: it never mutates portfolio state, only
//! returns a decision. Updating the portfolio happens after a fill is
//! observed.
use std::collections::HashMap;
use crate::intent::{Action, Intent, Side};
/// Per-ticker open position.
#[derive(Debug, Clone, Default)]
pub struct Position {
pub symbol_id: u32,
/// Signed contract quantity (YES = +, NO = -). For Kalshi we store
/// YES/NO on separate symbol ids using the same ticker so this stays
/// unsigned in practice, but we keep i64 for future flexibility.
pub quantity: i64,
/// Average fill price in cents.
pub avg_price_cents: i64,
}
impl Position {
pub fn notional_cents(&self) -> i64 {
self.quantity.saturating_mul(self.avg_price_cents)
}
}
/// Cluster identifier — a group of correlated contracts (e.g. "all Fed
/// December strikes"). Strategy authors assign cluster ids; the neural-
/// trader-coherence crate produces them in production but tests can use
/// any `u32`.
pub type ClusterId = u32;
/// Portfolio state observed by the gate. Pure snapshot — the gate never
/// mutates it.
#[derive(Debug, Clone, Default)]
pub struct PortfolioState {
pub cash_cents: i64,
pub starting_cash_cents: i64,
/// Realized P&L for the current UTC day, in cents. Negative = loss.
pub day_pnl_cents: i64,
pub positions: HashMap<u32, Position>,
/// Optional symbol→cluster mapping. When populated, concentration is
/// enforced by cluster; otherwise by symbol.
pub clusters: HashMap<u32, ClusterId>,
}
impl PortfolioState {
pub fn total_notional_cents(&self) -> i64 {
self.positions
.values()
.map(|p| p.notional_cents().abs())
.fold(0i64, |a, b| a.saturating_add(b))
}
fn cluster_notional_cents(&self, cluster: ClusterId) -> i64 {
self.positions
.values()
.filter(|p| self.clusters.get(&p.symbol_id).copied() == Some(cluster))
.map(|p| p.notional_cents().abs())
.fold(0i64, |a, b| a.saturating_add(b))
}
}
/// Gate configuration. All limits are hard — breach → reject.
#[derive(Debug, Clone)]
pub struct RiskConfig {
/// Max notional for a single new position, as a fraction of cash.
/// Default: 0.10 (10%).
pub max_position_frac: f64,
/// Stop trading when realized day P&L ≤ `-max_daily_loss_frac × starting_cash`.
/// Default: 0.03 (3%).
pub max_daily_loss_frac: f64,
/// Minimum edge to open in basis points. Default: 300 bps.
pub min_edge_bps: i64,
/// Max fraction of cash allocated to one cluster. Default: 0.40 (40%).
pub max_cluster_frac: f64,
/// If true, live orders require `KALSHI_ENABLE_LIVE=1`. If false, the
/// gate allows orders regardless (paper mode).
pub require_live_flag: bool,
}
impl Default for RiskConfig {
fn default() -> Self {
Self {
max_position_frac: 0.10,
max_daily_loss_frac: 0.03,
min_edge_bps: 300,
max_cluster_frac: 0.40,
require_live_flag: true,
}
}
}
/// Outcome of the gate for a single intent.
#[derive(Debug, Clone, PartialEq)]
pub enum RiskDecision {
Approve(Intent),
Reject { reason: RejectReason, intent: Intent },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RejectReason {
EdgeTooThin,
PositionTooLarge,
DailyLossKill,
ClusterConcentration,
LiveTradingDisabled,
InsufficientCash,
NonPositiveQuantity,
PriceOutOfRange,
}
#[derive(Debug, Clone, Default)]
pub struct RiskGate {
pub config: RiskConfig,
}
impl RiskGate {
pub fn new(config: RiskConfig) -> Self {
Self { config }
}
/// Evaluate a single intent against the portfolio snapshot and env.
pub fn evaluate(&self, intent: Intent, portfolio: &PortfolioState) -> RiskDecision {
// 1. Basic sanity (cheapest checks first — failures are immediate).
if intent.quantity <= 0 {
return RiskDecision::Reject {
reason: RejectReason::NonPositiveQuantity,
intent,
};
}
if intent.limit_price_cents <= 0 || intent.limit_price_cents >= 100 {
return RiskDecision::Reject {
reason: RejectReason::PriceOutOfRange,
intent,
};
}
// 2. Daily loss kill — stop *all* opening trades after breach.
let max_loss = (portfolio.starting_cash_cents as f64
* self.config.max_daily_loss_frac)
.round() as i64;
if intent.action == Action::Buy && portfolio.day_pnl_cents <= -max_loss {
return RiskDecision::Reject {
reason: RejectReason::DailyLossKill,
intent,
};
}
// 3. Edge threshold.
if intent.edge_bps < self.config.min_edge_bps {
return RiskDecision::Reject {
reason: RejectReason::EdgeTooThin,
intent,
};
}
// 4. Hard cash check — can we actually pay for the contracts?
// Checked before policy caps so the "not enough money" reason is
// distinguishable from "over the configured cap."
let notional = intent.notional_cents();
if intent.action == Action::Buy && notional > portfolio.cash_cents {
return RiskDecision::Reject {
reason: RejectReason::InsufficientCash,
intent,
};
}
// 5. Single-position notional cap (policy).
let position_cap = (portfolio.cash_cents as f64
* self.config.max_position_frac) as i64;
if intent.action == Action::Buy && notional > position_cap {
return RiskDecision::Reject {
reason: RejectReason::PositionTooLarge,
intent,
};
}
// 6. Cluster concentration (policy).
if let Some(cluster) = portfolio.clusters.get(&intent.symbol_id).copied() {
let cluster_cap = (portfolio.cash_cents as f64
* self.config.max_cluster_frac) as i64;
let existing = portfolio.cluster_notional_cents(cluster);
let projected = existing.saturating_add(notional);
if intent.action == Action::Buy && projected > cluster_cap {
return RiskDecision::Reject {
reason: RejectReason::ClusterConcentration,
intent,
};
}
}
// 7. Live-trade env flag (last; cheap but most commonly hit in dev).
if self.config.require_live_flag && is_opening_live_order(&intent) {
if std::env::var("KALSHI_ENABLE_LIVE").ok().as_deref() != Some("1") {
return RiskDecision::Reject {
reason: RejectReason::LiveTradingDisabled,
intent,
};
}
}
RiskDecision::Approve(intent)
}
}
fn is_opening_live_order(intent: &Intent) -> bool {
// Opening buys require live; closing sells can always run (we want to
// be able to exit positions even after the kill switch trips).
intent.action == Action::Buy && matches!(intent.side, Side::Yes | Side::No)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::intent::{Action, Side};
fn sample_intent(edge: i64, price: i64, qty: i64) -> Intent {
Intent {
symbol_id: 1,
side: Side::Yes,
action: Action::Buy,
limit_price_cents: price,
quantity: qty,
edge_bps: edge,
confidence: 0.9,
strategy: "test",
}
}
fn portfolio(cash: i64) -> PortfolioState {
PortfolioState {
cash_cents: cash,
starting_cash_cents: cash,
..Default::default()
}
}
#[test]
fn thin_edge_rejected() {
let gate = RiskGate::default();
let d = gate.evaluate(sample_intent(100, 24, 10), &portfolio(100_000));
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::EdgeTooThin, .. }));
}
#[test]
fn oversize_position_rejected() {
let gate = RiskGate::default();
// 24¢ × 600 = 14_400¢ vs cap 10% × 100_000 = 10_000¢
let d = gate.evaluate(sample_intent(500, 24, 600), &portfolio(100_000));
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::PositionTooLarge, .. }));
}
#[test]
fn daily_loss_kill_triggers() {
let gate = RiskGate::default();
let mut p = portfolio(100_000);
p.day_pnl_cents = -3_001; // just past 3%
let d = gate.evaluate(sample_intent(500, 24, 10), &p);
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::DailyLossKill, .. }));
}
#[test]
fn concentration_rejected() {
// Raise the per-position cap so we're testing concentration, not
// position size. Disable live flag so we don't trip on env.
let gate = RiskGate::new(RiskConfig {
require_live_flag: false,
max_position_frac: 0.20,
max_cluster_frac: 0.40,
..Default::default()
});
let mut p = portfolio(100_000);
p.clusters.insert(1, 42);
p.clusters.insert(2, 42);
p.positions.insert(
2,
Position {
symbol_id: 2,
quantity: 1000,
avg_price_cents: 35, // 35_000¢ already in cluster 42
},
);
// 24¢ × 500 = 12_000 < 20% cap (20_000) but existing 35_000 + 12_000
// = 47_000 > 40% cap (40_000) → concentration rejection.
let d = gate.evaluate(sample_intent(500, 24, 500), &p);
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::ClusterConcentration, .. }));
}
#[test]
fn approves_under_paper_config() {
let gate = RiskGate::new(RiskConfig { require_live_flag: false, ..Default::default() });
let d = gate.evaluate(sample_intent(500, 24, 10), &portfolio(100_000));
assert!(matches!(d, RiskDecision::Approve(_)));
}
#[test]
fn live_gate_rejects_without_env() {
// Ensure env flag is off.
std::env::remove_var("KALSHI_ENABLE_LIVE");
let gate = RiskGate::new(RiskConfig { require_live_flag: true, ..Default::default() });
let d = gate.evaluate(sample_intent(500, 24, 10), &portfolio(100_000));
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::LiveTradingDisabled, .. }));
}
#[test]
fn rejects_non_positive_qty_and_bad_price() {
let gate = RiskGate::new(RiskConfig { require_live_flag: false, ..Default::default() });
let d = gate.evaluate(sample_intent(500, 24, 0), &portfolio(100_000));
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::NonPositiveQuantity, .. }));
let d = gate.evaluate(sample_intent(500, 0, 10), &portfolio(100_000));
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::PriceOutOfRange, .. }));
let d = gate.evaluate(sample_intent(500, 100, 10), &portfolio(100_000));
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::PriceOutOfRange, .. }));
}
#[test]
fn insufficient_cash_rejected_even_under_cap() {
// Oversized cap so the position-size check does not mask the cash
// check.
let gate = RiskGate::new(RiskConfig {
require_live_flag: false,
max_position_frac: 5.0,
..Default::default()
});
// 50¢ × 200 = 10_000; cash only 5_000.
let d = gate.evaluate(sample_intent(500, 50, 200), &portfolio(5_000));
assert!(matches!(d, RiskDecision::Reject { reason: RejectReason::InsufficientCash, .. }));
}
}

View file

@ -0,0 +1,30 @@
[package]
name = "ruvector-kalshi"
version = "0.1.0"
edition = "2021"
description = "Kalshi exchange integration for the RuVector Neural Trader (ADR-151)"
license = "MIT OR Apache-2.0"
publish = false
[dependencies]
neural-trader-core = { path = "../neural-trader-core" }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "sync", "macros", "process", "time", "fs"] }
chrono = { workspace = true }
tracing = { workspace = true }
# Crypto — Kalshi requires RSA-PSS-SHA256
rsa = { version = "0.9", features = ["sha2"] }
sha2 = "0.10"
rand = { workspace = true }
base64 = "0.22"
# HTTP — match existing graph (mcp-brain uses reqwest 0.12 + rustls)
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
[dev-dependencies]
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt"] }

View file

@ -0,0 +1,70 @@
//! Offline validator: loads the real Kalshi credentials via `SecretLoader`,
//! signs a sample request, verifies the signature round-trips.
//!
//! No network calls are made. This proves the actual PEM on disk (or in
//! GCS) is loadable by this crate and produces valid RSA-PSS-SHA256
//! signatures.
//!
//! Run:
//! # Offline, local PEM:
//! KALSHI_SECRET_SOURCE=local cargo run -p ruvector-kalshi --example validate
//!
//! # From Google Cloud Secret Manager (needs `gcloud` auth):
//! cargo run -p ruvector-kalshi --example validate
use ruvector_kalshi::{
auth::{self, Signer},
secrets::SecretLoader,
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let project = std::env::var("KALSHI_GCP_PROJECT").unwrap_or_else(|_| "ruv-dev".into());
let loader = SecretLoader::new(project);
let creds = loader.load().await?;
println!("loaded credentials: {creds:?}");
let signer = Signer::from_pem(&creds.api_key, &creds.private_key_pem)?;
println!("signer ready: {signer:?}");
let ts_ms = 1_700_000_000_000u64;
let method = "GET";
let path = "/trade-api/v2/exchange/status";
let headers = signer.sign_with_ts(ts_ms, method, path);
println!(
"signed (ts={}, method={method}, path={path}):\n \
KALSHI-ACCESS-KEY: <len {} chars>\n \
KALSHI-ACCESS-TIMESTAMP: {}\n \
KALSHI-ACCESS-SIGNATURE: <len {} chars, base64>",
ts_ms,
headers.access_key.len(),
headers.timestamp_ms,
headers.signature_b64.len(),
);
auth::verify(
&signer.verifying_key(),
ts_ms,
method,
path,
&headers.signature_b64,
)?;
println!("signature verified against derived public key — OK");
// Tampered verification must fail.
let tamper = auth::verify(
&signer.verifying_key(),
ts_ms,
method,
"/trade-api/v2/portfolio",
&headers.signature_b64,
);
if tamper.is_ok() {
anyhow::bail!("tampered path unexpectedly verified — signing is broken");
}
println!("tamper check rejected — OK");
println!("\nall offline checks passed.");
Ok(())
}

View file

@ -0,0 +1,212 @@
//! Kalshi RSA-PSS-SHA256 request signing.
//!
//! Kalshi REST endpoints require three headers on every authenticated call:
//! - `KALSHI-ACCESS-KEY`: the API key UUID
//! - `KALSHI-ACCESS-TIMESTAMP`: unix time in milliseconds
//! - `KALSHI-ACCESS-SIGNATURE`: base64(RSA-PSS-SHA256(pem, ts || method || path))
//!
//! The canonical string is `format!("{ts}{method}{path}")` with no separators.
use base64::{engine::general_purpose::STANDARD, Engine};
use rsa::pkcs1::DecodeRsaPrivateKey;
use rsa::pkcs8::DecodePrivateKey;
use rsa::pss::{SigningKey, VerifyingKey};
use rsa::signature::{Keypair, RandomizedSigner, SignatureEncoding, Verifier};
use rsa::traits::PublicKeyParts;
use rsa::RsaPrivateKey;
use sha2::Sha256;
use crate::{KalshiError, Result};
/// Signed headers for a single Kalshi REST request.
#[derive(Debug, Clone)]
pub struct SignedHeaders {
pub access_key: String,
pub timestamp_ms: String,
pub signature_b64: String,
}
impl SignedHeaders {
/// Apply this signature set onto a [`reqwest::RequestBuilder`].
pub fn apply(self, rb: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
rb.header("KALSHI-ACCESS-KEY", self.access_key)
.header("KALSHI-ACCESS-TIMESTAMP", self.timestamp_ms)
.header("KALSHI-ACCESS-SIGNATURE", self.signature_b64)
}
}
/// Kalshi request signer. Holds a parsed RSA private key and the caller's
/// API key string. Cheap to clone (both fields clone trivially).
#[derive(Clone)]
pub struct Signer {
api_key: String,
signing_key: SigningKey<Sha256>,
}
impl std::fmt::Debug for Signer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Never leak the API key or key material in Debug output.
f.debug_struct("Signer")
.field("api_key_len", &self.api_key.len())
.field("key_size_bits", &self.signing_key.as_ref().size())
.finish()
}
}
impl Signer {
/// Build a signer from a PEM string (accepts PKCS#1 or PKCS#8) and API key.
pub fn from_pem(api_key: impl Into<String>, pem: &str) -> Result<Self> {
let private_key = parse_rsa_pem(pem)?;
Ok(Self {
api_key: api_key.into(),
signing_key: SigningKey::<Sha256>::new(private_key),
})
}
/// Sign for the given method + path using the current wall-clock time.
pub fn sign_now(&self, method: &str, path: &str) -> SignedHeaders {
let ts = current_ts_ms();
self.sign_with_ts(ts, method, path)
}
/// Sign with an explicit timestamp (deterministic — for tests/replay).
pub fn sign_with_ts(&self, ts_ms: u64, method: &str, path: &str) -> SignedHeaders {
let msg = canonical_string(ts_ms, method, path);
let mut rng = rand::thread_rng();
let sig = self.signing_key.sign_with_rng(&mut rng, msg.as_bytes());
SignedHeaders {
access_key: self.api_key.clone(),
timestamp_ms: ts_ms.to_string(),
signature_b64: STANDARD.encode(sig.to_bytes()),
}
}
/// Derive the matching verifying key (useful for round-trip tests).
pub fn verifying_key(&self) -> VerifyingKey<Sha256> {
self.signing_key.verifying_key()
}
}
fn canonical_string(ts_ms: u64, method: &str, path: &str) -> String {
// Kalshi spec: concat ts, method (upper-case), path exactly as sent.
format!("{ts_ms}{}{path}", method.to_uppercase())
}
fn current_ts_ms() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock before UNIX epoch")
.as_millis() as u64
}
fn parse_rsa_pem(pem: &str) -> Result<RsaPrivateKey> {
// Accept either `-----BEGIN RSA PRIVATE KEY-----` (PKCS#1) or
// `-----BEGIN PRIVATE KEY-----` (PKCS#8).
if pem.contains("BEGIN RSA PRIVATE KEY") {
RsaPrivateKey::from_pkcs1_pem(pem)
.map_err(|e| KalshiError::InvalidPem(format!("pkcs1: {e}")))
} else {
RsaPrivateKey::from_pkcs8_pem(pem)
.map_err(|e| KalshiError::InvalidPem(format!("pkcs8: {e}")))
}
}
/// Verify a signature against the public half. Test-only helper.
pub fn verify(
vk: &VerifyingKey<Sha256>,
ts_ms: u64,
method: &str,
path: &str,
sig_b64: &str,
) -> Result<()> {
use rsa::pss::Signature;
let sig_bytes = STANDARD
.decode(sig_b64)
.map_err(|e| KalshiError::Signing(format!("base64: {e}")))?;
let sig = Signature::try_from(sig_bytes.as_slice())
.map_err(|e| KalshiError::Signing(format!("sig decode: {e}")))?;
let msg = canonical_string(ts_ms, method, path);
vk.verify(msg.as_bytes(), &sig)
.map_err(|e| KalshiError::Signing(format!("verify: {e}")))
}
#[cfg(test)]
mod tests {
use super::*;
use rsa::pkcs1::EncodeRsaPrivateKey;
/// Generate a fresh 2048-bit RSA key in PKCS#1 PEM form for tests.
fn gen_pkcs1_pem() -> String {
let mut rng = rand::thread_rng();
let key = RsaPrivateKey::new(&mut rng, 2048).expect("keygen");
key.to_pkcs1_pem(rsa::pkcs1::LineEnding::LF)
.expect("encode pkcs1")
.to_string()
}
#[test]
fn signer_roundtrip_pkcs1() {
let pem = gen_pkcs1_pem();
let signer = Signer::from_pem("test-key-uuid", &pem).unwrap();
let headers = signer.sign_with_ts(1_700_000_000_000, "GET", "/trade-api/v2/portfolio");
assert_eq!(headers.access_key, "test-key-uuid");
assert_eq!(headers.timestamp_ms, "1700000000000");
assert!(!headers.signature_b64.is_empty());
// Round-trip through the verifying key proves the signature is well-formed.
verify(
&signer.verifying_key(),
1_700_000_000_000,
"GET",
"/trade-api/v2/portfolio",
&headers.signature_b64,
)
.unwrap();
}
#[test]
fn signer_rejects_tampered_message() {
let pem = gen_pkcs1_pem();
let signer = Signer::from_pem("k", &pem).unwrap();
let headers = signer.sign_with_ts(42, "POST", "/orders");
// Verifying against a different path must fail.
let err = verify(
&signer.verifying_key(),
42,
"POST",
"/orders/tampered",
&headers.signature_b64,
);
assert!(err.is_err(), "tampered path must not verify");
}
#[test]
fn canonical_string_is_stable() {
assert_eq!(
canonical_string(1, "get", "/foo"),
"1GET/foo",
"method must be upper-cased",
);
}
#[test]
fn invalid_pem_is_rejected() {
let err = Signer::from_pem("k", "not a pem").unwrap_err();
match err {
KalshiError::InvalidPem(_) => {}
other => panic!("expected InvalidPem, got {other:?}"),
}
}
#[test]
fn debug_does_not_leak_key() {
let pem = gen_pkcs1_pem();
let signer = Signer::from_pem("super-secret-api-key-1234567890", &pem).unwrap();
let dbg = format!("{signer:?}");
assert!(!dbg.contains("super-secret-api-key"));
assert!(!dbg.contains("BEGIN RSA"));
}
}

View file

@ -0,0 +1,62 @@
//! # ruvector-kalshi
//!
//! Kalshi exchange integration for the RuVector Neural Trader.
//!
//! See ADR-151 for design. This crate provides:
//! - RSA-PSS-SHA256 request signing against Kalshi's REST API
//! - Typed DTOs for Kalshi market/event/order/fill payloads
//! - Normalization from Kalshi payloads into
//! [`neural_trader_core::MarketEvent`] so downstream coherence, attention,
//! and replay pipelines work unchanged.
//! - A REST client scaffold (live calls are gated behind a runtime flag)
//! - Secret loading from Google Cloud Secret Manager or a local PEM file
pub mod auth;
pub mod models;
pub mod normalize;
pub mod rest;
pub mod secrets;
pub mod ws;
use thiserror::Error;
/// Venue identifier for Kalshi in [`neural_trader_core::MarketEvent::venue_id`].
pub const KALSHI_VENUE_ID: u16 = 1001;
/// Default Kalshi REST base URL (production).
pub const KALSHI_API_URL: &str = "https://trading-api.kalshi.com/trade-api/v2";
/// Default Kalshi WebSocket URL (production).
pub const KALSHI_WS_URL: &str = "wss://trading-api.kalshi.com/trade-api/ws/v2";
/// Kalshi fixed-point scale for price. Kalshi quotes in cents 0..=100;
/// we upscale into the canonical `price_fp` scale (value × 1e8) so
/// that a 24¢ YES becomes 24_000_000.
pub const KALSHI_PRICE_FP_SCALE: i64 = 1_000_000;
/// Crate-wide error type.
#[derive(Debug, Error)]
pub enum KalshiError {
#[error("invalid PEM key material: {0}")]
InvalidPem(String),
#[error("signing failed: {0}")]
Signing(String),
#[error("secret source error: {0}")]
Secret(String),
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("decode error: {0}")]
Decode(#[from] serde_json::Error),
#[error("api error {status}: {body}")]
Api { status: u16, body: String },
#[error("normalization error: {0}")]
Normalize(String),
}
pub type Result<T> = std::result::Result<T, KalshiError>;

View file

@ -0,0 +1,252 @@
//! Kalshi REST + WebSocket DTOs. Only the fields we actually consume are
//! declared; everything else is ignored by `#[serde(deny_unknown_fields)]`
//! being *off* — we intentionally accept forward-compatible payloads.
use serde::{Deserialize, Serialize};
/// Market metadata (GET /markets, /markets/{ticker}).
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Market {
pub ticker: String,
pub event_ticker: Option<String>,
pub title: Option<String>,
pub status: Option<String>,
pub yes_bid: Option<i64>,
pub yes_ask: Option<i64>,
pub no_bid: Option<i64>,
pub no_ask: Option<i64>,
pub last_price: Option<i64>,
pub volume: Option<i64>,
pub open_interest: Option<i64>,
pub close_time: Option<String>,
pub expiration_time: Option<String>,
}
/// Envelope for list-markets response.
#[derive(Debug, Clone, Deserialize)]
pub struct MarketsResponse {
pub markets: Vec<Market>,
pub cursor: Option<String>,
}
/// Single trade print (GET /markets/trades).
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct KalshiTrade {
pub ticker: String,
pub trade_id: String,
pub yes_price: Option<i64>,
pub no_price: Option<i64>,
pub count: i64,
pub taker_side: Option<String>, // "yes" | "no"
pub created_time: String, // ISO-8601
}
/// Orderbook snapshot (GET /markets/{ticker}/orderbook).
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct OrderbookSnapshot {
/// Each entry is `[price_cents, contracts]`.
pub yes: Vec<[i64; 2]>,
pub no: Vec<[i64; 2]>,
}
/// Wrapper for orderbook GET envelope.
#[derive(Debug, Clone, Deserialize)]
pub struct OrderbookResponse {
pub orderbook: OrderbookSnapshot,
}
/// Order placement payload (POST /portfolio/orders).
#[derive(Debug, Clone, Serialize)]
pub struct NewOrder {
pub ticker: String,
pub action: OrderAction,
pub side: OrderSide,
#[serde(rename = "type")]
pub order_type: OrderType,
pub count: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub yes_price: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub no_price: Option<i64>,
pub client_order_id: String,
}
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum OrderAction {
Buy,
Sell,
}
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum OrderSide {
Yes,
No,
}
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum OrderType {
Limit,
Market,
}
/// Response from POST /portfolio/orders.
#[derive(Debug, Clone, Deserialize)]
pub struct OrderAck {
pub order: OrderRecord,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct OrderRecord {
pub order_id: String,
pub client_order_id: Option<String>,
pub status: String,
pub ticker: String,
pub count: i64,
pub filled_count: Option<i64>,
pub remaining_count: Option<i64>,
}
/// Raw Kalshi WS envelope. `{"type": "...", "msg": {...}}`. `msg` is
/// kept as a `Value` so unknown `type` tags don't fail the parse — the
/// decoder routes on `msg_type`.
#[derive(Debug, Clone, Deserialize)]
pub struct WsEnvelope {
#[serde(rename = "type")]
pub msg_type: String,
pub msg: serde_json::Value,
}
/// Typed WS messages routed from [`WsEnvelope`] by the decoder.
#[derive(Debug, Clone)]
pub enum WsMessage {
Ticker(WsTicker),
Trade(WsTrade),
OrderbookSnapshot(WsOrderbook),
OrderbookDelta(WsOrderbookDelta),
Fill(WsFill),
/// Any non-data frame (heartbeat, ack, etc.) or unknown `type` tag.
Other,
}
impl WsMessage {
/// Decode an envelope into a typed message. Unknown type tags produce
/// [`WsMessage::Other`] rather than an error so forward-compatible
/// payloads don't kill the feed.
pub fn from_envelope(env: WsEnvelope) -> serde_json::Result<Self> {
Ok(match env.msg_type.as_str() {
"ticker" => Self::Ticker(serde_json::from_value(env.msg)?),
"trade" => Self::Trade(serde_json::from_value(env.msg)?),
"orderbook_snapshot" => Self::OrderbookSnapshot(serde_json::from_value(env.msg)?),
"orderbook_delta" => Self::OrderbookDelta(serde_json::from_value(env.msg)?),
"fill" => Self::Fill(serde_json::from_value(env.msg)?),
_ => Self::Other,
})
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct WsTicker {
pub market_ticker: String,
pub yes_bid: Option<i64>,
pub yes_ask: Option<i64>,
pub price: Option<i64>,
pub ts: Option<i64>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct WsTrade {
pub market_ticker: String,
pub yes_price: Option<i64>,
pub no_price: Option<i64>,
pub count: i64,
pub taker_side: Option<String>,
pub ts: Option<i64>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct WsOrderbook {
pub market_ticker: String,
pub yes: Vec<[i64; 2]>,
pub no: Vec<[i64; 2]>,
pub ts: Option<i64>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct WsOrderbookDelta {
pub market_ticker: String,
pub side: String, // "yes" | "no"
pub price: i64,
pub delta: i64,
pub ts: Option<i64>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct WsFill {
pub market_ticker: String,
pub order_id: String,
pub yes_price: Option<i64>,
pub no_price: Option<i64>,
pub count: i64,
pub side: String,
pub ts: Option<i64>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn market_deserializes_with_optional_fields() {
let json = r#"{
"ticker": "FED-23DEC-T3.00",
"title": "Fed raises rates",
"status": "active",
"yes_bid": 24,
"yes_ask": 26,
"volume": 1200
}"#;
let m: Market = serde_json::from_str(json).unwrap();
assert_eq!(m.ticker, "FED-23DEC-T3.00");
assert_eq!(m.yes_bid, Some(24));
assert!(m.no_bid.is_none());
}
#[test]
fn ws_message_dispatch() {
let json = r#"{"type":"ticker","msg":{"market_ticker":"X","yes_bid":10,"yes_ask":12}}"#;
let env: WsEnvelope = serde_json::from_str(json).unwrap();
let msg = WsMessage::from_envelope(env).unwrap();
assert!(matches!(msg, WsMessage::Ticker(ref t) if t.market_ticker == "X"));
}
#[test]
fn ws_message_unknown_kind_does_not_error() {
let json = r#"{"type":"heartbeat","msg":{}}"#;
let env: WsEnvelope = serde_json::from_str(json).unwrap();
let msg = WsMessage::from_envelope(env).unwrap();
assert!(matches!(msg, WsMessage::Other));
}
#[test]
fn new_order_limit_serializes() {
let o = NewOrder {
ticker: "X".into(),
action: OrderAction::Buy,
side: OrderSide::Yes,
order_type: OrderType::Limit,
count: 10,
yes_price: Some(24),
no_price: None,
client_order_id: "abc".into(),
};
let s = serde_json::to_string(&o).unwrap();
assert!(s.contains("\"type\":\"limit\""));
assert!(s.contains("\"action\":\"buy\""));
assert!(s.contains("\"yes_price\":24"));
// None field must be omitted.
assert!(!s.contains("no_price"));
}
}

View file

@ -0,0 +1,344 @@
//! Normalize Kalshi payloads into [`neural_trader_core::MarketEvent`] so the
//! existing coherence, attention, CNN, and replay pipelines can consume
//! Kalshi as just another venue.
//!
//! Symbol-id assignment is deterministic via FNV-1a over the Kalshi ticker
//! so the same contract always maps to the same `symbol_id` across runs
//! without needing an external registry.
use chrono::DateTime;
use neural_trader_core::{EventType, MarketEvent, Side};
use crate::models::{
KalshiTrade, OrderbookSnapshot, WsFill, WsOrderbook, WsOrderbookDelta, WsTicker, WsTrade,
};
use crate::{KalshiError, Result, KALSHI_PRICE_FP_SCALE, KALSHI_VENUE_ID};
/// Stable, deterministic symbol id from a Kalshi ticker. FNV-1a 32-bit,
/// ignoring case so `"FED-DEC23"` and `"fed-dec23"` collide (desired).
pub fn symbol_id_for(ticker: &str) -> u32 {
let mut hash: u32 = 0x811c_9dc5;
for b in ticker.as_bytes() {
let c = b.to_ascii_lowercase();
hash ^= c as u32;
hash = hash.wrapping_mul(0x0100_0193);
}
hash
}
/// Convert a Kalshi cent price (0..=100) into canonical fixed-point.
pub fn cents_to_fp(cents: i64) -> i64 {
cents.saturating_mul(KALSHI_PRICE_FP_SCALE)
}
fn parse_iso_ns(ts: &str) -> Result<u64> {
let dt = DateTime::parse_from_rfc3339(ts)
.map_err(|e| KalshiError::Normalize(format!("parse ts {ts}: {e}")))?;
let ns = dt.timestamp_nanos_opt().ok_or_else(|| {
KalshiError::Normalize(format!("ts {ts} out of range for i64 ns"))
})?;
Ok(ns.max(0) as u64)
}
fn ms_to_ns(ts_ms: Option<i64>) -> u64 {
ts_ms.map(|t| (t.max(0) as u64).saturating_mul(1_000_000)).unwrap_or_else(now_ns)
}
fn now_ns() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0)
}
fn empty_event_id() -> [u8; 16] {
[0u8; 16]
}
fn event_id_from_str(key: &str) -> [u8; 16] {
// 128-bit FNV-1a. Produces a stable id from trade_id / order_id strings.
let mut hash: u128 = 0x6c62_272e_07bb_0142_62b8_2175_6295_c58d;
for b in key.as_bytes() {
hash ^= *b as u128;
hash = hash.wrapping_mul(0x0000_0000_0100_0000_0000_0000_0000_013B);
}
hash.to_le_bytes()
}
// ---------------------------------------------------------------------------
// REST normalizers
// ---------------------------------------------------------------------------
/// Normalize a REST `KalshiTrade` into a canonical Trade event.
pub fn trade_to_event(t: &KalshiTrade, seq: u64) -> Result<MarketEvent> {
let ts_ns = parse_iso_ns(&t.created_time)?;
let price_cents = t.yes_price.unwrap_or(0);
let side = match t.taker_side.as_deref() {
Some("yes") => Some(Side::Bid),
Some("no") => Some(Side::Ask),
_ => None,
};
Ok(MarketEvent {
event_id: event_id_from_str(&t.trade_id),
ts_exchange_ns: ts_ns,
ts_ingest_ns: now_ns(),
venue_id: KALSHI_VENUE_ID,
symbol_id: symbol_id_for(&t.ticker),
event_type: EventType::Trade,
side,
price_fp: cents_to_fp(price_cents),
qty_fp: t.count.saturating_mul(KALSHI_PRICE_FP_SCALE),
order_id_hash: None,
participant_id_hash: None,
flags: 0,
seq,
})
}
/// Expand an orderbook snapshot into per-level MarketEvents. Each Kalshi
/// level becomes one `BookSnapshot` event with the level's price/size.
pub fn orderbook_to_events(
ticker: &str,
ts_ns: u64,
ob: &OrderbookSnapshot,
starting_seq: u64,
) -> Vec<MarketEvent> {
let sym = symbol_id_for(ticker);
let mut out = Vec::with_capacity(ob.yes.len() + ob.no.len());
let mut seq = starting_seq;
for [price, size] in &ob.yes {
out.push(MarketEvent {
event_id: empty_event_id(),
ts_exchange_ns: ts_ns,
ts_ingest_ns: now_ns(),
venue_id: KALSHI_VENUE_ID,
symbol_id: sym,
event_type: EventType::BookSnapshot,
side: Some(Side::Bid),
price_fp: cents_to_fp(*price),
qty_fp: size.saturating_mul(KALSHI_PRICE_FP_SCALE),
order_id_hash: None,
participant_id_hash: None,
flags: 0,
seq,
});
seq = seq.wrapping_add(1);
}
for [price, size] in &ob.no {
out.push(MarketEvent {
event_id: empty_event_id(),
ts_exchange_ns: ts_ns,
ts_ingest_ns: now_ns(),
venue_id: KALSHI_VENUE_ID,
symbol_id: sym,
event_type: EventType::BookSnapshot,
side: Some(Side::Ask),
price_fp: cents_to_fp(*price),
qty_fp: size.saturating_mul(KALSHI_PRICE_FP_SCALE),
order_id_hash: None,
participant_id_hash: None,
flags: 0,
seq,
});
seq = seq.wrapping_add(1);
}
out
}
// ---------------------------------------------------------------------------
// WebSocket normalizers
// ---------------------------------------------------------------------------
pub fn ws_ticker_to_event(t: &WsTicker, seq: u64) -> MarketEvent {
// Use the mid as the canonical price. Fallback to `price` if bids/asks
// absent (end-of-day tickers).
let price = match (t.yes_bid, t.yes_ask, t.price) {
(Some(b), Some(a), _) => (b + a) / 2,
(_, _, Some(p)) => p,
_ => 0,
};
MarketEvent {
event_id: empty_event_id(),
ts_exchange_ns: ms_to_ns(t.ts),
ts_ingest_ns: now_ns(),
venue_id: KALSHI_VENUE_ID,
symbol_id: symbol_id_for(&t.market_ticker),
event_type: EventType::VenueStatus,
side: None,
price_fp: cents_to_fp(price),
qty_fp: 0,
order_id_hash: None,
participant_id_hash: None,
flags: 0,
seq,
}
}
pub fn ws_trade_to_event(t: &WsTrade, seq: u64) -> MarketEvent {
let side = match t.taker_side.as_deref() {
Some("yes") => Some(Side::Bid),
Some("no") => Some(Side::Ask),
_ => None,
};
MarketEvent {
event_id: empty_event_id(),
ts_exchange_ns: ms_to_ns(t.ts),
ts_ingest_ns: now_ns(),
venue_id: KALSHI_VENUE_ID,
symbol_id: symbol_id_for(&t.market_ticker),
event_type: EventType::Trade,
side,
price_fp: cents_to_fp(t.yes_price.unwrap_or(0)),
qty_fp: t.count.saturating_mul(KALSHI_PRICE_FP_SCALE),
order_id_hash: None,
participant_id_hash: None,
flags: 0,
seq,
}
}
pub fn ws_orderbook_to_events(ob: &WsOrderbook, starting_seq: u64) -> Vec<MarketEvent> {
let ts_ns = ms_to_ns(ob.ts);
orderbook_to_events(
&ob.market_ticker,
ts_ns,
&OrderbookSnapshot {
yes: ob.yes.clone(),
no: ob.no.clone(),
},
starting_seq,
)
}
pub fn ws_orderbook_delta_to_event(d: &WsOrderbookDelta, seq: u64) -> MarketEvent {
let side = match d.side.as_str() {
"yes" => Some(Side::Bid),
"no" => Some(Side::Ask),
_ => None,
};
let event_type = if d.delta > 0 {
EventType::NewOrder
} else {
EventType::CancelOrder
};
MarketEvent {
event_id: empty_event_id(),
ts_exchange_ns: ms_to_ns(d.ts),
ts_ingest_ns: now_ns(),
venue_id: KALSHI_VENUE_ID,
symbol_id: symbol_id_for(&d.market_ticker),
event_type,
side,
price_fp: cents_to_fp(d.price),
qty_fp: d.delta.abs().saturating_mul(KALSHI_PRICE_FP_SCALE),
order_id_hash: None,
participant_id_hash: None,
flags: 0,
seq,
}
}
pub fn ws_fill_to_event(f: &WsFill, seq: u64) -> MarketEvent {
let side = match f.side.as_str() {
"yes" => Some(Side::Bid),
"no" => Some(Side::Ask),
_ => None,
};
MarketEvent {
event_id: event_id_from_str(&f.order_id),
ts_exchange_ns: ms_to_ns(f.ts),
ts_ingest_ns: now_ns(),
venue_id: KALSHI_VENUE_ID,
symbol_id: symbol_id_for(&f.market_ticker),
event_type: EventType::Trade,
side,
price_fp: cents_to_fp(f.yes_price.unwrap_or(0)),
qty_fp: f.count.saturating_mul(KALSHI_PRICE_FP_SCALE),
order_id_hash: Some(event_id_from_str(&f.order_id)),
participant_id_hash: None,
flags: 1, // flag bit 0 = own-fill
seq,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::{KalshiTrade, OrderbookSnapshot, WsOrderbookDelta};
#[test]
fn symbol_id_is_deterministic_and_case_insensitive() {
assert_eq!(symbol_id_for("FED-DEC23"), symbol_id_for("fed-dec23"));
assert_ne!(symbol_id_for("FED-DEC23"), symbol_id_for("FED-DEC24"));
}
#[test]
fn cents_to_fp_scales() {
assert_eq!(cents_to_fp(24), 24_000_000);
assert_eq!(cents_to_fp(0), 0);
}
#[test]
fn trade_to_event_maps_side_and_price() {
let t = KalshiTrade {
ticker: "FED-DEC23".into(),
trade_id: "trade-abc".into(),
yes_price: Some(34),
no_price: Some(66),
count: 5,
taker_side: Some("yes".into()),
created_time: "2026-04-20T18:00:00Z".into(),
};
let e = trade_to_event(&t, 7).unwrap();
assert_eq!(e.event_type, EventType::Trade);
assert_eq!(e.side, Some(Side::Bid));
assert_eq!(e.venue_id, KALSHI_VENUE_ID);
assert_eq!(e.price_fp, cents_to_fp(34));
assert_eq!(e.qty_fp, 5 * KALSHI_PRICE_FP_SCALE);
assert_eq!(e.seq, 7);
assert_ne!(e.event_id, [0u8; 16], "trade id must seed event_id");
}
#[test]
fn orderbook_levels_expanded() {
let ob = OrderbookSnapshot {
yes: vec![[24, 100], [23, 200]],
no: vec![[76, 150]],
};
let events = orderbook_to_events("FED-DEC23", 1_000_000_000, &ob, 0);
assert_eq!(events.len(), 3);
assert_eq!(events[0].side, Some(Side::Bid));
assert_eq!(events[0].price_fp, cents_to_fp(24));
assert_eq!(events[0].qty_fp, 100 * KALSHI_PRICE_FP_SCALE);
assert_eq!(events[2].side, Some(Side::Ask));
// Seq monotonic.
assert_eq!(events[0].seq, 0);
assert_eq!(events[2].seq, 2);
}
#[test]
fn ws_delta_positive_is_new_order_negative_is_cancel() {
let add = WsOrderbookDelta {
market_ticker: "FED-DEC23".into(),
side: "yes".into(),
price: 24,
delta: 5,
ts: Some(1_700_000_000_000),
};
let remove = WsOrderbookDelta {
market_ticker: "FED-DEC23".into(),
side: "no".into(),
price: 76,
delta: -3,
ts: None,
};
assert_eq!(ws_orderbook_delta_to_event(&add, 0).event_type, EventType::NewOrder);
assert_eq!(
ws_orderbook_delta_to_event(&remove, 1).event_type,
EventType::CancelOrder
);
// qty always positive.
assert_eq!(ws_orderbook_delta_to_event(&remove, 1).qty_fp, 3 * KALSHI_PRICE_FP_SCALE);
}
}

View file

@ -0,0 +1,182 @@
//! Kalshi REST client. All authenticated endpoints sign the request using
//! [`crate::auth::Signer`] and propagate typed errors.
//!
//! # Live-trade gate
//!
//! [`RestClient::post_order`] is the only method that can move money and it
//! refuses to run unless the `KALSHI_ENABLE_LIVE` environment variable is
//! set to `1`. Any other value (including unset) returns an error without
//! making the HTTP call. This is a belt-and-braces backstop on top of any
//! strategy-level `RiskGate`.
use crate::auth::Signer;
use crate::models::{
Market, MarketsResponse, NewOrder, OrderAck, OrderbookResponse, OrderbookSnapshot,
};
use crate::{KalshiError, Result};
#[derive(Clone)]
pub struct RestClient {
base_url: String,
signer: Signer,
http: reqwest::Client,
}
impl RestClient {
pub fn new(base_url: impl Into<String>, signer: Signer) -> Result<Self> {
let http = reqwest::Client::builder()
.user_agent("ruvector-kalshi/0.1")
.timeout(std::time::Duration::from_secs(10))
.build()?;
Ok(Self {
base_url: base_url.into(),
signer,
http,
})
}
/// Path used in the signature must be the full `/trade-api/v2/...` path,
/// not the host-relative fragment, per Kalshi's spec.
fn sig_path_for(&self, path: &str) -> String {
// `base_url` already ends with `/trade-api/v2`; the sig path is the
// full URL path component.
let url = url_join(&self.base_url, path);
match reqwest::Url::parse(&url) {
Ok(u) => u.path().to_string(),
Err(_) => path.to_string(),
}
}
async fn send<R: for<'de> serde::Deserialize<'de>>(
&self,
method: reqwest::Method,
path: &str,
body: Option<&impl serde::Serialize>,
) -> Result<R> {
let url = url_join(&self.base_url, path);
let sig_path = self.sig_path_for(path);
let headers = self.signer.sign_now(method.as_str(), &sig_path);
let mut rb = self.http.request(method, &url);
rb = headers.apply(rb);
if let Some(b) = body {
rb = rb.json(b);
}
let resp = rb.send().await?;
let status = resp.status();
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
return Err(KalshiError::Api {
status: status.as_u16(),
body,
});
}
let parsed = resp.json::<R>().await?;
Ok(parsed)
}
pub async fn list_markets(&self, status: Option<&str>) -> Result<Vec<Market>> {
let path = match status {
Some(s) => format!("/markets?status={s}"),
None => "/markets".into(),
};
let resp: MarketsResponse = self.send(reqwest::Method::GET, &path, NO_BODY).await?;
Ok(resp.markets)
}
pub async fn orderbook(&self, ticker: &str) -> Result<OrderbookSnapshot> {
let path = format!("/markets/{ticker}/orderbook");
let resp: OrderbookResponse = self.send(reqwest::Method::GET, &path, NO_BODY).await?;
Ok(resp.orderbook)
}
/// Place a new order. Refuses to run unless `KALSHI_ENABLE_LIVE=1`.
pub async fn post_order(&self, order: &NewOrder) -> Result<OrderAck> {
if std::env::var("KALSHI_ENABLE_LIVE").ok().as_deref() != Some("1") {
return Err(KalshiError::Api {
status: 0,
body: "live trading disabled (set KALSHI_ENABLE_LIVE=1 to enable)".into(),
});
}
self.send(reqwest::Method::POST, "/portfolio/orders", Some(order))
.await
}
}
const NO_BODY: Option<&()> = None;
fn url_join(base: &str, path: &str) -> String {
if path.starts_with("http://") || path.starts_with("https://") {
return path.to_string();
}
let b = base.trim_end_matches('/');
let p = if path.starts_with('/') { path.to_string() } else { format!("/{path}") };
format!("{b}{p}")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::auth::Signer;
use rsa::pkcs1::EncodeRsaPrivateKey;
use rsa::RsaPrivateKey;
fn test_signer() -> Signer {
let mut rng = rand::thread_rng();
let key = RsaPrivateKey::new(&mut rng, 2048).unwrap();
let pem = key.to_pkcs1_pem(rsa::pkcs1::LineEnding::LF).unwrap().to_string();
Signer::from_pem("test-key", &pem).unwrap()
}
#[test]
fn url_join_handles_trailing_and_leading_slashes() {
assert_eq!(
url_join("https://example.com/trade-api/v2/", "/markets"),
"https://example.com/trade-api/v2/markets"
);
assert_eq!(
url_join("https://example.com/trade-api/v2", "markets"),
"https://example.com/trade-api/v2/markets"
);
}
#[test]
fn sig_path_uses_full_url_path() {
let client = RestClient::new(
"https://trading-api.kalshi.com/trade-api/v2",
test_signer(),
)
.unwrap();
let p = client.sig_path_for("/markets");
assert_eq!(p, "/trade-api/v2/markets");
}
#[tokio::test]
async fn post_order_refuses_without_live_flag() {
// Ensure the flag is not set.
std::env::remove_var("KALSHI_ENABLE_LIVE");
let client = RestClient::new(
"https://trading-api.kalshi.com/trade-api/v2",
test_signer(),
)
.unwrap();
let order = NewOrder {
ticker: "X".into(),
action: crate::models::OrderAction::Buy,
side: crate::models::OrderSide::Yes,
order_type: crate::models::OrderType::Limit,
count: 1,
yes_price: Some(24),
no_price: None,
client_order_id: "t-1".into(),
};
let err = client.post_order(&order).await.unwrap_err();
match err {
KalshiError::Api { status: 0, body } => {
assert!(body.contains("live trading disabled"));
}
other => panic!("expected Api status=0 error, got {other:?}"),
}
}
}

View file

@ -0,0 +1,219 @@
//! Secret loading for Kalshi credentials.
//!
//! Three sources, checked in order:
//! 1. `KALSHI_SECRET_SOURCE=local` → read PEM + API key from local files
//! (dev only). Paths default to `.kalshi/kalshi.pem` and
//! `.kalshi/api-key.txt` under the current working directory.
//! 2. `KALSHI_SECRET_SOURCE=env` → read from `KALSHI_API_KEY` and
//! `KALSHI_PRIVATE_KEY_PEM` env vars.
//! 3. Default: shell out to `gcloud secrets versions access latest`.
//!
//! The result is cached in memory for 5 minutes so hot REST paths do not
//! re-invoke `gcloud` per request.
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use crate::{KalshiError, Result};
/// Loaded Kalshi credentials. The PEM is kept as a `String` rather than a
/// parsed key so the [`crate::auth::Signer`] stays the single owner of the
/// private key material.
#[derive(Clone)]
pub struct Credentials {
pub api_key: String,
pub private_key_pem: String,
pub api_url: String,
}
impl std::fmt::Debug for Credentials {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Credentials")
.field("api_key_len", &self.api_key.len())
.field("pem_len", &self.private_key_pem.len())
.field("api_url", &self.api_url)
.finish()
}
}
#[derive(Clone)]
pub struct SecretLoader {
project: String,
ttl: Duration,
inner: Arc<Mutex<Option<(Credentials, Instant)>>>,
}
impl SecretLoader {
pub fn new(project: impl Into<String>) -> Self {
Self {
project: project.into(),
ttl: Duration::from_secs(300),
inner: Arc::new(Mutex::new(None)),
}
}
/// Override the cache TTL. Primarily for tests.
pub fn with_ttl(mut self, ttl: Duration) -> Self {
self.ttl = ttl;
self
}
/// Load credentials, using the in-memory cache if fresh.
pub async fn load(&self) -> Result<Credentials> {
{
let guard = self.inner.lock().await;
if let Some((creds, loaded_at)) = guard.as_ref() {
if loaded_at.elapsed() < self.ttl {
return Ok(creds.clone());
}
}
}
let creds = self.load_fresh().await?;
let mut guard = self.inner.lock().await;
*guard = Some((creds.clone(), Instant::now()));
Ok(creds)
}
async fn load_fresh(&self) -> Result<Credentials> {
match std::env::var("KALSHI_SECRET_SOURCE").as_deref() {
Ok("local") => self.load_local().await,
Ok("env") => self.load_env(),
_ => self.load_gcloud().await,
}
}
fn load_env(&self) -> Result<Credentials> {
let api_key = std::env::var("KALSHI_API_KEY")
.map_err(|_| KalshiError::Secret("KALSHI_API_KEY not set".into()))?;
let private_key_pem = std::env::var("KALSHI_PRIVATE_KEY_PEM")
.map_err(|_| KalshiError::Secret("KALSHI_PRIVATE_KEY_PEM not set".into()))?;
let api_url = std::env::var("KALSHI_API_URL")
.unwrap_or_else(|_| crate::KALSHI_API_URL.to_string());
Ok(Credentials {
api_key,
private_key_pem,
api_url,
})
}
async fn load_local(&self) -> Result<Credentials> {
let pem_path = std::env::var("KALSHI_PEM_PATH")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from(".kalshi/kalshi.pem"));
let api_key_path = std::env::var("KALSHI_API_KEY_PATH")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from(".kalshi/api-key.txt"));
let private_key_pem = tokio::fs::read_to_string(&pem_path)
.await
.map_err(|e| KalshiError::Secret(format!("reading {pem_path:?}: {e}")))?;
let api_key = if let Ok(key) = std::env::var("KALSHI_API_KEY") {
key
} else {
tokio::fs::read_to_string(&api_key_path)
.await
.map(|s| s.trim().to_string())
.map_err(|e| {
KalshiError::Secret(format!(
"reading {api_key_path:?} (or set KALSHI_API_KEY): {e}"
))
})?
};
let api_url = std::env::var("KALSHI_API_URL")
.unwrap_or_else(|_| crate::KALSHI_API_URL.to_string());
Ok(Credentials {
api_key,
private_key_pem,
api_url,
})
}
async fn load_gcloud(&self) -> Result<Credentials> {
let api_key = gcloud_secret(&self.project, "KALSHI_API_KEY").await?;
let private_key_pem = gcloud_secret(&self.project, "KALSHI_PRIVATE_KEY_PEM").await?;
let api_url = match gcloud_secret(&self.project, "KALSHI_API_URL").await {
Ok(s) => s,
Err(_) => crate::KALSHI_API_URL.to_string(),
};
Ok(Credentials {
api_key: api_key.trim().to_string(),
private_key_pem,
api_url: api_url.trim().to_string(),
})
}
}
async fn gcloud_secret(project: &str, name: &str) -> Result<String> {
let output = tokio::process::Command::new("gcloud")
.args([
"secrets",
"versions",
"access",
"latest",
"--secret",
name,
"--project",
project,
])
.output()
.await
.map_err(|e| KalshiError::Secret(format!("spawning gcloud for {name}: {e}")))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
return Err(KalshiError::Secret(format!(
"gcloud secrets access {name} failed: {stderr}"
)));
}
let val = String::from_utf8(output.stdout).map_err(|e| {
KalshiError::Secret(format!("gcloud output for {name} is not utf-8: {e}"))
})?;
Ok(val)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn env_source_reads_vars() {
// SAFETY: env mutation in tests is serialized by running this test
// in isolation from other env-touching tests in this module (only one).
std::env::set_var("KALSHI_SECRET_SOURCE", "env");
std::env::set_var("KALSHI_API_KEY", "k-env-1");
std::env::set_var("KALSHI_PRIVATE_KEY_PEM", "PEM-PLACEHOLDER");
std::env::remove_var("KALSHI_API_URL");
let loader = SecretLoader::new("test").with_ttl(Duration::from_millis(10));
let creds = loader.load().await.unwrap();
assert_eq!(creds.api_key, "k-env-1");
assert_eq!(creds.private_key_pem, "PEM-PLACEHOLDER");
assert_eq!(creds.api_url, crate::KALSHI_API_URL);
// Cache hit returns the same value immediately.
let creds2 = loader.load().await.unwrap();
assert_eq!(creds.api_key, creds2.api_key);
std::env::remove_var("KALSHI_SECRET_SOURCE");
std::env::remove_var("KALSHI_API_KEY");
std::env::remove_var("KALSHI_PRIVATE_KEY_PEM");
}
#[test]
fn debug_does_not_leak_pem() {
let c = Credentials {
api_key: "super-secret-1234567890".into(),
private_key_pem: "-----BEGIN RSA PRIVATE KEY----- secret".into(),
api_url: crate::KALSHI_API_URL.into(),
};
let s = format!("{c:?}");
assert!(!s.contains("super-secret"));
assert!(!s.contains("BEGIN RSA"));
}
}

View file

@ -0,0 +1,129 @@
//! WebSocket ingest scaffolding for Kalshi's market-data stream.
//!
//! This module intentionally keeps the network layer *pluggable*. The
//! default build does not pull in `tokio-tungstenite` — callers feed raw
//! JSON frames into [`FeedDecoder::decode`], which produces
//! [`neural_trader_core::MarketEvent`] records ready for the coherence /
//! replay pipelines.
//!
//! When a tungstenite-based transport is needed, add
//! `tokio-tungstenite` to this crate and wire `connect` + `pump` inside a
//! `#[cfg(feature = "tungstenite")]` module. Keeping the decoder feature-
//! free means unit tests cover the parsing path without requiring a
//! network runtime.
use neural_trader_core::MarketEvent;
use crate::models::{WsEnvelope, WsMessage};
use crate::normalize::{
ws_fill_to_event, ws_orderbook_delta_to_event, ws_orderbook_to_events, ws_ticker_to_event,
ws_trade_to_event,
};
use crate::Result;
/// Decodes raw JSON WebSocket frames into canonical `MarketEvent`s.
///
/// Stateful only in its monotonically increasing sequence counter, so it
/// is cheap to construct per connection.
#[derive(Debug, Default)]
pub struct FeedDecoder {
next_seq: u64,
}
impl FeedDecoder {
pub fn new() -> Self {
Self::default()
}
/// Decode one JSON frame. Unknown message kinds yield an empty vector
/// (not an error) so forward-compatible payloads don't kill the feed.
pub fn decode(&mut self, frame: &str) -> Result<Vec<MarketEvent>> {
let env: WsEnvelope = serde_json::from_str(frame)?;
let msg = WsMessage::from_envelope(env)?;
Ok(self.decode_msg(&msg))
}
fn decode_msg(&mut self, msg: &WsMessage) -> Vec<MarketEvent> {
match msg {
WsMessage::Ticker(t) => vec![self.tick(|s| ws_ticker_to_event(t, s))],
WsMessage::Trade(t) => vec![self.tick(|s| ws_trade_to_event(t, s))],
WsMessage::OrderbookSnapshot(ob) => {
let events = ws_orderbook_to_events(ob, self.next_seq);
self.next_seq = self
.next_seq
.wrapping_add(events.len() as u64);
events
}
WsMessage::OrderbookDelta(d) => {
vec![self.tick(|s| ws_orderbook_delta_to_event(d, s))]
}
WsMessage::Fill(f) => vec![self.tick(|s| ws_fill_to_event(f, s))],
WsMessage::Other => Vec::new(),
}
}
fn tick<F: FnOnce(u64) -> MarketEvent>(&mut self, f: F) -> MarketEvent {
let seq = self.next_seq;
self.next_seq = self.next_seq.wrapping_add(1);
f(seq)
}
pub fn next_seq(&self) -> u64 {
self.next_seq
}
}
#[cfg(test)]
mod tests {
use super::*;
use neural_trader_core::EventType;
#[test]
fn decodes_ticker_then_trade_monotonically() {
let mut dec = FeedDecoder::new();
let events = dec
.decode(r#"{"type":"ticker","msg":{"market_ticker":"X","yes_bid":10,"yes_ask":12}}"#)
.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].seq, 0);
assert_eq!(events[0].event_type, EventType::VenueStatus);
let events = dec
.decode(r#"{"type":"trade","msg":{"market_ticker":"X","yes_price":11,"count":3}}"#)
.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].seq, 1);
assert_eq!(events[0].event_type, EventType::Trade);
}
#[test]
fn decodes_snapshot_with_multiple_levels() {
let mut dec = FeedDecoder::new();
let frame = r#"{
"type":"orderbook_snapshot",
"msg":{
"market_ticker":"X",
"yes":[[24,100],[23,200]],
"no":[[76,150]]
}
}"#;
let events = dec.decode(frame).unwrap();
assert_eq!(events.len(), 3);
assert_eq!(dec.next_seq(), 3);
}
#[test]
fn unknown_message_is_silent() {
let mut dec = FeedDecoder::new();
let events = dec.decode(r#"{"type":"heartbeat","msg":{}}"#).unwrap();
assert!(events.is_empty());
assert_eq!(dec.next_seq(), 0);
}
#[test]
fn malformed_json_returns_error() {
let mut dec = FeedDecoder::new();
let err = dec.decode("not json").unwrap_err();
assert!(matches!(err, crate::KalshiError::Decode(_)));
}
}

View file

@ -0,0 +1,253 @@
# ADR-151: Kalshi Integration via RuVector Neural Trader
## Status
Proposed
## Date
2026-04-20
## Context
[Kalshi](https://kalshi.com) is a CFTC-regulated US event-contract exchange exposing a signed REST + WebSocket API at `https://trading-api.kalshi.com/trade-api/v2`. Contracts are binary (YES/NO) with prices in cents (0100), settling to $1 on resolution. This is a natural target for the **RuVector Neural Trader** stack, which already models probabilistic event markets in `examples/neural-trader/specialized/prediction-markets.js` (Polymarket/Kalshi/PredictIt) but lacks a live Rust execution path against the real Kalshi API.
Existing building blocks in the ruvector workspace we will reuse verbatim:
| Crate / Module | Role | Reuse |
|---|---|---|
| `crates/neural-trader-core` | Canonical `MarketEvent`, `EventType`, `Side`, ingest traits | Normalize Kalshi ticker/trade/orderbook into `MarketEvent` |
| `crates/neural-trader-coherence` | Coherence / regime detection across markets | Cross-market arbitrage and regime-gated sizing |
| `crates/neural-trader-replay` | Deterministic replay for backtests | Replay captured Kalshi streams against strategies |
| `crates/ruvector-attention` / `ruvector-attn-mincut` | Attention + MinCut for market graph clustering | Group correlated Kalshi series (e.g., all Fed-rate strikes) |
| `crates/ruvector-cnn` | Temporal convnet over order-book imbalance | Short-horizon tick prediction on Kalshi trades feed |
| `crates/ruvllm` / RuvLtra | Local inference for news/event summarization → probability prior | Provide `modelProbability` input analogous to the JS example |
| `crates/mcp-brain-server` + pi.ruv.io brain | Persistent knowledge store for contracts, resolutions, past edges | Long-memory of market behavior and strategy outcomes |
Kalshi authentication is **RSA-PSS-SHA256** signed requests, *not* HMAC. Each request requires:
```
KALSHI-ACCESS-KEY: <API key UUID>
KALSHI-ACCESS-TIMESTAMP: <unix ms>
KALSHI-ACCESS-SIGNATURE: base64( RSA-PSS-SHA256( PEM_PRIVATE_KEY, timestamp + method + path ) )
```
Because of this, secrets must be held server-side in GCS and never serialized into logs, traces, or the brain.
### Credential state (already provisioned)
Created in Google Cloud project `ruv-dev` (ADR-150 context):
| Secret | Source | Access |
|---|---|---|
| `KALSHI_API_KEY` | UUID, 16 bytes | `gcloud secrets versions access latest --secret=KALSHI_API_KEY --project=ruv-dev` |
| `KALSHI_PRIVATE_KEY_PEM` | RSA private key (1679 bytes) | same pattern |
| `KALSHI_API_URL` | `https://trading-api.kalshi.com/trade-api/v2` | same pattern |
Local development copy is at `/home/ruvultra/projects/ruvector/.kalshi/kalshi.pem`. Both `.kalshi/` and `*.pem` are already in `.gitignore`; no key material is ever committed.
## Decision
Introduce a new Rust crate **`ruvector-kalshi`** that (a) authenticates against the live Kalshi API using RSA-PSS-SHA256, (b) normalizes Kalshi events into `neural_trader_core::MarketEvent`, and (c) exposes a strategy runtime that composes attention/mincut/cnn signals with a ruvllm-supplied prior to quote and execute on binary contracts — with pi.ruv.io brain as the long-term memory.
No Python. No JS shelling. All three layers (auth, stream, strategy) are pure Rust; WASM exposure is optional via a thin `ruvector-kalshi-wasm` sibling for dashboards.
### Workspace layout
```
crates/
├── ruvector-kalshi/ # new — auth, REST, WS, secrets
│ ├── src/
│ │ ├── lib.rs
│ │ ├── auth.rs # RSA-PSS signer
│ │ ├── rest.rs # GET/POST wrappers
│ │ ├── ws.rs # tokio-tungstenite feed
│ │ ├── models.rs # Kalshi DTOs (Market, Event, Order, Fill)
│ │ ├── normalize.rs # Kalshi → neural_trader_core::MarketEvent
│ │ └── secrets.rs # gcloud + local-PEM loader
│ └── Cargo.toml
├── ruvector-kalshi-wasm/ # optional — browser-safe subset (read-only)
└── neural-trader-strategies/ # new — reusable strategy trait + Kalshi strats
└── src/
├── lib.rs # Strategy trait, PositionSizer, RiskGate
├── expected_value.rs # EV+Kelly fractional sizing
├── coherence_arb.rs # cross-market arb via neural-trader-coherence
└── attn_scalper.rs # attention+cnn short-horizon scalper
examples/
└── kalshi/ # new — runnable demos
├── list-markets.rs
├── stream-orderbook.rs
├── paper-trade.rs
└── live-trade.rs # gated behind KALSHI_ENABLE_LIVE=1
```
### Architecture
```
┌────────────────────────────────────────────────────────────────────┐
│ ruvector-kalshi (Rust) │
│ │
│ secrets::load() ──► gcloud secrets versions access (cached 5min) │
│ │ │
│ ▼ │
│ auth::Signer { api_key, rsa_private_key } │
│ │ sign(ts, method, path) → KALSHI-ACCESS-SIGNATURE │
│ ▼ │
│ rest::Client ──► GET /markets, /portfolio, /events │
│ POST /orders │
│ ws::Feed ──► wss: ticker/orderbook/trade/fill │
│ │ │
│ ▼ │
│ normalize::to_market_event() ──► neural_trader_core::MarketEvent │
└────────────────────────────────┬───────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────┐
│ neural-trader-strategies │
│ │
│ ┌─────────────┐ ┌────────────────┐ ┌──────────────┐ │
│ │ EV + Kelly │ │ Coherence Arb │ │ Attn Scalper │ │
│ │ (prior from │ │ (mincut groups │ │ (cnn over │ │
│ │ ruvllm) │◄──┤ via attention)│◄──┤ imbalance) │ │
│ └─────┬───────┘ └───────┬────────┘ └──────┬───────┘ │
│ │ │ │ │
│ └──► RiskGate ◄─────┴───────────────────┘ │
│ (position cap, daily-loss kill, vol regime) │
└────────────────────────────────┬───────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────┐
│ Execution + Memory │
│ │
│ Paper mode → neural-trader-replay ledger │
│ Live mode → ruvector-kalshi::rest::post_order │
│ │
│ pi.ruv.io brain: store contract outcomes, strategy P&L, market │
│ resolutions as MarketEvent-derived memories (category: pattern). │
└────────────────────────────────────────────────────────────────────┘
```
### Authentication detail
```rust
// crates/ruvector-kalshi/src/auth.rs (sketch)
use rsa::{RsaPrivateKey, pkcs1::DecodeRsaPrivateKey,
pss::SigningKey, signature::RandomizedSigner};
use sha2::Sha256;
use base64::{engine::general_purpose::STANDARD, Engine};
pub struct Signer {
api_key: String,
signing_key: SigningKey<Sha256>,
}
impl Signer {
pub fn sign(&self, ts_ms: u64, method: &str, path: &str) -> (String, String, String) {
let msg = format!("{ts_ms}{method}{path}");
let mut rng = rand::thread_rng();
let sig = self.signing_key.sign_with_rng(&mut rng, msg.as_bytes());
let sig_b64 = STANDARD.encode(sig.to_bytes());
(self.api_key.clone(), ts_ms.to_string(), sig_b64)
}
}
```
Dependencies: `rsa = "0.9"`, `sha2 = "0.10"`, `base64 = "0.22"`, `reqwest = { version = "0.12", features = ["json", "rustls-tls"] }`, `tokio-tungstenite`, `tokio`.
### Secret loading (no hardcoding, no .env commits)
```rust
// crates/ruvector-kalshi/src/secrets.rs (sketch)
pub enum SecretSource { Gcloud, LocalPem(PathBuf), Env }
pub async fn load() -> Result<Credentials> {
match env::var("KALSHI_SECRET_SOURCE").as_deref() {
Ok("local") => load_local(), // dev only
_ => load_gcloud("ruv-dev").await, // default
}
}
```
`load_gcloud` shells out to `gcloud secrets versions access latest --secret=...` with a 5-minute in-process cache. Production deploys will use Workload Identity on Cloud Run so no gcloud CLI is needed — Secret Manager SDK direct read.
### Risk gate (cheap, mandatory)
The `RiskGate` wraps every strategy and enforces, before any `post_order`:
1. **Max single-position notional**: default 10% of cash.
2. **Daily loss kill**: hard stop at 3% of starting balance (configurable).
3. **Concentration cap**: ≤ 40% across one neural-trader-coherence cluster.
4. **Min edge**: ≥ 300 bps over mid after fees (Kalshi takes 2%).
5. **Live gate**: `KALSHI_ENABLE_LIVE=1` required; absent → paper ledger only.
## Consequences
### Positive
- First production path from ruvector neural capabilities to a regulated live venue.
- Reuses existing Rust crates — no duplication, no Python sidecars.
- Kalshi events land in the canonical `MarketEvent` shape so they immediately flow through existing coherence, replay, and attention modules.
- Secrets are centrally managed in GCS; no key material in the repo, logs, or the brain.
- pi.ruv.io accumulates a long-memory of event-market behavior, feeding back into future strategies (ADR-149/150 synergy).
### Negative / risks
- **Regulatory**: Kalshi is US-only; strategy code must remain paper-only unless the operator has verified a Kalshi account. Live trading is gated behind an env flag and manual credential load.
- **Key custody**: RSA private key in GCS means any party with `ruv-dev` Secret Accessor can trade. Mitigation: restrict IAM to a single service account; rotate PEM quarterly; consider KMS-wrapped storage later.
- **Latency**: gcloud CLI shell-out is ~200 ms — acceptable at startup, unacceptable per-request. We cache and never re-read inside the hot path.
- **Rate limits**: Kalshi enforces per-endpoint limits. The `rest::Client` must include a token-bucket limiter. Replay tests catch 429 regressions.
- **Model drift**: ruvllm priors can degrade silently. The `neural-trader-replay` suite must run nightly in CI with the previous week's captured feed and compare P&L against a baseline.
### Neutral
- Adds ~2 crates to the workspace — consistent with existing `neural-trader-*` pattern.
- Optional wasm sibling is not blocking; it can come later when a dashboard needs read-only market access.
## Implementation Plan
**Phase 1 — Auth & REST (week 1)**
- `ruvector-kalshi` crate skeleton
- `auth::Signer` + round-trip test against Kalshi `/exchange/status`
- `rest::Client` with GET `/markets`, `/events`, `/portfolio`
- `examples/kalshi/list-markets.rs`
**Phase 2 — Stream & normalize (week 2)**
- `ws::Feed` subscribing to ticker, trade, orderbook channels
- `normalize::to_market_event` with unit tests covering all Kalshi event kinds
- `examples/kalshi/stream-orderbook.rs` dumping `MarketEvent` JSONL
**Phase 3 — Strategy runtime & paper trade (week 3)**
- `neural-trader-strategies` crate with `Strategy` trait + `RiskGate`
- `ExpectedValueKelly` strategy wired to ruvllm prior
- `CoherenceArb` strategy wired to `neural-trader-coherence`
- `AttentionScalper` wired to `ruvector-cnn`
- `examples/kalshi/paper-trade.rs` replays captured feed → ledger
**Phase 4 — Live & memory (week 4)**
- `rest::post_order`, cancel, amend
- Brain integration: every resolution and P&L event → `brain_share`
- `examples/kalshi/live-trade.rs` behind `KALSHI_ENABLE_LIVE=1`
- Nightly replay benchmark in CI
## Testing Strategy
- **Unit**: auth signature matches Kalshi reference vectors; normalize round-trips against fixture feeds.
- **Integration**: hit Kalshi `/exchange/status` in a gated CI job (needs a sandbox API key).
- **Replay**: `neural-trader-replay` replays captured `.jsonl` against each strategy; asserts P&L envelope.
- **Chaos**: drop WS connection, 429 storms, expired timestamps — all must fail closed (no orders).
- **Security**: `aidefence_scan` over any fields we send to the brain to strip accidental PII.
## Alternatives Considered
1. **Extend the JS example directly** — rejected; CLAUDE.md mandates Rust for all ruOS components, and we lose access to the Rust neural-trader crates.
2. **Generic "exchange" crate with Kalshi as a driver** — deferred; premature abstraction. Ship Kalshi first, extract the trait once a second venue (Polymarket) is scoped.
3. **Store PEM in the repo encrypted with age/sops** — rejected; GCS Secret Manager is the existing pattern (ADR-150, `CRATES_API_KEY`, etc.) and plays well with Cloud Run Workload Identity.
## References
- ADR-084: Neural Trader foundation (referenced by `neural-trader-core::lib.rs`)
- ADR-149: Brain performance optimizations
- ADR-150: π Brain + RuvLtra via Tailscale
- Kalshi API docs: https://trading-api.readme.io/reference/getting-started
- Existing example: `examples/neural-trader/specialized/prediction-markets.js`