Skip to content

Instantly share code, notes, and snippets.

@ayjayt
Created February 23, 2026 21:03
Show Gist options
  • Select an option

  • Save ayjayt/8599b7b29300c83576b6a8091b64eb5d to your computer and use it in GitHub Desktop.

Select an option

Save ayjayt/8599b7b29300c83576b6a8091b64eb5d to your computer and use it in GitHub Desktop.
// YOU WRITE (or derive):
impl<'de> Deserialize<'de> for T {
fn deserialize<D>(deserializer: D) -> Result<T, D::Error>
where D: Deserializer<'de>
}
// FORMAT CRATE PROVIDES:
serde_json::Deserializer<'a> // implements Deserializer<'a>
// GLUE FUNCTION (serde_json):
fn from_str<'a, T>(s: &'a str) -> Result<T, serde_json::Error>
where
T: Deserialize<'a>
{
let de: serde_json::Deserializer<'a> = serde_json::Deserializer::from_str(s);
// (in real code it’s often &mut de; same idea)
T::deserialize(de)
}
// DESERIALIZER TRAIT METHOD SHAPE:
trait Deserializer<'de> {
type Error;
fn deserialize_struct<V>(self, name: &'static str, fields: &'static [&'static str], visitor: V)
-> Result<V::Value, Self::Error>
where V: Visitor<'de>;
// ... deserialize_map/seq/str/etc
}
// WHAT THE DERIVE GENERATES (conceptually):
T::deserialize(de) {
// creates a Visitor that knows how to build T
// then asks the deserializer to drive it
de.deserialize_struct("T", &["field1","field2",...], TVisitor)
}
// WHO BUILDS THE VALUE:
trait Visitor<'de> {
type Value; // here: Value = T
fn visit_map<A>(self, map: A) -> Result<T, A::Error>
where A: MapAccess<'de>;
// ... visit_str/visit_seq/etc
}
/* CONTROL FLOW (runtime):
input &str (lifetime 'a)
-> serde_json::Deserializer<'a> (parses JSON)
-> calls visitor methods while parsing
-> visitor constructs T
-> returns T out of T::deserialize
-> from_str returns T
*/
use prost::Message;
use prost_types::Any;
use serde::Deserialize;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use tokio::sync::{mpsc, watch, RwLock};
use tonic::{transport::Server, Request, Response, Status};
use xds_api::envoy::config::{
cluster::v3::{cluster::ClusterDiscoveryType, Cluster, EdsClusterConfig},
core::v3::{
config_source::ConfigSourceSpecifier, AggregatedConfigSource, ConfigSource, SocketAddress,
},
endpoint::v3::{ClusterLoadAssignment, Endpoint, LbEndpoint, LocalityLbEndpoints},
route::v3::{Route, RouteAction, RouteConfiguration, RouteMatch, VirtualHost},
};
use xds_api::envoy::service::discovery::v3::{
aggregated_discovery_service_server::{AggregatedDiscoveryService, AggregatedDiscoveryServiceServer},
DiscoveryRequest, DiscoveryResponse,
};
const TYPE_CDS: &str = "type.googleapis.com/envoy.config.cluster.v3.Cluster";
const TYPE_EDS: &str = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
const TYPE_RDS: &str = "type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
#[derive(Deserialize, Clone)]
struct StateFile {
host_domains: Vec<String>,
base_cluster: ClusterFile,
temp: Option<TempFile>,
}
#[derive(Deserialize, Clone)]
struct TempFile {
enabled: bool,
prefix: String,
cluster: ClusterFile,
}
#[derive(Deserialize, Clone)]
struct ClusterFile {
name: String,
endpoints: Vec<String>, // ["ip:port", ...]
}
#[derive(Clone, Default)]
struct Snapshot {
version: u64,
clusters: HashMap<String, Cluster>,
eds: HashMap<String, ClusterLoadAssignment>,
routes: HashMap<String, RouteConfiguration>, // name -> config
}
#[derive(Clone)]
struct Shared {
snap: Arc<RwLock<Snapshot>>,
tick: watch::Sender<u64>,
}
fn pack_any<T: Message>(type_url: &str, msg: &T) -> Any {
Any { type_url: type_url.to_string(), value: msg.encode_to_vec() }
}
fn parse_hostport(s: &str) -> (String, u32) {
let (h, p) = s.rsplit_once(':').expect("endpoint must be host:port");
(h.to_string(), p.parse().expect("bad port"))
}
fn mk_eds(cluster_name: &str, eps: &[String]) -> ClusterLoadAssignment {
let lb_endpoints: Vec<LbEndpoint> = eps
.iter()
.map(|s| {
let (host, port) = parse_hostport(s);
LbEndpoint {
host_identifier: Some(
xds_api::envoy::config::endpoint::v3::lb_endpoint::HostIdentifier::Endpoint(
Endpoint {
address: Some(xds_api::envoy::config::core::v3::Address {
address: Some(
xds_api::envoy::config::core::v3::address::Address::SocketAddress(
SocketAddress {
address: host,
port_specifier: Some(
xds_api::envoy::config::core::v3::socket_address::PortSpecifier::PortValue(port),
),
..Default::default()
},
),
),
}),
..Default::default()
},
),
),
..Default::default()
}
})
.collect();
ClusterLoadAssignment {
cluster_name: cluster_name.to_string(),
endpoints: vec![LocalityLbEndpoints { lb_endpoints, ..Default::default() }],
..Default::default()
}
}
fn mk_cluster_eds(name: &str) -> Cluster {
Cluster {
name: name.to_string(),
connect_timeout: Some(xds_api::google::protobuf::Duration { seconds: 1, nanos: 0 }),
cluster_discovery_type: Some(ClusterDiscoveryType::Type(Cluster::EDS as i32)),
eds_cluster_config: Some(EdsClusterConfig {
eds_config: Some(ConfigSource {
resource_api_version: xds_api::envoy::config::core::v3::ApiVersion::V3 as i32,
config_source_specifier: Some(ConfigSourceSpecifier::Ads(AggregatedConfigSource {})),
..Default::default()
}),
service_name: "".to_string(), // empty => use cluster name
}),
lb_policy: xds_api::envoy::config::cluster::v3::cluster::LbPolicy::RoundRobin as i32,
..Default::default()
}
}
fn build_snapshot(sf: &StateFile) -> Snapshot {
let mut snap = Snapshot::default();
// Base cluster + endpoints
snap.clusters.insert(sf.base_cluster.name.clone(), mk_cluster_eds(&sf.base_cluster.name));
snap.eds.insert(sf.base_cluster.name.clone(), mk_eds(&sf.base_cluster.name, &sf.base_cluster.endpoints));
// Optional temp cluster + endpoints
if let Some(t) = &sf.temp {
if t.enabled {
snap.clusters.insert(t.cluster.name.clone(), mk_cluster_eds(&t.cluster.name));
snap.eds.insert(t.cluster.name.clone(), mk_eds(&t.cluster.name, &t.cluster.endpoints));
}
}
// Routes (single RouteConfiguration named "local_route")
let mut routes: Vec<Route> = vec![];
// temp route first (more specific)
if let Some(t) = &sf.temp {
if t.enabled {
routes.push(Route {
r#match: Some(RouteMatch {
path_specifier: Some(xds_api::envoy::config::route::v3::route_match::PathSpecifier::Prefix(t.prefix.clone())),
..Default::default()
}),
action: Some(xds_api::envoy::config::route::v3::route::Action::Route(RouteAction {
cluster_specifier: Some(
xds_api::envoy::config::route::v3::route_action::ClusterSpecifier::Cluster(t.cluster.name.clone()),
),
..Default::default()
})),
..Default::default()
});
}
}
// fallback route
routes.push(Route {
r#match: Some(RouteMatch {
path_specifier: Some(xds_api::envoy::config::route::v3::route_match::PathSpecifier::Prefix("/".to_string())),
..Default::default()
}),
action: Some(xds_api::envoy::config::route::v3::route::Action::Route(RouteAction {
cluster_specifier: Some(
xds_api::envoy::config::route::v3::route_action::ClusterSpecifier::Cluster(sf.base_cluster.name.clone()),
),
..Default::default()
})),
..Default::default()
});
let vhost = VirtualHost {
name: "vh0".to_string(),
domains: if sf.host_domains.is_empty() { vec!["*".to_string()] } else { sf.host_domains.clone() },
routes,
..Default::default()
};
snap.routes.insert(
"local_route".to_string(),
RouteConfiguration {
name: "local_route".to_string(),
virtual_hosts: vec![vhost],
..Default::default()
},
);
snap
}
async fn load_state(path: &str) -> StateFile {
let s = tokio::fs::read_to_string(path).await.expect("read ads_state.json");
serde_json::from_str(&s).expect("parse ads_state.json")
}
async fn install_snapshot(shared: &Shared, mut snap: Snapshot) {
let mut w = shared.snap.write().await;
snap.version = w.version + 1;
*w = snap;
let _ = shared.tick.send(w.version);
}
#[derive(Clone)]
struct Ads { shared: Shared }
#[tonic::async_trait]
impl AggregatedDiscoveryService for Ads {
type StreamAggregatedResourcesStream = tokio_stream::wrappers::ReceiverStream<Result<DiscoveryResponse, Status>>;
async fn stream_aggregated_resources(
&self,
req: Request<tonic::Streaming<DiscoveryRequest>>,
) -> Result<Response<Self::StreamAggregatedResourcesStream>, Status> {
let mut inbound = req.into_inner();
let (tx, rx) = mpsc::channel::<Result<DiscoveryResponse, Status>>(32);
let mut tick_rx = self.shared.tick.subscribe();
let shared = self.shared.clone();
let subscribed = Arc::new(RwLock::new(HashSet::<String>::new()));
let subscribed2 = subscribed.clone();
// Reader: record requested type_urls and respond immediately
tokio::spawn(async move {
while let Some(Ok(dr)) = inbound.message().await {
let t = dr.type_url.clone();
subscribed.write().await.insert(t.clone());
if let Err(e) = send_type(&shared, &tx, &t).await {
let _ = tx.send(Err(e)).await;
return;
}
}
});
// Pusher: whenever snapshot changes, resend current state for any subscribed types
tokio::spawn(async move {
let mut nonce: u64 = 1;
loop {
if tick_rx.changed().await.is_err() { return; }
let types: Vec<String> = subscribed2.read().await.iter().cloned().collect();
for t in types {
if let Err(_) = send_type_with_nonce(&shared, &tx, &t, nonce).await {
return;
}
nonce += 1;
}
}
});
Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
}
type DeltaAggregatedResourcesStream =
tokio_stream::wrappers::ReceiverStream<Result<xds_api::envoy::service::discovery::v3::DeltaDiscoveryResponse, Status>>;
async fn delta_aggregated_resources(
&self,
_req: Request<tonic::Streaming<xds_api::envoy::service::discovery::v3::DeltaDiscoveryRequest>>,
) -> Result<Response<Self::DeltaAggregatedResourcesStream>, Status> {
Err(Status::unimplemented("delta xDS not implemented"))
}
}
async fn send_type(shared: &Shared, tx: &mpsc::Sender<Result<DiscoveryResponse, Status>>, type_url: &str) -> Result<(), Status> {
send_type_with_nonce(shared, tx, type_url, 1).await
}
async fn send_type_with_nonce(
shared: &Shared,
tx: &mpsc::Sender<Result<DiscoveryResponse, Status>>,
type_url: &str,
nonce: u64,
) -> Result<(), Status> {
let s = shared.snap.read().await;
let version = s.version.to_string();
let resources: Vec<Any> = match type_url {
TYPE_CDS => s.clusters.values().map(|c| pack_any(TYPE_CDS, c)).collect(),
TYPE_EDS => s.eds.values().map(|e| pack_any(TYPE_EDS, e)).collect(),
TYPE_RDS => s.routes.values().map(|r| pack_any(TYPE_RDS, r)).collect(),
_ => vec![],
};
let resp = DiscoveryResponse {
version_info: version,
resources,
type_url: type_url.to_string(),
nonce: nonce.to_string(),
..Default::default()
};
tx.send(Ok(resp)).await.map_err(|_| Status::internal("stream closed"))?;
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tick, _rx) = watch::channel::<u64>(0);
let shared = Shared {
snap: Arc::new(RwLock::new(Snapshot::default())),
tick,
};
// initial snapshot
let sf = load_state("ads_state.json").await;
install_snapshot(&shared, build_snapshot(&sf)).await;
// poll the file once per second; reload if changed
{
let shared = shared.clone();
tokio::spawn(async move {
let mut last = String::new();
loop {
let cur = tokio::fs::read_to_string("ads_state.json").await.unwrap_or_default();
if cur != last {
last = cur.clone();
if let Ok(sf) = serde_json::from_str::<StateFile>(&cur) {
install_snapshot(&shared, build_snapshot(&sf)).await;
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}
let grpc_addr = "0.0.0.0:50051".parse()?;
Server::builder()
.add_service(AggregatedDiscoveryServiceServer::new(Ads { shared }))
.serve(grpc_addr)
.await?;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment