Last active: March 6, 2026 13:29
Snowflake ID generation for distributed systems.
// Copyright 2026 Felipe Oliveira Carvalho
// SPDX-License-Identifier: Apache-2.0

//! Snowflake ID generation.
//!
//! Snowflake IDs are a form of unique identifier used in distributed computing.
//! The format was created by Twitter. Like a natural snowflake, every ID is
//! unique, hence the name "snowflake ID". The format has been adopted by other
//! companies, including Discord and Instagram.
//!
//! The pros of Snowflake IDs include:
//! - They are roughly ordered by time, which can be useful for sorting and
//!   querying.
//! - They are compact (64 bits) and can be generated efficiently without
//!   coordination between nodes, as long as each node has a unique shard ID
//!   and reasonably synchronized clocks.
//!
//! Systems that don't want to leak timestamp information should use a dual
//! system: snowflakes as internal 64-bit IDs used as primary and foreign
//! keys in the database, and external IDs (UUID, ULID, KSUID, etc.) for public
//! APIs and client-side usage.
//!
//! **Binary layout (`u64`)**
//!
//! ```text
//! 63                                                              0
//! +---+-----------------------------+--------------+--------------+
//! | 0 |     timestamp (41 bits)     |  shard (10)  |   seq (12)   |
//! +---+-----------------------------+--------------+--------------+
//!   ^ unused MSB (sign bit kept at 0)      least-significant bit ^
//! ```
//!
//! Field meanings:
//! - `timestamp`: milliseconds since [`SNOWFLAKE_EPOCH`]
//! - `shard`: node/shard identifier in `[0, 1023]`
//! - `seq`: per-millisecond sequence in `[0, 4095]`
//!
//! Packing formula:
//! - `id = (timestamp << 22) | (shard << 12) | seq`
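//!
//! For example, packing timestamp `1`, shard `2`, sequence `3` (a doctest
//! sketch of the formula above; the values are arbitrary):
//!
//! ```
//! let id: u64 = (1u64 << 22) | (2 << 12) | 3;
//! assert_eq!(id, 0x40_2003);
//! // Unpack the fields back out:
//! assert_eq!(id >> 22, 1);            // timestamp
//! assert_eq!((id >> 12) & 0x3FF, 2);  // shard
//! assert_eq!(id & 0xFFF, 3);          // seq
//! ```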
//!
//! Snowflake pitfalls and how this implementation handles them:
//! - **Clock rollback**: if the wall clock goes backwards, IDs can regress or
//!   collide. This implementation computes a rollback offset and accepts bounded
//!   rollback (up to [`MAX_CLOCK_ROLLBACK`]); a larger rollback returns
//!   [`ClockError::Skew`].
//! - **Sequence exhaustion in hot milliseconds**: generating more than 4096 IDs
//!   in one millisecond would force duplicates unless generation pauses. This
//!   implementation blocks via [`SnowflakeEnv::exhausted_sequence`] until the
//!   next millisecond.
//! - **Restart monotonicity**: process restarts can lose in-memory state and
//!   issue IDs older than previously emitted IDs. This implementation can seed
//!   from [`SnowflakeEnv::persisted_system_time`] and forces the first
//!   post-restart ID to move to a strictly newer millisecond.
//! - **Timestamp range overflow**: the 41-bit timestamp eventually overflows
//!   (~69.7 years after the epoch). This implementation reports
//!   [`ClockError::Overflow`] when the range is exceeded. Future maintainers of
//!   a system using these IDs can migrate by using the sign bit to indicate a
//!   new encoding that will survive beyond this century.
//! - **Shard-ID misconfiguration**: global uniqueness still depends on the
//!   deployment assigning distinct shard IDs. Since slight clock skew is handled
//!   gracefully, a full restart of the system with a different assignment of
//!   shard IDs won't cause collisions as long as the shard IDs are distinct.
//! - **Shutdown races**: generating IDs while shutting down can break
//!   persistence guarantees. This implementation allows env-controlled shutdown
//!   gating via [`SnowflakeEnv::is_shutting_down`], causing every `next_id` call
//!   to fail with [`ClockError::Shutdown`] after the flag is set. A clean
//!   application shutdown is not always possible, but unless clocks are severely
//!   misconfigured, the window for ID collisions should be small.
//!
//! References:
//! - [Wikipedia: Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID)
//! - [Twitter Engineering: Announcing Snowflake](https://blog.twitter.com/engineering/en_us/a/2010/announcing-snowflake.html)
//! - [Instagram Engineering: Sharding & IDs at Instagram](https://instagram-engineering.com/sharding-ids-at-instagram-1cf5a71e5a5c)
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const TIMESTAMP_BITS: u64 = 41;
const SHARD_BITS: u64 = 10;
const SEQUENCE_BITS: u64 = 12;
const MAX_TIMESTAMP_PART: u64 = (1u64 << TIMESTAMP_BITS) - 1;
const MAX_SHARD_ID: u16 = (1u16 << SHARD_BITS) - 1;
const MAX_SEQUENCE: u16 = (1u16 << SEQUENCE_BITS) - 1;
const SHARD_SHIFT: u64 = SEQUENCE_BITS;
const TIMESTAMP_SHIFT: u64 = SHARD_BITS + SEQUENCE_BITS;

/// The maximum allowed clock rollback duration before the generator starts
/// rejecting requests.
pub const MAX_CLOCK_ROLLBACK: Duration = Duration::from_secs(1);

/// Snowflake ID epoch used by this implementation.
///
/// The Unix timestamp for "2021-08-23T00:57:24.449Z" in milliseconds.
pub const SNOWFLAKE_EPOCH: Duration = Duration::from_millis(1_629_680_244_449);

/// Mutable state for one [`SnowflakeGenerator`].
///
/// This is held behind a mutex and updated on each generated ID.
#[derive(Clone, Copy)]
pub struct SnowflakeState {
    /// Last emitted timestamp (milliseconds since [`SNOWFLAKE_EPOCH`]).
    pub last_timestamp_ms: u64,
    /// How much the clock has rolled back, if it has rolled back at all.
    ///
    /// The value is added to the current timestamp to get the effective
    /// timestamp used for generating IDs. This allows the generator to keep
    /// generating IDs even if the system clock has rolled back, as long as the
    /// rollback is within a reasonable limit.
    pub rollback_offset: Duration,
    /// Sequence inside `last_timestamp_ms`.
    pub sequence: u16,
}

impl SnowflakeState {
    fn new(last_timestamp_ms: u64) -> Self {
        Self {
            last_timestamp_ms,
            rollback_offset: Duration::ZERO,
            sequence: 0,
        }
    }
}

/// Clock failures and control-flow errors during ID generation.
#[derive(Clone, Copy, Debug)]
pub enum ClockError {
    /// The system clock is set to a time before the snowflake epoch.
    Underflow,
    /// The system clock has advanced beyond the maximum timestamp representable
    /// in a snowflake ID.
    Overflow,
    /// The system clock is currently behind the last recorded timestamp,
    /// indicating a potential clock skew or rollback.
    Skew(Duration),
    /// The generator is shutting down and cannot generate IDs anymore.
    ///
    /// Only produced by [`SnowflakeEnv`] implementations that might return
    /// `true` from `is_shutting_down()`.
    Shutdown,
}

impl SnowflakeState {
    /// Bump the sequence number for the current timestamp if possible.
    ///
    /// This must be called for every ID generated with the same timestamp
    /// to ensure uniqueness.
    fn next_seqnum(&mut self) -> Option<u16> {
        if self.sequence < MAX_SEQUENCE {
            self.sequence += 1;
            Some(self.sequence)
        } else {
            None
        }
    }

    /// Reset the state for a new timestamp.
    ///
    /// # Panics
    ///
    /// Panics if `timestamp_ms` has not advanced past `last_timestamp_ms`.
    fn recycle(&mut self, timestamp_ms: u64) -> u16 {
        assert!(
            timestamp_ms > self.last_timestamp_ms,
            "timestamp must advance"
        );
        self.last_timestamp_ms = timestamp_ms;
        self.sequence = 0;
        self.sequence
    }
}

/// Environment hooks for time/shutdown/persistence.
///
/// This trait is injected so applications and tests can provide
/// different behavior for clock, sleeping, and restart persistence.
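/// A minimal implementation sketch (the struct and field names below are
/// illustrative, not part of this module):
///
/// ```ignore
/// struct FixedShardEnv {
///     shard: u16,
/// }
///
/// impl SnowflakeEnv for FixedShardEnv {
///     // Everything except `shard_id` has a sensible default.
///     fn shard_id(&self) -> u16 {
///         self.shard
///     }
/// }
/// ```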
pub trait SnowflakeEnv: Send + Sync {
    /// Returns the shard ID embedded into generated IDs.
    ///
    /// Must fit in 10 bits (`0..=1023`).
    fn shard_id(&self) -> u16;

    /// Returns the last persisted system time if available.
    ///
    /// This can be used to restore the generator state after a restart and
    /// avoid generating IDs with timestamps that are before the last generated
    /// ID in case the system clock has rolled back during the downtime. If the
    /// environment does not have a way to persist the last system time, it can
    /// return [`None`].
    ///
    /// We expect the [`SnowflakeEnv`] implementation to very carefully validate
    /// the stored timestamp and only return it if it's trustworthy. When in
    /// doubt, returning a timestamp slightly in the future is safer than
    /// returning a stale timestamp in the past: the former just causes a short
    /// delay in ID generation while the clock catches up, whereas the latter
    /// can lead to ID collisions if the generator produces IDs with timestamps
    /// that are before the last generated ID.
    fn persisted_system_time(&self) -> Option<SystemTime> {
        None
    }

    /// Returns the current wall-clock time.
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }

    /// Returns true if the generator is in the process of shutting down.
    ///
    /// The generator checks this flag after computing an ID and returns
    /// [`ClockError::Shutdown`] instead of the ID when the flag is set. The
    /// application shutdown routine can combine this with a timestamp
    /// persistence mechanism by setting a global shutdown flag to `true` and
    /// persisting the current system time after the flag is set.
    fn is_shutting_down(&self) -> bool {
        false
    }

    /// Sleep primitive used by retry/stall paths.
    fn sleep_for(&self, dur: Duration) {
        std::thread::sleep(dur);
    }

    /// Called when the sequence number is exhausted during a millisecond window.
    ///
    /// `stall` is the time until the next millisecond tick, which is the
    /// minimum time we need to wait before we can generate more IDs. The
    /// implementation can choose to wait for the full duration or spend some of
    /// the time logging, emitting metrics, or performing other tasks.
    ///
    /// IMPORTANT: [`SnowflakeEnv::exhausted_sequence()`] is called while holding
    /// the generator mutex, and all ID generation is blocked while this
    /// function is running.
    fn exhausted_sequence(&self, stall: Duration) {
        self.sleep_for(stall);
    }

    /// Called on clock errors. Returns true if the generator should retry, or
    /// false if the caller should just propagate the error to the application.
    ///
    /// IMPORTANT: [`SnowflakeEnv::on_error()`] is called while holding the
    /// generator mutex, and all ID generation is blocked while this function is
    /// running.
    ///
    /// [`ClockError::Shutdown`] is never passed to this function, since the
    /// generator checks the shutdown flag separately to avoid indefinite
    /// blocking during shutdown.
    #[must_use]
    fn on_error(&self, err: ClockError, retry_count: u64, _state: &SnowflakeState) -> bool {
        use ClockError::*;
        match err {
            Underflow | Overflow => false,
            Skew(skew) => {
                let wait = Duration::from_millis(10);
                self.sleep_for(skew.min(wait));
                retry_count < 5 // retry at most 5 times
            }
            Shutdown => false, // on_error() is never called with `Shutdown` errors
        }
    }
}
/// Thread-safe Snowflake generator.
pub struct SnowflakeGenerator {
    state: Mutex<SnowflakeState>,
    env: Arc<dyn SnowflakeEnv>,
    shard_id: u16,
}

impl SnowflakeGenerator {
    /// Creates a new generator bound to an environment/shard.
    ///
    /// If a persisted timestamp exists, startup state is seeded from it to help
    /// preserve monotonicity across restarts.
    pub fn new(env: Arc<dyn SnowflakeEnv>) -> Self {
        let shard_id = env.shard_id();
        assert!(shard_id <= MAX_SHARD_ID, "invalid shard id");
        let last_timestamp_ms = match env.persisted_system_time() {
            Some(persisted_system_time) => {
                // We expect the [SnowflakeEnv] implementation to very carefully
                // validate the stored timestamp and only return it if it's
                // trustworthy. So any error handling here is just a sanity
                // check and not expected to ever be triggered in practice.
                let timestamp = (persisted_system_time + MAX_CLOCK_ROLLBACK)
                    .max(env.now())
                    .duration_since(UNIX_EPOCH + SNOWFLAKE_EPOCH)
                    .unwrap_or(Duration::ZERO)
                    .as_millis();
                if timestamp < MAX_TIMESTAMP_PART as u128 {
                    timestamp as u64
                } else {
                    0
                }
            }
            None => 0,
        };
        let mut state = SnowflakeState::new(last_timestamp_ms);
        // Force next_id() to move to a strictly newer millisecond when resuming
        // within the same millisecond.
        state.sequence = MAX_SEQUENCE;
        Self {
            state: Mutex::new(state),
            env,
            shard_id,
        }
    }

    /// Generates the next Snowflake ID.
    ///
    /// Returns [`ClockError`] when clock/shutdown constraints prevent
    /// generation.
    pub fn next_id(&self) -> Result<u64, ClockError> {
        let (timestamp_ms, sequence) = {
            let mut state = self.state.lock().unwrap();
            let mut timestamp_ms = self.current_timestamp_ms(state.deref_mut())?;
            let sequence = if timestamp_ms == state.last_timestamp_ms {
                match state.next_seqnum() {
                    Some(seqnum) => seqnum,
                    None => {
                        timestamp_ms = self.wait_next_millis(state.deref_mut())?;
                        state.recycle(timestamp_ms)
                    }
                }
            } else {
                // INVARIANT: timestamp_ms > state.last_timestamp_ms is
                // guaranteed here. If the clock has rolled back,
                // `current_timestamp_ms` will handle it by incrementing the
                // rollback offset or returning an error.
                state.recycle(timestamp_ms)
            };
            (timestamp_ms, sequence)
        };
        let snowflake = Self::compose_snowflake(timestamp_ms, self.shard_id, sequence);
        if self.env.is_shutting_down() {
            Err(ClockError::Shutdown)
        } else {
            Ok(snowflake)
        }
    }
    fn wait_next_millis(&self, state: &mut SnowflakeState) -> Result<u64, ClockError> {
        const NANOS_PER_MILLI: u32 = 1_000_000;
        loop {
            let timestamp_ms = self.current_timestamp_ms(state)?;
            if timestamp_ms > state.last_timestamp_ms {
                return Ok(timestamp_ms);
            }
            let stall = self
                .env
                .now()
                .duration_since(UNIX_EPOCH)
                .and_then(|now| {
                    let next_tick = UNIX_EPOCH
                        + Duration::new(now.as_secs(), (now.subsec_millis() + 1) * NANOS_PER_MILLI);
                    next_tick.duration_since(UNIX_EPOCH + now)
                })
                // 1/4 of a millisecond if we don't know better
                .unwrap_or(Duration::from_micros(250));
            self.env.exhausted_sequence(stall);
        }
    }

    fn current_timestamp_ms(&self, state: &mut SnowflakeState) -> Result<u64, ClockError> {
        let snowflake_epoch = UNIX_EPOCH + SNOWFLAKE_EPOCH;
        let last_system_time = snowflake_epoch + Duration::from_millis(state.last_timestamp_ms);
        let aux = move |state: &mut SnowflakeState| {
            let now = self.env.now();
            let required_rollback = match last_system_time.duration_since(now) {
                // last_timestamp_ms is in the future, so we need to roll back
                Ok(dur) => dur,
                // last_timestamp_ms is in the past/present, as expected, no
                // rollback needed
                Err(_) => Duration::ZERO,
            };
            // Adjust rollback as the system clock catches up. This keeps the
            // time shift minimal when demand for ID generation cools down
            // after a clock issue.
            state.rollback_offset = if required_rollback > MAX_CLOCK_ROLLBACK {
                let err = if now < snowflake_epoch {
                    ClockError::Underflow
                } else {
                    ClockError::Skew(required_rollback)
                };
                return Err(err);
            } else {
                required_rollback
            };
            // Include the rollback offset for the underflow/overflow check.
            (now + state.rollback_offset)
                .duration_since(snowflake_epoch)
                .map_err(|_| ClockError::Underflow)
                .and_then(|timestamp| {
                    let timestamp_ms = timestamp.as_millis();
                    if timestamp_ms <= MAX_TIMESTAMP_PART as u128 {
                        Ok(timestamp_ms as u64)
                    } else {
                        Err(ClockError::Overflow)
                    }
                })
        };
        let mut retry_count = 0;
        loop {
            match aux(state) {
                Ok(timestamp) => return Ok(timestamp),
                Err(err) => {
                    if self.env.on_error(err, retry_count, state) {
                        retry_count += 1;
                        continue;
                    }
                    return Err(err);
                }
            }
        }
    }
    fn compose_snowflake(timestamp_ms: u64, shard_id: u16, sequence: u16) -> u64 {
        debug_assert!(timestamp_ms <= MAX_TIMESTAMP_PART, "timestamp out of range");
        let timestamp_part = timestamp_ms & MAX_TIMESTAMP_PART;
        debug_assert!(shard_id <= MAX_SHARD_ID, "invalid shard id");
        debug_assert!(sequence <= MAX_SEQUENCE, "invalid sequence");
        (timestamp_part << TIMESTAMP_SHIFT)
            | (((shard_id as u64) & (MAX_SHARD_ID as u64)) << SHARD_SHIFT)
            | ((sequence as u64) & (MAX_SEQUENCE as u64))
    }
}
#[inline]
/// Fast structural check: positive and sign bit clear.
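/// For example (doctest sketch, `ignore`d because the item path depends on the
/// consuming crate):
///
/// ```ignore
/// assert!(is_valid_snowflake_id(1 << 22));   // any ID with fields set
/// assert!(!is_valid_snowflake_id(0));        // zero is never a valid ID
/// assert!(!is_valid_snowflake_id(1 << 63));  // sign bit must stay clear
/// ```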
pub fn is_valid_snowflake_id(id: u64) -> bool {
    id > 0 && (id >> 63) == 0
}

#[inline]
/// Absolute wall-clock instant corresponding to [`SNOWFLAKE_EPOCH`].
pub fn snowflake_epoch() -> SystemTime {
    UNIX_EPOCH + SNOWFLAKE_EPOCH
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use proptest::prelude::*;
    use time::format_description::well_known::Rfc3339;
    use time::OffsetDateTime;

    const VERBOSE: bool = false;
    // const VERBOSE: bool = true;
    const YEAR_ISH: Duration = Duration::from_secs(300 * 24 * 3600);

    #[test]
    fn test_invalid_snowflake_id() {
        assert!(!is_valid_snowflake_id(0));
        assert!(!is_valid_snowflake_id(u64::MAX));
    }

    #[derive(Clone)]
    struct SimulationState {
        now: SystemTime,
        num_stalls: usize,
        max_stall_duration: Duration,
    }

    impl SimulationState {
        fn new(now: SystemTime) -> Self {
            Self {
                now,
                num_stalls: 0,
                max_stall_duration: Duration::ZERO,
            }
        }

        fn reset_stats(&mut self) {
            self.num_stalls = 0;
            self.max_stall_duration = Duration::ZERO;
        }
    }

    struct TestSnowflakeEnv {
        state: Mutex<SimulationState>,
        is_shutting_down: AtomicBool,
        persisted_timestamp: Option<SystemTime>,
    }

    impl TestSnowflakeEnv {
        fn new(now: SystemTime) -> Self {
            let state = SimulationState::new(now);
            Self {
                state: Mutex::new(state),
                persisted_timestamp: None,
                is_shutting_down: AtomicBool::new(false),
            }
        }

        fn restarted(&self, persisted_timestamp: Option<SystemTime>) -> Box<Self> {
            let state = self.state.lock().unwrap();
            let env = Self {
                state: Mutex::new(state.clone()),
                persisted_timestamp,
                is_shutting_down: AtomicBool::new(false),
            };
            Box::new(env)
        }

        fn set(&self, now: SystemTime) {
            let mut state = self.state.lock().unwrap();
            state.now = now;
        }

        fn advance(&self, dur: Duration) {
            let mut state = self.state.lock().unwrap();
            state.now += dur;
        }

        fn rewind(&self, dur: Duration) {
            let mut state = self.state.lock().unwrap();
            state.now -= dur;
        }

        fn shutdown(&self) {
            self.is_shutting_down.store(true, Ordering::SeqCst);
        }

        fn reset_stats(&self) {
            let mut state = self.state.lock().unwrap();
            state.reset_stats();
        }

        fn num_stalls(&self) -> usize {
            self.state.lock().unwrap().num_stalls
        }

        fn max_stall_duration(&self) -> Duration {
            self.state.lock().unwrap().max_stall_duration
        }
    }

    impl SnowflakeEnv for TestSnowflakeEnv {
        fn shard_id(&self) -> u16 {
            3
        }

        fn persisted_system_time(&self) -> Option<SystemTime> {
            self.persisted_timestamp
        }

        fn now(&self) -> SystemTime {
            self.state.lock().unwrap().now
        }

        fn is_shutting_down(&self) -> bool {
            self.is_shutting_down.load(Ordering::SeqCst)
        }

        fn sleep_for(&self, dur: Duration) {
            self.advance(dur + Duration::from_nanos(1));
        }

        fn on_error(&self, err: ClockError, retry_count: u64, state: &SnowflakeState) -> bool {
            use ClockError::*;
            if VERBOSE {
                eprintln!(
                    " {}\tretry_count={retry_count} rollback_offset={:?} sequence={} last_timestamp=\"{}\"
 -> {err:?}",
                    time::OffsetDateTime::from(snowflake_epoch() + Duration::from_millis(state.last_timestamp_ms)),
                    state.rollback_offset,
                    state.sequence,
                    time::OffsetDateTime::from(self.now()),
                );
            }
            match err {
                Underflow => {
                    self.set(snowflake_epoch() + YEAR_ISH);
                }
                Overflow => {
                    self.set(
                        snowflake_epoch() + (Duration::from_millis(MAX_TIMESTAMP_PART) - YEAR_ISH),
                    );
                }
                Skew(skew) => {
                    // Jump directly towards the required correction to avoid
                    // getting stuck retrying with a permanently stale clock.
                    self.advance(skew);
                }
                Shutdown => unreachable!("on_error() is never called with `Shutdown` errors"),
            };
            // Recovers in no more than 2 retries. Property tests fail
            // if this ends up returning false for a run.
            retry_count < 2
        }

        fn exhausted_sequence(&self, stall: Duration) {
            self.sleep_for(stall);
            let mut state = self.state.lock().unwrap();
            state.num_stalls += 1;
            if stall > state.max_stall_duration {
                state.max_stall_duration = stall;
            }
        }
    }
    fn test_snowflake_env(start_time: SystemTime) -> Arc<TestSnowflakeEnv> {
        Arc::new(TestSnowflakeEnv::new(start_time))
    }

    #[test]
    fn test_snowflake_epoch() {
        let epoch = snowflake_epoch();
        assert_eq!(
            OffsetDateTime::from(epoch).format(&Rfc3339).unwrap(),
            "2021-08-23T00:57:24.449Z"
        );
    }

    #[test]
    fn test_max_timestamp() {
        let timestamp = snowflake_epoch() + Duration::from_millis(MAX_TIMESTAMP_PART);
        assert_eq!(
            OffsetDateTime::from(timestamp).format(&Rfc3339).unwrap(),
            "2091-04-29T16:45:00Z"
        );
    }

    #[test]
    fn test_snowflake_increases() {
        let env = test_snowflake_env(SystemTime::now());
        let gen = SnowflakeGenerator::new(Arc::clone(&env) as Arc<dyn SnowflakeEnv>);
        let first = gen.next_id().unwrap();
        let second = gen.next_id().unwrap();
        assert!(second > first);
        assert!(is_valid_snowflake_id(first));
        assert!(is_valid_snowflake_id(second));
        assert_eq!(env.num_stalls(), 0);
        env.advance(Duration::from_millis(1));
        let mut prev_id = gen.next_id().unwrap();
        for _ in 0..MAX_SEQUENCE {
            let id = gen.next_id().unwrap();
            assert!(id > prev_id);
            prev_id = id;
        }
        assert_eq!(env.num_stalls(), 0);
        let id = gen.next_id().unwrap(); // first stall after exhausting sequence in just 1ms
        assert!(id > prev_id);
        assert_eq!(env.num_stalls(), 1);
        assert!(env.max_stall_duration() < Duration::from_millis(1));
    }

    struct AppState {
        env: Arc<TestSnowflakeEnv>,
        gen: SnowflakeGenerator,
        persisted_timestamp: Option<SystemTime>,
        last_id: u64,
    }

    impl AppState {
        fn new(start_time: SystemTime) -> Self {
            let env = test_snowflake_env(start_time);
            let gen = SnowflakeGenerator::new(Arc::clone(&env) as Arc<dyn SnowflakeEnv>);
            Self {
                env,
                gen,
                persisted_timestamp: None,
                last_id: 0,
            }
        }

        fn restart(&mut self) {
            let restarted = self.env.restarted(self.persisted_timestamp).into();
            self.env = restarted;
            self.gen = SnowflakeGenerator::new(Arc::clone(&self.env) as Arc<dyn SnowflakeEnv>);
        }

        fn generate(&mut self, n: u16) -> Result<(), TestCaseError> {
            for _ in 0..n {
                let res = self.gen.next_id();
                let id = match res {
                    Ok(id) => id,
                    Err(ClockError::Shutdown) => {
                        self.restart();
                        continue;
                    }
                    res => {
                        // any other error is a property violation
                        prop_assert!(res.is_ok());
                        unreachable!();
                    }
                };
                prop_assert!(is_valid_snowflake_id(id));
                prop_assert!(id > self.last_id);
                let timestamp_part = id >> TIMESTAMP_SHIFT;
                prop_assert!(timestamp_part <= MAX_TIMESTAMP_PART);
                self.last_id = id;
            }
            Ok(())
        }

        fn shutdown(&mut self) {
            self.env.shutdown();
            let persisted_timestamp = self.env.now();
            if VERBOSE {
                eprintln!(
                    " persisted_timestamp=\"{}\"",
                    time::OffsetDateTime::from(persisted_timestamp)
                );
            }
            self.persisted_timestamp = Some(persisted_timestamp);
        }
    }

    #[derive(Clone, Debug)]
    enum TestAction {
        Advance { dur: Duration, count: u16 },
        Rewind { dur: Duration, count: u16 },
        Shutdown,
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(0))]
        #[test]
        fn prop_clock_skew_underflow_overflow_scenarios(
            actions in prop::collection::vec(action_strategy(), 420)
        ) {
            let mut app = AppState::new(snowflake_epoch());
            if VERBOSE {
                eprintln!("-- Running test with {} actions", actions.len());
            }
            for action in actions {
                if VERBOSE {
                    eprintln!("{}:\t\t{:?}", time::OffsetDateTime::from(app.env.now()), action);
                }
                match action {
                    TestAction::Advance { dur, count } => {
                        app.env.advance(dur);
                        app.generate(count)?;
                    }
                    TestAction::Rewind { dur, count } => {
                        app.env.rewind(dur);
                        app.generate(count)?;
                    }
                    TestAction::Shutdown => {
                        // This doesn't simulate concurrent generation and shutdown,
                        // but it's a good approximation.
                        app.shutdown();
                    }
                }
            }
        }
    }

    fn duration_strategy() -> impl Strategy<Value = Duration> {
        prop_oneof![
            Just(Duration::from_micros(1)),
            Just(Duration::from_micros(2)),
            Just(Duration::from_micros(999)),
            Just(Duration::from_micros(1000)),
            Just(Duration::from_micros(1001)),
            Just(Duration::from_millis(9)),
            Just(Duration::from_millis(10)),
            Just(Duration::from_millis(11)),
            Just(MAX_CLOCK_ROLLBACK - Duration::from_millis(1)),
            Just(MAX_CLOCK_ROLLBACK),
            Just(MAX_CLOCK_ROLLBACK + Duration::from_millis(1)),
            Just(Duration::from_millis(MAX_TIMESTAMP_PART / 25)),
        ]
    }

    fn count_strategy() -> impl Strategy<Value = u16> {
        prop_oneof![
            Just(1u16),
            Just(2u16),
            Just(MAX_SEQUENCE),
            Just(MAX_SEQUENCE + 1),
        ]
    }

    fn action_strategy() -> impl Strategy<Value = TestAction> {
        prop_oneof![
            48 => (duration_strategy(), count_strategy())
                .prop_map(|(dur, count)| TestAction::Advance { dur, count }),
            12 => (duration_strategy(), count_strategy())
                .prop_map(|(dur, count)| TestAction::Rewind { dur, count }),
            1 => Just(TestAction::Shutdown),
        ]
    }
}