Created
February 23, 2026 21:03
-
-
Save ayjayt/8599b7b29300c83576b6a8091b64eb5d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // YOU WRITE (or derive): | |
| impl<'de> Deserialize<'de> for T { | |
| fn deserialize<D>(deserializer: D) -> Result<T, D::Error> | |
| where D: Deserializer<'de> | |
| } | |
| // FORMAT CRATE PROVIDES: | |
| serde_json::Deserializer<'a> // implements Deserializer<'a> | |
| // GLUE FUNCTION (serde_json): | |
| fn from_str<'a, T>(s: &'a str) -> Result<T, serde_json::Error> | |
| where | |
| T: Deserialize<'a> | |
| { | |
| let de: serde_json::Deserializer<'a> = serde_json::Deserializer::from_str(s); | |
| // (in real code it’s often &mut de; same idea) | |
| T::deserialize(de) | |
| } | |
| // DESERIALIZER TRAIT METHOD SHAPE: | |
| trait Deserializer<'de> { | |
| type Error; | |
| fn deserialize_struct<V>(self, name: &'static str, fields: &'static [&'static str], visitor: V) | |
| -> Result<V::Value, Self::Error> | |
| where V: Visitor<'de>; | |
| // ... deserialize_map/seq/str/etc | |
| } | |
| // WHAT THE DERIVE GENERATES (conceptually): | |
| T::deserialize(de) { | |
| // creates a Visitor that knows how to build T | |
| // then asks the deserializer to drive it | |
| de.deserialize_struct("T", &["field1","field2",...], TVisitor) | |
| } | |
| // WHO BUILDS THE VALUE: | |
| trait Visitor<'de> { | |
| type Value; // here: Value = T | |
| fn visit_map<A>(self, map: A) -> Result<T, A::Error> | |
| where A: MapAccess<'de>; | |
| // ... visit_str/visit_seq/etc | |
| } | |
| /* CONTROL FLOW (runtime): | |
| input &str (lifetime 'a) | |
| -> serde_json::Deserializer<'a> (parses JSON) | |
| -> calls visitor methods while parsing | |
| -> visitor constructs T | |
| -> returns T out of T::deserialize | |
| -> from_str returns T | |
| */ | |
| use prost::Message; | |
| use prost_types::Any; | |
| use serde::Deserialize; | |
| use std::{ | |
| collections::{HashMap, HashSet}, | |
| sync::Arc, | |
| time::Duration, | |
| }; | |
| use tokio::sync::{mpsc, watch, RwLock}; | |
| use tonic::{transport::Server, Request, Response, Status}; | |
| use xds_api::envoy::config::{ | |
| cluster::v3::{cluster::ClusterDiscoveryType, Cluster, EdsClusterConfig}, | |
| core::v3::{ | |
| config_source::ConfigSourceSpecifier, AggregatedConfigSource, ConfigSource, SocketAddress, | |
| }, | |
| endpoint::v3::{ClusterLoadAssignment, Endpoint, LbEndpoint, LocalityLbEndpoints}, | |
| route::v3::{Route, RouteAction, RouteConfiguration, RouteMatch, VirtualHost}, | |
| }; | |
| use xds_api::envoy::service::discovery::v3::{ | |
| aggregated_discovery_service_server::{AggregatedDiscoveryService, AggregatedDiscoveryServiceServer}, | |
| DiscoveryRequest, DiscoveryResponse, | |
| }; | |
| const TYPE_CDS: &str = "type.googleapis.com/envoy.config.cluster.v3.Cluster"; | |
| const TYPE_EDS: &str = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; | |
| const TYPE_RDS: &str = "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; | |
| #[derive(Deserialize, Clone)] | |
| struct StateFile { | |
| host_domains: Vec<String>, | |
| base_cluster: ClusterFile, | |
| temp: Option<TempFile>, | |
| } | |
| #[derive(Deserialize, Clone)] | |
| struct TempFile { | |
| enabled: bool, | |
| prefix: String, | |
| cluster: ClusterFile, | |
| } | |
| #[derive(Deserialize, Clone)] | |
| struct ClusterFile { | |
| name: String, | |
| endpoints: Vec<String>, // ["ip:port", ...] | |
| } | |
| #[derive(Clone, Default)] | |
| struct Snapshot { | |
| version: u64, | |
| clusters: HashMap<String, Cluster>, | |
| eds: HashMap<String, ClusterLoadAssignment>, | |
| routes: HashMap<String, RouteConfiguration>, // name -> config | |
| } | |
| #[derive(Clone)] | |
| struct Shared { | |
| snap: Arc<RwLock<Snapshot>>, | |
| tick: watch::Sender<u64>, | |
| } | |
| fn pack_any<T: Message>(type_url: &str, msg: &T) -> Any { | |
| Any { type_url: type_url.to_string(), value: msg.encode_to_vec() } | |
| } | |
| fn parse_hostport(s: &str) -> (String, u32) { | |
| let (h, p) = s.rsplit_once(':').expect("endpoint must be host:port"); | |
| (h.to_string(), p.parse().expect("bad port")) | |
| } | |
| fn mk_eds(cluster_name: &str, eps: &[String]) -> ClusterLoadAssignment { | |
| let lb_endpoints: Vec<LbEndpoint> = eps | |
| .iter() | |
| .map(|s| { | |
| let (host, port) = parse_hostport(s); | |
| LbEndpoint { | |
| host_identifier: Some( | |
| xds_api::envoy::config::endpoint::v3::lb_endpoint::HostIdentifier::Endpoint( | |
| Endpoint { | |
| address: Some(xds_api::envoy::config::core::v3::Address { | |
| address: Some( | |
| xds_api::envoy::config::core::v3::address::Address::SocketAddress( | |
| SocketAddress { | |
| address: host, | |
| port_specifier: Some( | |
| xds_api::envoy::config::core::v3::socket_address::PortSpecifier::PortValue(port), | |
| ), | |
| ..Default::default() | |
| }, | |
| ), | |
| ), | |
| }), | |
| ..Default::default() | |
| }, | |
| ), | |
| ), | |
| ..Default::default() | |
| } | |
| }) | |
| .collect(); | |
| ClusterLoadAssignment { | |
| cluster_name: cluster_name.to_string(), | |
| endpoints: vec![LocalityLbEndpoints { lb_endpoints, ..Default::default() }], | |
| ..Default::default() | |
| } | |
| } | |
| fn mk_cluster_eds(name: &str) -> Cluster { | |
| Cluster { | |
| name: name.to_string(), | |
| connect_timeout: Some(xds_api::google::protobuf::Duration { seconds: 1, nanos: 0 }), | |
| cluster_discovery_type: Some(ClusterDiscoveryType::Type(Cluster::EDS as i32)), | |
| eds_cluster_config: Some(EdsClusterConfig { | |
| eds_config: Some(ConfigSource { | |
| resource_api_version: xds_api::envoy::config::core::v3::ApiVersion::V3 as i32, | |
| config_source_specifier: Some(ConfigSourceSpecifier::Ads(AggregatedConfigSource {})), | |
| ..Default::default() | |
| }), | |
| service_name: "".to_string(), // empty => use cluster name | |
| }), | |
| lb_policy: xds_api::envoy::config::cluster::v3::cluster::LbPolicy::RoundRobin as i32, | |
| ..Default::default() | |
| } | |
| } | |
| fn build_snapshot(sf: &StateFile) -> Snapshot { | |
| let mut snap = Snapshot::default(); | |
| // Base cluster + endpoints | |
| snap.clusters.insert(sf.base_cluster.name.clone(), mk_cluster_eds(&sf.base_cluster.name)); | |
| snap.eds.insert(sf.base_cluster.name.clone(), mk_eds(&sf.base_cluster.name, &sf.base_cluster.endpoints)); | |
| // Optional temp cluster + endpoints | |
| if let Some(t) = &sf.temp { | |
| if t.enabled { | |
| snap.clusters.insert(t.cluster.name.clone(), mk_cluster_eds(&t.cluster.name)); | |
| snap.eds.insert(t.cluster.name.clone(), mk_eds(&t.cluster.name, &t.cluster.endpoints)); | |
| } | |
| } | |
| // Routes (single RouteConfiguration named "local_route") | |
| let mut routes: Vec<Route> = vec![]; | |
| // temp route first (more specific) | |
| if let Some(t) = &sf.temp { | |
| if t.enabled { | |
| routes.push(Route { | |
| r#match: Some(RouteMatch { | |
| path_specifier: Some(xds_api::envoy::config::route::v3::route_match::PathSpecifier::Prefix(t.prefix.clone())), | |
| ..Default::default() | |
| }), | |
| action: Some(xds_api::envoy::config::route::v3::route::Action::Route(RouteAction { | |
| cluster_specifier: Some( | |
| xds_api::envoy::config::route::v3::route_action::ClusterSpecifier::Cluster(t.cluster.name.clone()), | |
| ), | |
| ..Default::default() | |
| })), | |
| ..Default::default() | |
| }); | |
| } | |
| } | |
| // fallback route | |
| routes.push(Route { | |
| r#match: Some(RouteMatch { | |
| path_specifier: Some(xds_api::envoy::config::route::v3::route_match::PathSpecifier::Prefix("/".to_string())), | |
| ..Default::default() | |
| }), | |
| action: Some(xds_api::envoy::config::route::v3::route::Action::Route(RouteAction { | |
| cluster_specifier: Some( | |
| xds_api::envoy::config::route::v3::route_action::ClusterSpecifier::Cluster(sf.base_cluster.name.clone()), | |
| ), | |
| ..Default::default() | |
| })), | |
| ..Default::default() | |
| }); | |
| let vhost = VirtualHost { | |
| name: "vh0".to_string(), | |
| domains: if sf.host_domains.is_empty() { vec!["*".to_string()] } else { sf.host_domains.clone() }, | |
| routes, | |
| ..Default::default() | |
| }; | |
| snap.routes.insert( | |
| "local_route".to_string(), | |
| RouteConfiguration { | |
| name: "local_route".to_string(), | |
| virtual_hosts: vec![vhost], | |
| ..Default::default() | |
| }, | |
| ); | |
| snap | |
| } | |
| async fn load_state(path: &str) -> StateFile { | |
| let s = tokio::fs::read_to_string(path).await.expect("read ads_state.json"); | |
| serde_json::from_str(&s).expect("parse ads_state.json") | |
| } | |
| async fn install_snapshot(shared: &Shared, mut snap: Snapshot) { | |
| let mut w = shared.snap.write().await; | |
| snap.version = w.version + 1; | |
| *w = snap; | |
| let _ = shared.tick.send(w.version); | |
| } | |
| #[derive(Clone)] | |
| struct Ads { shared: Shared } | |
| #[tonic::async_trait] | |
| impl AggregatedDiscoveryService for Ads { | |
| type StreamAggregatedResourcesStream = tokio_stream::wrappers::ReceiverStream<Result<DiscoveryResponse, Status>>; | |
| async fn stream_aggregated_resources( | |
| &self, | |
| req: Request<tonic::Streaming<DiscoveryRequest>>, | |
| ) -> Result<Response<Self::StreamAggregatedResourcesStream>, Status> { | |
| let mut inbound = req.into_inner(); | |
| let (tx, rx) = mpsc::channel::<Result<DiscoveryResponse, Status>>(32); | |
| let mut tick_rx = self.shared.tick.subscribe(); | |
| let shared = self.shared.clone(); | |
| let subscribed = Arc::new(RwLock::new(HashSet::<String>::new())); | |
| let subscribed2 = subscribed.clone(); | |
| // Reader: record requested type_urls and respond immediately | |
| tokio::spawn(async move { | |
| while let Some(Ok(dr)) = inbound.message().await { | |
| let t = dr.type_url.clone(); | |
| subscribed.write().await.insert(t.clone()); | |
| if let Err(e) = send_type(&shared, &tx, &t).await { | |
| let _ = tx.send(Err(e)).await; | |
| return; | |
| } | |
| } | |
| }); | |
| // Pusher: whenever snapshot changes, resend current state for any subscribed types | |
| tokio::spawn(async move { | |
| let mut nonce: u64 = 1; | |
| loop { | |
| if tick_rx.changed().await.is_err() { return; } | |
| let types: Vec<String> = subscribed2.read().await.iter().cloned().collect(); | |
| for t in types { | |
| if let Err(_) = send_type_with_nonce(&shared, &tx, &t, nonce).await { | |
| return; | |
| } | |
| nonce += 1; | |
| } | |
| } | |
| }); | |
| Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx))) | |
| } | |
| type DeltaAggregatedResourcesStream = | |
| tokio_stream::wrappers::ReceiverStream<Result<xds_api::envoy::service::discovery::v3::DeltaDiscoveryResponse, Status>>; | |
| async fn delta_aggregated_resources( | |
| &self, | |
| _req: Request<tonic::Streaming<xds_api::envoy::service::discovery::v3::DeltaDiscoveryRequest>>, | |
| ) -> Result<Response<Self::DeltaAggregatedResourcesStream>, Status> { | |
| Err(Status::unimplemented("delta xDS not implemented")) | |
| } | |
| } | |
| async fn send_type(shared: &Shared, tx: &mpsc::Sender<Result<DiscoveryResponse, Status>>, type_url: &str) -> Result<(), Status> { | |
| send_type_with_nonce(shared, tx, type_url, 1).await | |
| } | |
| async fn send_type_with_nonce( | |
| shared: &Shared, | |
| tx: &mpsc::Sender<Result<DiscoveryResponse, Status>>, | |
| type_url: &str, | |
| nonce: u64, | |
| ) -> Result<(), Status> { | |
| let s = shared.snap.read().await; | |
| let version = s.version.to_string(); | |
| let resources: Vec<Any> = match type_url { | |
| TYPE_CDS => s.clusters.values().map(|c| pack_any(TYPE_CDS, c)).collect(), | |
| TYPE_EDS => s.eds.values().map(|e| pack_any(TYPE_EDS, e)).collect(), | |
| TYPE_RDS => s.routes.values().map(|r| pack_any(TYPE_RDS, r)).collect(), | |
| _ => vec![], | |
| }; | |
| let resp = DiscoveryResponse { | |
| version_info: version, | |
| resources, | |
| type_url: type_url.to_string(), | |
| nonce: nonce.to_string(), | |
| ..Default::default() | |
| }; | |
| tx.send(Ok(resp)).await.map_err(|_| Status::internal("stream closed"))?; | |
| Ok(()) | |
| } | |
| #[tokio::main] | |
| async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
| let (tick, _rx) = watch::channel::<u64>(0); | |
| let shared = Shared { | |
| snap: Arc::new(RwLock::new(Snapshot::default())), | |
| tick, | |
| }; | |
| // initial snapshot | |
| let sf = load_state("ads_state.json").await; | |
| install_snapshot(&shared, build_snapshot(&sf)).await; | |
| // poll the file once per second; reload if changed | |
| { | |
| let shared = shared.clone(); | |
| tokio::spawn(async move { | |
| let mut last = String::new(); | |
| loop { | |
| let cur = tokio::fs::read_to_string("ads_state.json").await.unwrap_or_default(); | |
| if cur != last { | |
| last = cur.clone(); | |
| if let Ok(sf) = serde_json::from_str::<StateFile>(&cur) { | |
| install_snapshot(&shared, build_snapshot(&sf)).await; | |
| } | |
| } | |
| tokio::time::sleep(Duration::from_secs(1)).await; | |
| } | |
| }); | |
| } | |
| let grpc_addr = "0.0.0.0:50051".parse()?; | |
| Server::builder() | |
| .add_service(AggregatedDiscoveryServiceServer::new(Ads { shared })) | |
| .serve(grpc_addr) | |
| .await?; | |
| Ok(()) | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment