Skip to content

Instantly share code, notes, and snippets.

@trozet
Created December 9, 2025 17:44
Show Gist options
  • Select an option

  • Save trozet/dfe6fc3a063c5be4be1822f5cd86e863 to your computer and use it in GitHub Desktop.

Select an option

Save trozet/dfe6fc3a063c5be4be1822f5cd86e863 to your computer and use it in GitHub Desktop.
sbdb.rs
use std::collections::HashSet;
use std::net::IpAddr;
use once_cell::sync::Lazy;
use proxy_wasm::traits::*;
use proxy_wasm::types::*;
use serde::Deserialize;
use serde_json::Value;
/// Allow-list used as the fallback value; it is lazily initialised to an empty set,
/// so no IP address is considered allowed while it is in effect.
static DEFAULT_ALLOWED_IPS: Lazy<HashSet<IpAddr>> = Lazy::new(HashSet::<IpAddr>::new);
// Module entry point: sets the log level to `Info` and registers the factory
// that builds a default `OvsdbRoot` as the root context.
proxy_wasm::main! {{
    proxy_wasm::set_log_level(LogLevel::Info);
    proxy_wasm::set_root_context(|_| -> Box<dyn RootContext> {
        Box::new(OvsdbRoot::default())
    });
}}
/// Root context of the filter. It owns the IP allow-list that is copied into
/// every stream context it creates.
#[derive(Default)]
struct OvsdbRoot {
    /// IP addresses that inspected operations are allowed to carry.
    /// Empty by default (via `Default`).
    allowed_ips: HashSet<IpAddr>,
}
/// Plugin configuration, deserialized from JSON.
///
/// Example document: `{"allowed_ips": ["10.0.0.10", "10.0.0.11"]}`
#[derive(Deserialize)]
struct PluginConfig {
    /// Exact IP addresses in textual form, e.g. `["10.0.0.10", "10.0.0.11"]`.
    /// An absent key deserializes to an empty list (`#[serde(default)]`).
    #[serde(default)]
    allowed_ips: Vec<String>,
}
impl RootContext for OvsdbRoot {
fn on_configure(&mut self, _: usize) -> bool {
// Configuration is passed as raw bytes from Envoy
if let Some(conf_bytes) = self.get_configuration() {
match serde_json::from_slice::<PluginConfig>(&conf_bytes) {
Ok(cfg) => {
let mut set = HashSet::new();
for s in cfg.allowed_ips {
if let Ok(ip) = s.parse::<IpAddr>() {
set.insert(ip);
} else {
self.log(LogLevel::Warn, &format!("Invalid IP in config: {}", s));
}
}
self.log(
LogLevel::Info,
&format!("Configured {} allowed IPs", set.len()),
);
self.allowed_ips = set;
}
Err(e) => {
self.log(
LogLevel::Error,
&format!("Failed to parse config JSON: {}", e),
);
self.allowed_ips = DEFAULT_ALLOWED_IPS.clone();
}
}
}
true
}
fn create_stream_context(&self, context_id: u32) -> Option<Box<dyn StreamContext>> {
let ctx = OvsdbStream {
context_id,
buffer: Vec::new(),
allowed_ips: self.allowed_ips.clone(),
};
Some(Box::new(ctx))
}
}
// No `Context` callbacks are overridden for the root context; the trait's
// default method implementations are used as-is.
impl Context for OvsdbRoot {}
/// Per-connection state of the filter.
struct OvsdbStream {
    /// Identifier of this stream context; used to tag log messages.
    context_id: u32,
    /// Downstream bytes received so far that have not yet been consumed as a
    /// complete message.
    buffer: Vec<u8>,
    /// IP addresses that inspected operations are allowed to carry.
    allowed_ips: HashSet<IpAddr>,
}
// No `Context` callbacks are overridden for the stream context; the trait's
// default method implementations are used as-is.
impl Context for OvsdbStream {}
impl StreamContext for OvsdbStream {
/// Data from client -> SBDB
fn on_downstream_data(&mut self, data_size: usize, end_of_stream: bool) -> Action {
if let Some(data) = self.get_downstream_data(0, data_size) {
self.buffer.extend_from_slice(&data);
}
// Process complete JSON lines
while let Some(pos) = self.buffer.iter().position(|b| *b == b'\n') {
let line = self.buffer.drain(..=pos).collect::<Vec<u8>>();
if !self.handle_line(&line) {
// Reject: close the connection
self.log(
LogLevel::Warn,
&format!(
"[ctx {}] Rejecting OVSDB transact for disallowed chassis IP",
self.context_id
),
);
// Drop data by not forwarding it, and close both sides
self.close_downstream();
self.close_upstream();
return Action::Pause;
}
}
if end_of_stream {
// Nothing special, just pass through
}
Action::Continue
}
/// Data from SBDB -> client (we don't need to inspect for this use case)
fn on_upstream_data(&mut self, _data_size: usize, _end_of_stream: bool) -> Action {
Action::Continue
}
}
impl OvsdbStream {
fn handle_line(&mut self, line: &[u8]) -> bool {
// Trim whitespace
let line = match std::str::from_utf8(line) {
Ok(s) => s.trim(),
Err(_) => return true, // ignore malformed
};
if line.is_empty() {
return true;
}
let v: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(e) => {
self.log(
LogLevel::Warn,
&format!("[ctx {}] Failed to parse JSON: {}", self.context_id, e),
);
return true; // don't block unknown traffic
}
};
// OVSDB JSON-RPC transact: { "method": "transact", "params": [...] }
if v.get("method").and_then(|m| m.as_str()) != Some("transact") {
return true;
}
let params = match v.get("params").and_then(|p| p.as_array()) {
Some(a) => a,
None => return true,
};
// params[0] is DB name, params[1] is operations array
if params.len() < 2 {
return true;
}
let ops = match params[1].as_array() {
Some(a) => a,
None => return true,
};
for op in ops {
let op_type = op.get("op").and_then(|x| x.as_str()).unwrap_or("");
let table = op.get("table").and_then(|x| x.as_str()).unwrap_or("");
// We care about insert/update on Chassis/Encap
if (op_type == "insert" || op_type == "update")
&& (table == "Chassis" || table == "Encap")
{
if let Some(ip) = self.extract_ip_from_op(op, table) {
if !self.allowed_ips.contains(&ip) {
self.log(
LogLevel::Warn,
&format!(
"[ctx {}] Disallowed chassis/encap IP observed: {}",
self.context_id, ip
),
);
return false; // reject whole transaction
} else {
self.log(
LogLevel::Info,
&format!(
"[ctx {}] Allowed chassis/encap IP: {}",
self.context_id, ip
),
);
}
}
}
}
true
}
/// Extract chassis IP from the operation. You’ll likely want to
/// tune this for your environment:
///
/// - For Encap: row.ip
/// - For Chassis: row.hostname or row.other_config["ovn-remote-ip"] etc.
fn extract_ip_from_op(&self, op: &Value, table: &str) -> Option<IpAddr> {
// Try "row" first
if let Some(row) = op.get("row") {
if let Some(ip_str) = match table {
"Encap" => row.get("ip").and_then(|x| x.as_str()),
"Chassis" => row
.get("hostname")
.and_then(|x| x.as_str()), // adjust as needed
_ => None,
} {
if let Ok(ip) = ip_str.parse::<IpAddr>() {
return Some(ip);
}
}
}
// For updates, IP might be in "where" or "set" clauses;
// extend here if needed.
None
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment