Created
December 9, 2025 17:44
-
-
Save trozet/dfe6fc3a063c5be4be1822f5cd86e863 to your computer and use it in GitHub Desktop.
sbdb.rs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::collections::HashSet; | |
| use std::net::IpAddr; | |
| use once_cell::sync::Lazy; | |
| use proxy_wasm::traits::*; | |
| use proxy_wasm::types::*; | |
| use serde::Deserialize; | |
| use serde_json::Value; | |
| static DEFAULT_ALLOWED_IPS: Lazy<HashSet<IpAddr>> = Lazy::new(HashSet::new); | |
| proxy_wasm::main! {{ | |
| proxy_wasm::set_log_level(LogLevel::Info); | |
| proxy_wasm::set_root_context(|_| Box::new(OvsdbRoot::default())); | |
| }} | |
/// Root context of the plugin.
///
/// Holds the allow-list built in `on_configure`; every stream context created
/// from this root receives a copy of it.
#[derive(Default)]
struct OvsdbRoot {
    /// IP addresses that chassis/encap rows are permitted to carry.
    allowed_ips: HashSet<IpAddr>,
}
| #[derive(Deserialize)] | |
| struct PluginConfig { | |
| /// Either a list of exact IPs: ["10.0.0.10","10.0.0.11"] | |
| #[serde(default)] | |
| allowed_ips: Vec<String>, | |
| } | |
| impl RootContext for OvsdbRoot { | |
| fn on_configure(&mut self, _: usize) -> bool { | |
| // Configuration is passed as raw bytes from Envoy | |
| if let Some(conf_bytes) = self.get_configuration() { | |
| match serde_json::from_slice::<PluginConfig>(&conf_bytes) { | |
| Ok(cfg) => { | |
| let mut set = HashSet::new(); | |
| for s in cfg.allowed_ips { | |
| if let Ok(ip) = s.parse::<IpAddr>() { | |
| set.insert(ip); | |
| } else { | |
| self.log(LogLevel::Warn, &format!("Invalid IP in config: {}", s)); | |
| } | |
| } | |
| self.log( | |
| LogLevel::Info, | |
| &format!("Configured {} allowed IPs", set.len()), | |
| ); | |
| self.allowed_ips = set; | |
| } | |
| Err(e) => { | |
| self.log( | |
| LogLevel::Error, | |
| &format!("Failed to parse config JSON: {}", e), | |
| ); | |
| self.allowed_ips = DEFAULT_ALLOWED_IPS.clone(); | |
| } | |
| } | |
| } | |
| true | |
| } | |
| fn create_stream_context(&self, context_id: u32) -> Option<Box<dyn StreamContext>> { | |
| let ctx = OvsdbStream { | |
| context_id, | |
| buffer: Vec::new(), | |
| allowed_ips: self.allowed_ips.clone(), | |
| }; | |
| Some(Box::new(ctx)) | |
| } | |
| } | |
| impl Context for OvsdbRoot {} | |
/// Per-connection state of the filter.
struct OvsdbStream {
    /// Identifier of this stream context; used as a prefix in log messages.
    context_id: u32,
    /// Downstream bytes received so far that have not yet been processed.
    buffer: Vec<u8>,
    /// IP addresses that chassis/encap rows are permitted to carry.
    allowed_ips: HashSet<IpAddr>,
}
| impl Context for OvsdbStream {} | |
| impl StreamContext for OvsdbStream { | |
| /// Data from client -> SBDB | |
| fn on_downstream_data(&mut self, data_size: usize, end_of_stream: bool) -> Action { | |
| if let Some(data) = self.get_downstream_data(0, data_size) { | |
| self.buffer.extend_from_slice(&data); | |
| } | |
| // Process complete JSON lines | |
| while let Some(pos) = self.buffer.iter().position(|b| *b == b'\n') { | |
| let line = self.buffer.drain(..=pos).collect::<Vec<u8>>(); | |
| if !self.handle_line(&line) { | |
| // Reject: close the connection | |
| self.log( | |
| LogLevel::Warn, | |
| &format!( | |
| "[ctx {}] Rejecting OVSDB transact for disallowed chassis IP", | |
| self.context_id | |
| ), | |
| ); | |
| // Drop data by not forwarding it, and close both sides | |
| self.close_downstream(); | |
| self.close_upstream(); | |
| return Action::Pause; | |
| } | |
| } | |
| if end_of_stream { | |
| // Nothing special, just pass through | |
| } | |
| Action::Continue | |
| } | |
| /// Data from SBDB -> client (we don't need to inspect for this use case) | |
| fn on_upstream_data(&mut self, _data_size: usize, _end_of_stream: bool) -> Action { | |
| Action::Continue | |
| } | |
| } | |
| impl OvsdbStream { | |
| fn handle_line(&mut self, line: &[u8]) -> bool { | |
| // Trim whitespace | |
| let line = match std::str::from_utf8(line) { | |
| Ok(s) => s.trim(), | |
| Err(_) => return true, // ignore malformed | |
| }; | |
| if line.is_empty() { | |
| return true; | |
| } | |
| let v: Value = match serde_json::from_str(line) { | |
| Ok(v) => v, | |
| Err(e) => { | |
| self.log( | |
| LogLevel::Warn, | |
| &format!("[ctx {}] Failed to parse JSON: {}", self.context_id, e), | |
| ); | |
| return true; // don't block unknown traffic | |
| } | |
| }; | |
| // OVSDB JSON-RPC transact: { "method": "transact", "params": [...] } | |
| if v.get("method").and_then(|m| m.as_str()) != Some("transact") { | |
| return true; | |
| } | |
| let params = match v.get("params").and_then(|p| p.as_array()) { | |
| Some(a) => a, | |
| None => return true, | |
| }; | |
| // params[0] is DB name, params[1] is operations array | |
| if params.len() < 2 { | |
| return true; | |
| } | |
| let ops = match params[1].as_array() { | |
| Some(a) => a, | |
| None => return true, | |
| }; | |
| for op in ops { | |
| let op_type = op.get("op").and_then(|x| x.as_str()).unwrap_or(""); | |
| let table = op.get("table").and_then(|x| x.as_str()).unwrap_or(""); | |
| // We care about insert/update on Chassis/Encap | |
| if (op_type == "insert" || op_type == "update") | |
| && (table == "Chassis" || table == "Encap") | |
| { | |
| if let Some(ip) = self.extract_ip_from_op(op, table) { | |
| if !self.allowed_ips.contains(&ip) { | |
| self.log( | |
| LogLevel::Warn, | |
| &format!( | |
| "[ctx {}] Disallowed chassis/encap IP observed: {}", | |
| self.context_id, ip | |
| ), | |
| ); | |
| return false; // reject whole transaction | |
| } else { | |
| self.log( | |
| LogLevel::Info, | |
| &format!( | |
| "[ctx {}] Allowed chassis/encap IP: {}", | |
| self.context_id, ip | |
| ), | |
| ); | |
| } | |
| } | |
| } | |
| } | |
| true | |
| } | |
| /// Extract chassis IP from the operation. You’ll likely want to | |
| /// tune this for your environment: | |
| /// | |
| /// - For Encap: row.ip | |
| /// - For Chassis: row.hostname or row.other_config["ovn-remote-ip"] etc. | |
| fn extract_ip_from_op(&self, op: &Value, table: &str) -> Option<IpAddr> { | |
| // Try "row" first | |
| if let Some(row) = op.get("row") { | |
| if let Some(ip_str) = match table { | |
| "Encap" => row.get("ip").and_then(|x| x.as_str()), | |
| "Chassis" => row | |
| .get("hostname") | |
| .and_then(|x| x.as_str()), // adjust as needed | |
| _ => None, | |
| } { | |
| if let Ok(ip) = ip_str.parse::<IpAddr>() { | |
| return Some(ip); | |
| } | |
| } | |
| } | |
| // For updates, IP might be in "where" or "set" clauses; | |
| // extend here if needed. | |
| None | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment