@rockwotj
Created January 10, 2026 18:09
Windowed Epoch Fencing Design for Redpanda Cloud Topics

Windowed Epoch Fencing for Cloud Topics

Background

Current Epoch Fencing Mechanism

The Cloud Topics Producer (CTP) State Machine uses epoch fencing to ensure that writes to cloud topics happen with monotonically increasing cluster epochs, preventing out-of-order writes and maintaining consistency across the distributed system.

Current Architecture:

  • Each produce request uploads data to object storage and receives an epoch
  • Before replicating metadata, the producer must "fence" that epoch
  • The fence_epoch function uses a reader-writer lock pattern:
    • Same epoch: Acquires read lock (allows concurrent writes)
    • New epoch: Acquires write lock, updates max_seen_epoch, downgrades to read lock
    • Old epoch: Rejects with stale_cluster_epoch error

Code locations:

  • State machine: src/v/cloud_topics/level_zero/stm/ctp_stm.{h,cc}
  • State management: src/v/cloud_topics/level_zero/stm/ctp_stm_state.{h,cc}
  • Usage: src/v/cloud_topics/frontend/frontend.cc

The Problem

The current lock-step coordination is too strict. When a new epoch N+1 arrives, it immediately updates max_seen_epoch, causing any in-flight requests with epoch N to be rejected with stale_cluster_epoch errors.

Example failure scenario:

  1. Producer A uploads batch with epoch N, starts fencing
  2. Producer B uploads batch with epoch N+1, completes fencing first
  3. Producer A's fence attempt now sees max_seen_epoch = N+1
  4. Producer A is rejected even though its request was valid when initiated
  5. Client receives spurious timeout/retry error

This overly aggressive rejection causes unnecessary failures and poor user experience during epoch transitions.
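
The failure scenario can be replayed against a toy model of single-epoch fencing. This is only a sketch: `epoch_t`, `single_epoch_fence`, and `producer_a_survives` are hypothetical names introduced for illustration, and the model ignores locking, Raft sync, and object storage entirely.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for cluster_epoch; the real type is not shown here.
using epoch_t = std::int64_t;

// Toy model of single-epoch fencing: only max_seen_epoch is tracked.
struct single_epoch_fence {
    std::optional<epoch_t> max_seen_epoch;

    // Returns true if the epoch is fenced, false if it is rejected as stale.
    bool fence(epoch_t e) {
        if (max_seen_epoch && e < *max_seen_epoch) {
            return false; // old epoch: stale_cluster_epoch
        }
        max_seen_epoch = e; // same or new epoch
        return true;
    }
};

// Replays the scenario: Producer B (epoch N+1) fences before Producer A
// (epoch N) gets its turn.
inline bool producer_a_survives(epoch_t n) {
    single_epoch_fence f;
    f.fence(n);        // earlier traffic established epoch N
    f.fence(n + 1);    // Producer B completes fencing first
    return f.fence(n); // Producer A arrives late with epoch N
}
```

In this model `producer_a_survives` returns false for any N, which is the spurious rejection described above.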

Proposed Solution: Windowed Epochs

Instead of accepting only a single epoch at a time, maintain a sliding window bounded by the two most recently seen epochs; every epoch within those bounds is valid concurrently.

Window Semantics

Window states:

  • Empty window (initial): No epochs seen yet
  • Single epoch window: First epoch accepted, before second arrives
  • Double epoch window: Normal steady state with [oldest_accepted, latest_seen]

Window transitions:

Initial state:     []
First epoch N:     [N]
Second epoch N+1:  [N, N+1]
Third epoch N+2:   [N+1, N+2]  ← N is pushed out

Acceptance rule: Accept epoch E if oldest_accepted_epoch ≤ E ≤ latest_seen_epoch
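
The window transitions and acceptance rule above can be sketched as a small standalone model. The names `epoch_t`, `fence_result`, and `epoch_window` are assumptions made for this illustration; the sketch has no locking and is not the proposed `ctp_stm` code.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for cluster_epoch.
using epoch_t = std::int64_t;

enum class fence_result { accepted, window_advanced, rejected_stale };

struct epoch_window {
    std::optional<epoch_t> oldest_accepted;
    std::optional<epoch_t> latest_seen;

    fence_result fence(epoch_t e) {
        if (!latest_seen) {
            // Empty window: [] -> [e]
            latest_seen = e;
            return fence_result::window_advanced;
        }
        if (e > *latest_seen) {
            // Slide: the previous latest becomes the oldest accepted epoch.
            oldest_accepted = latest_seen;
            latest_seen = e;
            return fence_result::window_advanced;
        }
        // Single epoch window: the lower bound is the latest epoch itself.
        epoch_t lower = oldest_accepted.value_or(*latest_seen);
        return e >= lower ? fence_result::accepted
                          : fence_result::rejected_stale;
    }
};
```

Fencing 10, 11, 12 in order leaves the window at [11, 12]: a late request with epoch 11 is still accepted, while one with epoch 10 is rejected.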

Benefits

  1. Reduced spurious failures: In-flight requests with the previous epoch can complete successfully
  2. Graceful epoch transitions: Two-epoch window provides buffer during cluster epoch changes
  3. Maintained safety: Epochs below the window are still rejected, preserving monotonicity
  4. Simple implementation: Minimal changes to existing code structure

Implementation Plan

Phase 1: State Structure Changes

File: src/v/cloud_topics/level_zero/stm/ctp_stm_state.h

Add new state field and methods:

class ctp_stm_state {
    // Existing fields
    std::optional<cluster_epoch> _max_seen_epoch;
    std::optional<cluster_epoch> _max_applied_epoch;

    // NEW: Oldest epoch still accepted in the window
    std::optional<cluster_epoch> _oldest_accepted_epoch;

public:
    /// Advance max_seen_epoch and slide the window
    void advance_max_seen_epoch(cluster_epoch epoch) noexcept {
        if (!_max_seen_epoch.has_value()) {
            // First epoch - no window yet
            _max_seen_epoch = epoch;
            // oldest_accepted_epoch stays nullopt
        } else if (epoch > _max_seen_epoch.value()) {
            // Slide window: current latest becomes oldest accepted
            _oldest_accepted_epoch = _max_seen_epoch;
            _max_seen_epoch = epoch;
        }
        // If epoch <= _max_seen_epoch, no change needed
    }

    /// Get the oldest epoch still accepted in the window
    std::optional<cluster_epoch> get_oldest_accepted_epoch() const noexcept {
        return _oldest_accepted_epoch;
    }

    /// Check if an epoch is within the current acceptance window
    bool is_epoch_in_window(cluster_epoch e) const noexcept {
        if (!_max_seen_epoch.has_value()) {
            return false;  // No window established yet
        }
        auto latest = _max_seen_epoch.value();
        // Single epoch window (first epoch only): the lower bound defaults
        // to the latest epoch, so only e == latest is accepted.
        auto oldest = _oldest_accepted_epoch.value_or(latest);
        // Double epoch window - check bounds
        return oldest <= e && e <= latest;
    }
};

Serialization update:

auto serde_fields() {
    return std::tie(
      _max_applied_epoch,
      _last_reconciled_offset,
      _last_reconciled_log_offset,
      _max_applied_epoch_offset,
      _min_epoch_lower_bound,
      _start_offset
      // NOTE: _max_seen_epoch and _oldest_accepted_epoch are NOT serialized
      // They represent in-flight state and will be reconstructed after recovery
    );
}

Phase 2: Update Fence Logic

File: src/v/cloud_topics/level_zero/stm/ctp_stm.cc

Replace the fence_epoch function (lines 357-397):

ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>
ctp_stm::fence_epoch(cluster_epoch e) {
    auto holder = _gate.hold();
    if (!co_await sync(sync_timeout, _as)) {
        vlog(_log.warn, "ctp_stm::fence_epoch sync timeout");
        throw std::runtime_error(fmt_with_ctx(fmt::format, "Sync timeout"));
    }
    auto term = _raft->confirmed_term();

    // Helper to get latest epoch from state
    auto get_applied_epoch = [this] { return _state.get_max_epoch(); };

    // Get current window bounds
    auto latest = _state.get_max_seen_epoch().or_else(get_applied_epoch);
    auto oldest = _state.get_oldest_accepted_epoch()
        .or_else([&]() { return latest; });

    // Case 1: Epoch below window - REJECT
    if (oldest.has_value() && e < oldest.value()) {
        vlog(
          _log.debug,
          "Rejecting epoch {} below window [{}, {}]",
          e,
          oldest.value(),
          latest.value_or(cluster_epoch{-1}));
        co_return std::unexpected(
          stale_cluster_epoch(latest.value_or(cluster_epoch{-1})));
    }

    // Case 2: Epoch within window - READ LOCK
    if (_state.is_epoch_in_window(e)) {
        auto unit = co_await ss::get_units(_lock, 1, _as);
        // Double-check window after scheduling point
        if (_state.is_epoch_in_window(e)) {
            vlog(
              _log.trace,
              "Acquired fence for epoch {} within window [{}, {}]",
              e,
              _state.get_oldest_accepted_epoch().value_or(e),
              _state.get_max_seen_epoch().value_or(e));
            co_return cluster_epoch_fence{
              .unit = std::move(unit), .term = term};
        }
        // Window shifted while we were waiting, fall through to rejection
    }

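    // Case 3: New epoch beyond window, or no epoch fenced yet in memory
    // (e.g. after recovery, when only the applied epoch is known, so
    // e == latest must also take this path) - WRITE LOCK to slide window
    if (
      !latest.has_value() || e > latest.value()
      || !_state.get_max_seen_epoch().has_value()) {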
        auto unit = co_await ss::get_units(
          _lock, ss::semaphore::max_counter(), _as);
        auto current_latest = _state.get_max_seen_epoch()
          .or_else(get_applied_epoch);

        if (!current_latest.has_value() || e >= current_latest.value()) {
            // Slide window: old latest becomes oldest accepted.
            // Capture both old bounds before advancing so the log below
            // reports the window as it was prior to the slide.
            auto old_oldest = _state.get_oldest_accepted_epoch();
            auto old_latest = _state.get_max_seen_epoch();
            _state.advance_max_seen_epoch(e);
            vlog(
              _log.debug,
              "Sliding epoch window from [{}, {}] to [{}, {}]",
              old_oldest.value_or(cluster_epoch{-1}),
              old_latest.value_or(cluster_epoch{-1}),
              _state.get_oldest_accepted_epoch().value_or(e),
              e);

            // Demote to read lock after window is updated
            unit.return_units(unit.count() - 1);
            co_return cluster_epoch_fence{
              .unit = std::move(unit), .term = term};
        }
        // Another thread advanced epoch while we waited, fall through
    }

    // If we reach here, epoch is stale or window shifted during lock acquisition
    vlog(
      _log.debug,
      "Rejecting stale epoch {}, current window: [{}, {}]",
      e,
      _state.get_oldest_accepted_epoch().value_or(cluster_epoch{-1}),
      _state.get_max_seen_epoch()
        .or_else(get_applied_epoch)
        .value_or(cluster_epoch{-1}));
    co_return std::unexpected(
      stale_cluster_epoch(_state.get_max_seen_epoch()
                            .or_else(get_applied_epoch)
                            .value_or(cluster_epoch{-1})));
}
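
The lock handling in the function above treats a counting semaphore as a reader-writer lock: one unit is a read lock, all units together are the write lock, and returning all but one unit demotes a writer to a reader. A minimal, non-blocking sketch of that idea follows. `unit_pool`, `try_take`, `give_back`, and `write_then_demote` are hypothetical names; the real code awaits `ss::get_units` instead of failing when units are unavailable.

```cpp
#include <cassert>
#include <cstddef>

// Toy, non-blocking model of a counting semaphore used as a
// reader-writer lock. All names here are illustrative only.
struct unit_pool {
    std::size_t available;
    const std::size_t capacity;

    explicit unit_pool(std::size_t cap)
      : available(cap)
      , capacity(cap) {}

    // Take n units if they are all free; otherwise take nothing.
    bool try_take(std::size_t n) {
        if (n > available) {
            return false;
        }
        available -= n;
        return true;
    }

    void give_back(std::size_t n) { available += n; }
};

// Write lock: take every unit, mutate state, then keep a single unit so
// the caller continues as a reader. Returns the number of units still
// held (0 if the write lock could not be taken).
inline std::size_t write_then_demote(unit_pool& pool) {
    if (!pool.try_take(pool.capacity)) {
        return 0;
    }
    // ... the window would be slid here, with exclusive access ...
    pool.give_back(pool.capacity - 1);
    return 1;
}
```

After demotion, other readers can take single units immediately, but a second writer cannot take the full capacity until every reader has returned its unit.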

Phase 3: Update API Layer

File: src/v/cloud_topics/level_zero/stm/ctp_stm_api.h

Add accessor for debugging/observability:

/// Get the current epoch acceptance window
std::pair<std::optional<cluster_epoch>, std::optional<cluster_epoch>>
get_epoch_window() const {
    return {
        _stm->state().get_oldest_accepted_epoch(),
        _stm->state().get_max_seen_epoch()
    };
}

Phase 4: Testing Strategy

Unit Tests

File: src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc

Add test cases:

  1. Test window sliding:

    • Fence epoch N → verify window is [N]
    • Fence epoch N+1 → verify window is [N, N+1]
    • Fence epoch N+2 → verify window is [N+1, N+2]
  2. Test concurrent fencing within window:

    • Fence epoch N (acquire lock)
    • Concurrently fence epoch N (should succeed with read lock)
    • Concurrently fence epoch N+1 (should succeed after window slides)
  3. Test rejection below window:

    • Establish window [N, N+1]
    • Attempt to fence epoch N-1 → should fail with stale_cluster_epoch
  4. Test non-contiguous epochs:

    • Fence N → window [N]
    • Fence N+5 → window [N, N+5]
    • Verify epochs N+1 through N+4 are accepted (within window)
  5. Test snapshot/recovery:

    • Establish window [N, N+1]
    • Take snapshot
    • Recover from snapshot
    • Verify window collapses to single epoch (max_applied_epoch)

Integration Tests

File: tests/rptest/tests/cloud_topics_test.py (or similar)

  1. Concurrent producers with mixed epochs:

    • Start 2 producers
    • Producer A gets epoch N, starts slow upload
    • Producer B gets epoch N+1, completes first
    • Verify Producer A still succeeds
  2. Rapid epoch transitions:

    • Generate epochs N, N+1, N+2, N+3 rapidly
    • Verify requests with N+2 and N+3 succeed
    • Verify requests with N+1 and earlier fail
  3. Leader failover during epoch transition:

    • Establish window [N, N+1]
    • Trigger leader failover
    • Verify new leader recovers window correctly

Phase 5: Observability

Add metrics for monitoring epoch window behavior:

// In ctp_stm.cc or appropriate probe file
struct ctp_stm_epoch_probe {
    void record_window_slide(cluster_epoch old_min, cluster_epoch old_max,
                            cluster_epoch new_min, cluster_epoch new_max) {
        _window_slides++;
        _window_size = new_max() - new_min() + 1;
    }

    void record_epoch_rejected_below_window(cluster_epoch rejected,
                                           cluster_epoch window_min) {
        _epochs_rejected_stale++;
        _last_rejected_epoch_gap = window_min() - rejected();
    }

    void record_fence_acquired(cluster_epoch epoch, bool is_new_epoch) {
        if (is_new_epoch) {
            _new_epoch_fences++;
        } else {
            _same_epoch_fences++;
        }
    }

    uint64_t _window_slides = 0;
    uint64_t _window_size = 0;
    uint64_t _epochs_rejected_stale = 0;
    int64_t _last_rejected_epoch_gap = 0;
    uint64_t _new_epoch_fences = 0;
    uint64_t _same_epoch_fences = 0;
};

Edge Cases

1. Snapshot and Recovery

Behavior: Window state (_oldest_accepted_epoch and _max_seen_epoch) is NOT persisted in snapshots.

Reasoning: These represent in-flight volatile state. After recovery:

  • Window collapses to single epoch (_max_applied_epoch)
  • First new produce request re-establishes the window
  • This is safe: epochs older than _max_applied_epoch are never accepted

Implementation: Exclude window fields from serde_fields().
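
As a rough illustration of the collapse, the sketch below models a state whose snapshot carries only the applied epoch. The types `snapshot` and `stm_state` and the helper `lower_bound` are hypothetical; the fallback order mirrors the `or_else` chain used in `fence_epoch`, and the example assumes the applied epoch equals the latest seen epoch at snapshot time.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for cluster_epoch.
using epoch_t = std::int64_t;

// Only the applied epoch is persisted in this toy snapshot.
struct snapshot {
    std::optional<epoch_t> max_applied_epoch;
};

struct stm_state {
    std::optional<epoch_t> max_applied_epoch;     // persisted
    std::optional<epoch_t> max_seen_epoch;        // volatile
    std::optional<epoch_t> oldest_accepted_epoch; // volatile

    snapshot take_snapshot() const { return {max_applied_epoch}; }

    // Recovery restores persisted fields; the window fields stay empty.
    static stm_state recover(const snapshot& s) {
        stm_state st;
        st.max_applied_epoch = s.max_applied_epoch;
        return st;
    }

    // Lowest epoch a fence would still accept: the window's oldest epoch,
    // else the latest seen epoch, else the applied epoch.
    std::optional<epoch_t> lower_bound() const {
        if (oldest_accepted_epoch) {
            return oldest_accepted_epoch;
        }
        if (max_seen_epoch) {
            return max_seen_epoch;
        }
        return max_applied_epoch;
    }
};
```

With a window of [10, 11] and an applied epoch of 11, the lower bound is 10 before the snapshot and 11 after recovery, so the bound never moves backwards across a restart.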

2. Non-Contiguous Epochs

Scenario: Window is [5, 6], then epoch 10 arrives.

Behavior:

  • Window becomes [6, 10]
  • Epochs 7, 8, 9 were never fenced as the latest epoch, but they fall inside the window
  • Any requests with epochs 6-10 are accepted

Reasoning: Cluster epochs don't need to be contiguous. The window defines a range, and all epochs within that range are valid.
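
A short sketch of this scenario, using hypothetical helpers `slide`, `accepts`, and `window_span` on a plain two-field struct, shows that a window spanning a gap covers more than two epochs. The span is computed the same way as `_window_size` in the probe (`new_max - new_min + 1`).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for cluster_epoch.
using epoch_t = std::int64_t;

// A fully established (double epoch) window; illustrative only.
struct window {
    epoch_t oldest;
    epoch_t latest;
};

// A newer epoch pushes the old latest down to the lower bound.
inline window slide(window w, epoch_t e) {
    return e > w.latest ? window{w.latest, e} : w;
}

inline bool accepts(const window& w, epoch_t e) {
    return w.oldest <= e && e <= w.latest;
}

// Number of distinct epoch values covered by the window.
inline std::int64_t window_span(const window& w) {
    return w.latest - w.oldest + 1;
}
```

Sliding [5, 6] with epoch 10 gives [6, 10], which accepts 6 through 10 and has a span of 5, so a reported window size above 2 indicates a jump in epochs rather than a fault.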

3. Concurrent Window Transitions

Scenario: Multiple threads try to advance epoch simultaneously.

Protection:

  • Write lock (all semaphore units) prevents concurrent window slides
  • Double-check pattern after acquiring lock ensures correctness
  • First thread to acquire write lock slides the window
  • Subsequent threads see updated window and proceed accordingly
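
The double-check can be illustrated deterministically, without real concurrency. In the sketch below, `window_state` and `slide_if_still_newer` are hypothetical names; the function stands for the re-read of state that happens after the write lock is finally acquired, and a false result corresponds to falling through to the stale-epoch path in `fence_epoch`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for cluster_epoch.
using epoch_t = std::int64_t;

struct window_state {
    std::optional<epoch_t> oldest;
    std::optional<epoch_t> latest;
};

// Decision taken after the write lock is held, using a fresh read of the
// state rather than the value observed before waiting.
inline bool slide_if_still_newer(window_state& st, epoch_t e) {
    if (st.latest && e < *st.latest) {
        return false; // another fencer advanced past e while we waited
    }
    if (!st.latest || e > *st.latest) {
        if (st.latest) {
            st.oldest = st.latest;
        }
        st.latest = e;
    }
    return true; // e is now the latest epoch (or already was)
}
```

If two fencers both observe a latest epoch of 5 and request 7 and 6 respectively, whichever acquires the lock first decides the outcome: when 7 wins, the window becomes [5, 7] and the fencer for 6 sees the updated state instead of sliding the window backwards.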

4. Out-of-Order Epoch Arrivals

Scenario: Starting from an empty window, requests with epochs N+1, N, N+2 arrive in that order.

Behavior:

  1. N+1 arrives → window becomes [N+1]
  2. N arrives → rejected (below window)
  3. N+2 arrives → window becomes [N+1, N+2]

Reasoning: This is correct - if N+1 was already accepted, N is truly stale and should be rejected.
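
To check an arrival order by hand, a small replay helper is enough. The function `replay` below is a hypothetical sketch that applies the window rules to a list of epochs, starting from an empty window, and records which fence attempts would be accepted.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for cluster_epoch.
using epoch_t = std::int64_t;

// Replays fence attempts in arrival order against an initially empty
// window and returns, per attempt, whether it would be accepted.
inline std::vector<bool> replay(const std::vector<epoch_t>& arrivals) {
    std::optional<epoch_t> oldest;
    std::optional<epoch_t> latest;
    std::vector<bool> accepted;
    for (epoch_t e : arrivals) {
        if (!latest || e > *latest) {
            // First epoch, or a newer one that slides the window.
            if (latest) {
                oldest = latest;
            }
            latest = e;
            accepted.push_back(true);
        } else {
            // Within or below the window; a single epoch window only
            // accepts the latest epoch itself.
            accepted.push_back(e >= oldest.value_or(*latest));
        }
    }
    return accepted;
}
```

With N = 5, `replay({6, 5, 7})` yields accepted, rejected, accepted, matching the three steps listed above. By contrast, `replay({5, 6, 5})` accepts all three, because epoch 5 was fenced before 6 and is still inside the window.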

Migration Path

This change is backwards compatible with existing deployments:

  1. Before rollout: Single epoch acceptance
  2. During rollout: Mixed behavior (some nodes use windowing, others don't)
  3. After rollout: Full windowed epoch behavior

The window automatically adapts to epoch sequences, so no migration steps are needed.

Success Metrics

After deployment, expect to see:

  1. Reduced error rates: Fewer stale_cluster_epoch rejections during normal operation
  2. Improved latency: Fewer retries due to epoch fencing failures
  3. Window metrics: Typical window size of 2, with slides corresponding to epoch transitions
  4. No safety violations: No epoch regressions or out-of-order writes

References

  • Current implementation: src/v/cloud_topics/level_zero/stm/ctp_stm.{h,cc}
  • State management: src/v/cloud_topics/level_zero/stm/ctp_stm_state.{h,cc}
  • Usage in frontend: src/v/cloud_topics/frontend/frontend.cc (lines 570-591, 712-735)
  • Existing tests: src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc