The Cloud Topics Producer (CTP) State Machine uses epoch fencing to ensure that writes to cloud topics happen with monotonically increasing cluster epochs, preventing out-of-order writes and maintaining consistency across the distributed system.
Current Architecture:
- Each produce request uploads data to object storage and receives an epoch
- Before replicating metadata, the producer must "fence" that epoch
- The `fence_epoch` function uses a reader-writer lock pattern (sketched below):
  - Same epoch: Acquires read lock (allows concurrent writes)
  - New epoch: Acquires write lock, updates `max_seen_epoch`, downgrades to read lock
  - Old epoch: Rejects with `stale_cluster_epoch` error
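For orientation, the current decision logic can be sketched roughly as follows. This is illustrative only, not the actual implementation in `ctp_stm.cc`; `set_max_seen_epoch` is a hypothetical setter standing in for however the current code records the epoch, and the Seastar semaphore usage mirrors the snippets later in this document:

```cpp
// Rough sketch of the current, strict fencing logic: a single latest epoch is
// tracked and anything older is rejected outright.
ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>
fence_epoch_sketch(cluster_epoch e) {
    auto latest = _state.get_max_seen_epoch();
    if (latest.has_value() && e < latest.value()) {
        // Any in-flight request carrying an older epoch is rejected here,
        // even if it was valid when its upload started.
        co_return std::unexpected(stale_cluster_epoch(latest.value()));
    }
    if (!latest.has_value() || e > latest.value()) {
        // New epoch: take the write lock (all semaphore units), record the
        // epoch, then downgrade to a read lock by returning all but one unit.
        auto unit = co_await ss::get_units(
          _lock, ss::semaphore::max_counter(), _as);
        _state.set_max_seen_epoch(e); // hypothetical setter, for illustration
        unit.return_units(unit.count() - 1);
        co_return cluster_epoch_fence{
          .unit = std::move(unit), .term = _raft->confirmed_term()};
    }
    // Same epoch: a read lock (one unit) allows concurrent writers.
    auto unit = co_await ss::get_units(_lock, 1, _as);
    co_return cluster_epoch_fence{
      .unit = std::move(unit), .term = _raft->confirmed_term()};
}
```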
Code locations:
- State machine: `src/v/cloud_topics/level_zero/stm/ctp_stm.{h,cc}`
- State management: `src/v/cloud_topics/level_zero/stm/ctp_stm_state.{h,cc}`
- Usage: `src/v/cloud_topics/frontend/frontend.cc`
The current lock-step coordination is too strict. When a new epoch N+1 arrives, it immediately updates max_seen_epoch, causing any in-flight requests with epoch N to be rejected with stale_cluster_epoch errors.
Example failure scenario:
- Producer A uploads batch with epoch N, starts fencing
- Producer B uploads batch with epoch N+1, completes fencing first
- Producer A's fence attempt now sees `max_seen_epoch = N+1`
- Producer A is rejected even though its request was valid when initiated
- Client receives spurious timeout/retry error
This overly aggressive rejection causes unnecessary failures and poor user experience during epoch transitions.
Instead of accepting only a single epoch at a time, maintain a sliding window of two epochs where both are valid concurrently.
Window states:
- Empty window (initial): No epochs seen yet
- Single epoch window: First epoch accepted, before second arrives
- Double epoch window: Normal steady state with `[oldest_accepted, latest_seen]`
Window transitions:
- Initial state: `[]`
- First epoch N: `[N]`
- Second epoch N+1: `[N, N+1]`
- Third epoch N+2: `[N+1, N+2]` ← N is pushed out

Acceptance rule: Accept epoch E if `oldest_accepted_epoch ≤ E ≤ latest_seen_epoch`.
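As a quick, standalone illustration of these window semantics (plain integers stand in for `cluster_epoch`; this is not the proposed implementation, which follows below):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Standalone illustration of the two-epoch sliding window.
struct epoch_window {
    std::optional<int64_t> oldest_accepted;
    std::optional<int64_t> latest_seen;

    // Slide the window forward when a strictly newer epoch is observed.
    void observe(int64_t e) {
        if (!latest_seen.has_value()) {
            latest_seen = e; // first epoch: single-epoch window [e]
        } else if (e > *latest_seen) {
            oldest_accepted = latest_seen; // previous latest stays accepted
            latest_seen = e;
        }
    }

    // Acceptance rule: oldest_accepted <= e <= latest_seen.
    bool accepts(int64_t e) const {
        if (!latest_seen.has_value()) {
            return false;
        }
        auto lo = oldest_accepted.value_or(*latest_seen);
        return lo <= e && e <= *latest_seen;
    }
};

int main() {
    epoch_window w;
    w.observe(5);                 // window: [5]
    assert(w.accepts(5));
    w.observe(6);                 // window: [5, 6]
    assert(w.accepts(5) && w.accepts(6));
    w.observe(7);                 // window: [6, 7] - epoch 5 is pushed out
    assert(!w.accepts(5) && w.accepts(6) && w.accepts(7));
    return 0;
}
```

The proposed `ctp_stm_state` changes below implement the same rule on top of the existing optional epoch fields.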
Benefits:
- Reduced spurious failures: In-flight requests with the previous epoch can complete successfully
- Graceful epoch transitions: Two-epoch window provides buffer during cluster epoch changes
- Maintained safety: Epochs below the window are still rejected, preserving monotonicity
- Simple implementation: Minimal changes to existing code structure
File: src/v/cloud_topics/level_zero/stm/ctp_stm_state.h
Add new state field and methods:
```cpp
class ctp_stm_state {
    // Existing fields
    std::optional<cluster_epoch> _max_seen_epoch;
    std::optional<cluster_epoch> _max_applied_epoch;

    // NEW: Oldest epoch still accepted in the window
    std::optional<cluster_epoch> _oldest_accepted_epoch;

public:
    /// Advance max_seen_epoch and slide the window
    void advance_max_seen_epoch(cluster_epoch epoch) noexcept {
        if (!_max_seen_epoch.has_value()) {
            // First epoch - no window yet
            _max_seen_epoch = epoch;
            // _oldest_accepted_epoch stays nullopt
        } else if (epoch > _max_seen_epoch.value()) {
            // Slide window: current latest becomes oldest accepted
            _oldest_accepted_epoch = _max_seen_epoch;
            _max_seen_epoch = epoch;
        }
        // If epoch <= _max_seen_epoch, no change needed
    }

    /// Get the oldest epoch still accepted in the window
    std::optional<cluster_epoch> get_oldest_accepted_epoch() const noexcept {
        return _oldest_accepted_epoch;
    }

    /// Check if an epoch is within the current acceptance window
    bool is_epoch_in_window(cluster_epoch e) const noexcept {
        const auto& latest = _max_seen_epoch;
        const auto& oldest = _oldest_accepted_epoch;
        if (!latest.has_value()) {
            return false; // No window established yet
        }
        if (!oldest.has_value()) {
            // Single epoch window (first epoch only)
            return e == latest.value();
        }
        // Double epoch window - check bounds
        return oldest.value() <= e && e <= latest.value();
    }
};
```

Serialization update:

```cpp
auto serde_fields() {
    return std::tie(
      _max_applied_epoch,
      _last_reconciled_offset,
      _last_reconciled_log_offset,
      _max_applied_epoch_offset,
      _min_epoch_lower_bound,
      _start_offset
      // NOTE: _max_seen_epoch and _oldest_accepted_epoch are NOT serialized.
      // They represent in-flight state and will be reconstructed after
      // recovery.
    );
}
```

File: src/v/cloud_topics/level_zero/stm/ctp_stm.cc
Replace the `fence_epoch` function (lines 357-397):

```cpp
ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>
ctp_stm::fence_epoch(cluster_epoch e) {
    auto holder = _gate.hold();
    if (!co_await sync(sync_timeout, _as)) {
        vlog(_log.warn, "ctp_stm::fence_epoch sync timeout");
        throw std::runtime_error(fmt_with_ctx(fmt::format, "Sync timeout"));
    }
    auto term = _raft->confirmed_term();

    // Helper to get latest epoch from state
    auto get_applied_epoch = [this] { return _state.get_max_epoch(); };

    // Get current window bounds
    auto latest = _state.get_max_seen_epoch().or_else(get_applied_epoch);
    auto oldest = _state.get_oldest_accepted_epoch()
                    .or_else([&]() { return latest; });

    // Case 1: Epoch below window - REJECT
    if (oldest.has_value() && e < oldest.value()) {
        vlog(
          _log.debug,
          "Rejecting epoch {} below window [{}, {}]",
          e,
          oldest.value(),
          latest.value_or(cluster_epoch{-1}));
        co_return std::unexpected(
          stale_cluster_epoch(latest.value_or(cluster_epoch{-1})));
    }

    // Case 2: Epoch within window - READ LOCK
    if (_state.is_epoch_in_window(e)) {
        auto unit = co_await ss::get_units(_lock, 1, _as);
        // Double-check window after scheduling point
        if (_state.is_epoch_in_window(e)) {
            vlog(
              _log.trace,
              "Acquired fence for epoch {} within window [{}, {}]",
              e,
              _state.get_oldest_accepted_epoch().value_or(e),
              _state.get_max_seen_epoch().value_or(e));
            co_return cluster_epoch_fence{
              .unit = std::move(unit), .term = term};
        }
        // Window shifted while we were waiting, fall through to rejection
    }

    // Case 3: New epoch beyond window - WRITE LOCK to slide window
    if (!latest.has_value() || e > latest.value()) {
        auto unit = co_await ss::get_units(
          _lock, ss::semaphore::max_counter(), _as);
        auto current_latest = _state.get_max_seen_epoch()
                                .or_else(get_applied_epoch);
        if (!current_latest.has_value() || e >= current_latest.value()) {
            // Slide window: old latest becomes oldest accepted
            auto old_latest = _state.get_max_seen_epoch();
            _state.advance_max_seen_epoch(e);
            vlog(
              _log.debug,
              "Sliding epoch window from [{}, {}] to [{}, {}]",
              _state.get_oldest_accepted_epoch().value_or(cluster_epoch{-1}),
              old_latest.value_or(cluster_epoch{-1}),
              _state.get_oldest_accepted_epoch().value_or(e),
              e);
            // Demote to read lock after window is updated
            unit.return_units(unit.count() - 1);
            co_return cluster_epoch_fence{
              .unit = std::move(unit), .term = term};
        }
        // Another thread advanced the epoch while we waited, fall through
    }

    // If we reach here, the epoch is stale or the window shifted during lock
    // acquisition
    vlog(
      _log.debug,
      "Rejecting stale epoch {}, current window: [{}, {}]",
      e,
      _state.get_oldest_accepted_epoch().value_or(cluster_epoch{-1}),
      _state.get_max_seen_epoch()
        .or_else(get_applied_epoch)
        .value_or(cluster_epoch{-1}));
    co_return std::unexpected(
      stale_cluster_epoch(_state.get_max_seen_epoch()
                            .or_else(get_applied_epoch)
                            .value_or(cluster_epoch{-1})));
}
```

File: src/v/cloud_topics/level_zero/stm/ctp_stm_api.h
Add accessor for debugging/observability:
```cpp
/// Get the current epoch acceptance window
std::pair<std::optional<cluster_epoch>, std::optional<cluster_epoch>>
get_epoch_window() const {
    return {
      _stm->state().get_oldest_accepted_epoch(),
      _stm->state().get_max_seen_epoch()};
}
```

File: src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc
Add test cases:
- Test window sliding (sketched below):
  - Fence epoch N → verify window is [N]
  - Fence epoch N+1 → verify window is [N, N+1]
  - Fence epoch N+2 → verify window is [N+1, N+2]
- Test concurrent fencing within window:
  - Fence epoch N (acquire lock)
  - Concurrently fence epoch N (should succeed with read lock)
  - Concurrently fence epoch N+1 (should succeed after window slides)
- Test rejection below window:
  - Establish window [N, N+1]
  - Attempt to fence epoch N-1 → should fail with `stale_cluster_epoch`
- Test non-contiguous epochs:
  - Fence N → window [N]
  - Fence N+5 → window [N, N+5]
  - Verify epochs N+1 through N+4 are accepted (within window)
- Test snapshot/recovery:
  - Establish window [N, N+1]
  - Take snapshot
  - Recover from snapshot
  - Verify window collapses to a single epoch (`max_applied_epoch`)
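A minimal sketch of the first case, written directly against the `ctp_stm_state` helpers proposed above (GTest syntax is assumed here for illustration; the existing fixture and assertion macros in `ctp_stm_test.cc` may differ, and `cluster_epoch` is assumed constructible from an integer):

```cpp
#include <gtest/gtest.h>

// Sketch: exercise window sliding directly on ctp_stm_state.
TEST(ctp_stm_state_test, epoch_window_slides) {
    ctp_stm_state state;

    state.advance_max_seen_epoch(cluster_epoch{10});
    // Window is [10]: only epoch 10 is accepted.
    EXPECT_TRUE(state.is_epoch_in_window(cluster_epoch{10}));
    EXPECT_FALSE(state.is_epoch_in_window(cluster_epoch{9}));

    state.advance_max_seen_epoch(cluster_epoch{11});
    // Window is [10, 11]: both epochs are accepted.
    EXPECT_TRUE(state.is_epoch_in_window(cluster_epoch{10}));
    EXPECT_TRUE(state.is_epoch_in_window(cluster_epoch{11}));

    state.advance_max_seen_epoch(cluster_epoch{12});
    // Window is [11, 12]: epoch 10 is pushed out and rejected.
    EXPECT_FALSE(state.is_epoch_in_window(cluster_epoch{10}));
    EXPECT_TRUE(state.is_epoch_in_window(cluster_epoch{11}));
    EXPECT_TRUE(state.is_epoch_in_window(cluster_epoch{12}));
}
```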
File: tests/rptest/tests/cloud_topics_test.py (or similar)
- Concurrent producers with mixed epochs:
  - Start 2 producers
  - Producer A gets epoch N, starts a slow upload
  - Producer B gets epoch N+1, completes first
  - Verify Producer A still succeeds
- Rapid epoch transitions:
  - Generate epochs N, N+1, N+2, N+3 rapidly
  - Verify requests with N+2 and N+3 succeed
  - Verify requests with N+1 and earlier fail
- Leader failover during epoch transition:
  - Establish window [N, N+1]
  - Trigger leader failover
  - Verify the new leader recovers the window correctly
Add metrics for monitoring epoch window behavior:
```cpp
// In ctp_stm.cc or an appropriate probe file
struct ctp_stm_epoch_probe {
    void record_window_slide(
      cluster_epoch old_min,
      cluster_epoch old_max,
      cluster_epoch new_min,
      cluster_epoch new_max) {
        _window_slides++;
        _window_size = new_max() - new_min() + 1;
    }

    void record_epoch_rejected_below_window(
      cluster_epoch rejected, cluster_epoch window_min) {
        _epochs_rejected_stale++;
        _last_rejected_epoch_gap = window_min() - rejected();
    }

    void record_fence_acquired(cluster_epoch epoch, bool is_new_epoch) {
        if (is_new_epoch) {
            _new_epoch_fences++;
        } else {
            _same_epoch_fences++;
        }
    }

    uint64_t _window_slides = 0;
    uint64_t _window_size = 0;
    uint64_t _epochs_rejected_stale = 0;
    int64_t _last_rejected_epoch_gap = 0;
    uint64_t _new_epoch_fences = 0;
    uint64_t _same_epoch_fences = 0;
};
```

Behavior: Window state (`_oldest_accepted_epoch` and `_max_seen_epoch`) is NOT persisted in snapshots.
Reasoning: These represent in-flight volatile state. After recovery:
- The window collapses to a single epoch (`_max_applied_epoch`)
- The first new produce request re-establishes the window
- This is safe: epochs older than `_max_applied_epoch` are never accepted

Implementation: Exclude the window fields from `serde_fields()`.
Scenario: Window is [5, 6], then epoch 10 arrives.
Behavior:
- Window becomes [6, 10]
- Epochs 7, 8, 9 are implicitly skipped
- Any requests with epochs 6-10 are accepted
Reasoning: Cluster epochs don't need to be contiguous. The window defines a range, and all epochs within that range are valid.
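A small fragment-style check of this behavior using the proposed `ctp_stm_state` helpers (illustrative; assumes `cluster_epoch` is constructible from an integer and `<cassert>` is included):

```cpp
// Window [5, 6] slides to [6, 10] when epoch 10 arrives; the skipped
// epochs 7-9 fall inside the new window and are therefore accepted.
ctp_stm_state state;
state.advance_max_seen_epoch(cluster_epoch{5});
state.advance_max_seen_epoch(cluster_epoch{6});  // window: [5, 6]
state.advance_max_seen_epoch(cluster_epoch{10}); // window: [6, 10]

for (int64_t i = 6; i <= 10; ++i) {
    assert(state.is_epoch_in_window(cluster_epoch{i}));
}
assert(!state.is_epoch_in_window(cluster_epoch{5})); // pushed out of window
```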
Scenario: Multiple threads try to advance epoch simultaneously.
Protection:
- Write lock (all semaphore units) prevents concurrent window slides
- Double-check pattern after acquiring lock ensures correctness
- First thread to acquire write lock slides the window
- Subsequent threads see updated window and proceed accordingly
Scenario: Requests with epochs N+1, N, N+2 arrive in that order.
Behavior:
- N+1 arrives → window becomes [N+1]
- N arrives → rejected (below window)
- N+2 arrives → window becomes [N+1, N+2]
Reasoning: This is correct - if N+1 was already accepted, N is truly stale and should be rejected.
This change is backwards compatible with existing deployments:
- Before rollout: Single epoch acceptance
- During rollout: Mixed behavior (some nodes use windowing, others don't)
- After rollout: Full windowed epoch behavior
The window automatically adapts to epoch sequences, so no migration steps are needed.
After deployment, expect to see:
- Reduced error rates: Fewer `stale_cluster_epoch` rejections during normal operation
- Improved latency: Fewer retries due to epoch fencing failures
- Window metrics: Typical window size of 2, with slides corresponding to epoch transitions
- No safety violations: No epoch regressions or out-of-order writes
- Current implementation: `src/v/cloud_topics/level_zero/stm/ctp_stm.{h,cc}`
- State management: `src/v/cloud_topics/level_zero/stm/ctp_stm_state.{h,cc}`
- Usage in frontend: `src/v/cloud_topics/frontend/frontend.cc` (lines 570-591, 712-735)
- Existing tests: `src/v/cloud_topics/level_zero/stm/tests/ctp_stm_test.cc`