@amosbird
Created March 1, 2026 12:43
RFC: Selective Replication for ReplicatedMergeTree

Status: Draft
Based on: #45766, PR #58132

Summary

Extend ReplicatedMergeTree so that each replica is responsible for an automatically selected subset of partitions. Replicas and shards are unified: no Distributed table is needed, and the partition key doubles as the sharding key.

Motivation

Today, scaling ReplicatedMergeTree beyond a single node's capacity requires a Distributed table on top of manually sharded ReplicatedMergeTree tables. This makes resharding (adding/removing nodes) painful and operationally complex. The original RFC (#45766) proposed solving this by making each replica store only the partitions assigned to it while sharing the same replication log.

Relationship to PR #58132

PR #58132 is a full implementation attempt (~3600 lines). It works but has issues:

  • Stores assignment data inside block_numbers/ (invasive to existing ZK structures)
  • Introduces many new classes (6 new files for cluster/balancer/partition/selector/replica/sink)
  • Single-partition serial balancer (slow resharding)
  • No Analyzer support; needs a heavy rebase

This RFC keeps the same design direction but aims for a cleaner, phased implementation.

User Interface

CREATE TABLE events ON CLUSTER '{cluster}'
(
    date Date,
    user_id UInt64,
    event String
)
ENGINE = ReplicatedMergeTree(...)
PARTITION BY (toYYYYMM(date), user_id % 16)
ORDER BY (date, user_id)
SETTINGS
    selective_replication = 1,
    replication_factor = 2;

With 10 nodes, 16 buckets per month, and replication_factor = 2, each partition lives on 2 replicas and each replica holds ~3 partitions per month (16 × 2 / 10 ≈ 3.2).

ZooKeeper Layout

/clickhouse/tables/{shard}/{table}/
├── ...                              # all existing nodes unchanged
└── selective/                       # new subtree
    ├── config                       # {"replication_factor": 2}
    └── assignments/                 # partition_id → assigned replicas
        ├── 202401_0                 # "replica1,replica3" (znode version for CAS)
        ├── 202401_1                 # "replica2,replica4"
        └── ...

Key decision: do not reuse block_numbers/. PR #58132 embedded assignment info inside block_numbers/{partition_id}, which couples with dedup/mutation tracking. A separate selective/ subtree has zero impact on existing behavior.

Design

Partition Assignment

When a new partition first appears (on INSERT), the receiving replica allocates it:

  1. List active replicas (those with ephemeral is_active node)
  2. Count partitions already assigned to each replica
  3. Pick the replication_factor least-loaded replicas
  4. CAS-create /selective/assignments/{partition_id}
  5. If ZNODEEXISTS → another replica won the race, read existing assignment
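The five steps above can be sketched as follows. This is a hypothetical Python simulation, not the actual C++ implementation: `FakeZK` is an in-memory stand-in for the `/selective/assignments` subtree, and `allocate_partition` is an illustrative name.

```python
# Hypothetical sketch of least-loaded partition allocation with CAS-create,
# simulated against an in-memory stand-in for ZooKeeper.
from collections import Counter

class NodeExists(Exception):
    pass

class FakeZK:
    """In-memory stand-in for the /selective/assignments subtree."""
    def __init__(self):
        self.nodes = {}          # path -> "replicaA,replicaB"

    def create(self, path, value):
        if path in self.nodes:   # CAS-create: fail if the znode already exists
            raise NodeExists(path)
        self.nodes[path] = value

    def get(self, path):
        return self.nodes[path]

def allocate_partition(zk, partition_id, active_replicas, replication_factor):
    # Step 2: count partitions already assigned to each active replica
    load = Counter({r: 0 for r in active_replicas})
    for owners in zk.nodes.values():
        for r in owners.split(","):
            if r in load:
                load[r] += 1
    # Step 3: pick the `replication_factor` least-loaded replicas
    chosen = sorted(active_replicas, key=lambda r: (load[r], r))[:replication_factor]
    try:
        # Step 4: CAS-create the assignment znode
        zk.create(f"/selective/assignments/{partition_id}", ",".join(chosen))
    except NodeExists:
        # Step 5: another replica won the race; adopt its assignment
        pass
    return zk.get(f"/selective/assignments/{partition_id}").split(",")

zk = FakeZK()
a = allocate_partition(zk, "202401_0", ["replica1", "replica2", "replica3"], 2)
b = allocate_partition(zk, "202401_1", ["replica1", "replica2", "replica3"], 2)
# a == ["replica1", "replica2"]; b then prefers the still-unloaded replica3
```

Reading back the znode after ZNODEEXISTS is what makes the race benign: both contenders end up agreeing on the winner's assignment.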

Filtered Log Processing

The core mechanism. In ReplicatedMergeTreeQueue::pullLogsToQueue(), each log entry is checked:

  • GET_PART, ATTACH_PART, MERGE_PARTS, MUTATE_PART, DROP_RANGE, REPLACE_RANGE: check partition assignment; skip if not assigned to this replica.
  • ALTER_METADATA, CLEAR_COLUMN, CLEAR_INDEX: always process (global schema operations).

Skipped entries still advance log_pointer — they are simply not enqueued.
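A minimal sketch of this filter, as a Python simulation rather than the actual ReplicatedMergeTreeQueue code (entry tuples and the `pull_logs` name are illustrative):

```python
# Hypothetical sketch of the log filter: partition-scoped entries are
# enqueued only on assigned replicas; global schema entries are always
# enqueued; every entry, enqueued or skipped, advances the log pointer.
PARTITION_SCOPED = {"GET_PART", "ATTACH_PART", "MERGE_PARTS",
                    "MUTATE_PART", "DROP_RANGE", "REPLACE_RANGE"}
GLOBAL = {"ALTER_METADATA", "CLEAR_COLUMN", "CLEAR_INDEX"}

def pull_logs(entries, assignments, replica, log_pointer):
    queue = []
    for entry in entries:                      # entry: (type, partition_id)
        etype, partition = entry
        log_pointer += 1                       # skipped entries still advance it
        if etype in GLOBAL:
            queue.append(entry)
        elif etype in PARTITION_SCOPED and replica in assignments.get(partition, ()):
            queue.append(entry)
    return queue, log_pointer

assignments = {"202401_0": {"replica1", "replica3"}}
entries = [("GET_PART", "202401_0"),
           ("GET_PART", "202401_1"),
           ("ALTER_METADATA", None)]
queue, ptr = pull_logs(entries, assignments, "replica1", 0)
# replica1 enqueues the 202401_0 GET_PART and the ALTER_METADATA,
# skips the 202401_1 entry, yet ptr advances past all three
```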

SELECT Routing

Each replica maintains an in-memory assignment map, refreshed asynchronously from ZK (no synchronous ZK access on SELECT path — per original RFC).

  1. Determine which partitions are needed (partition pruning from WHERE clause)
  2. Local partitions → local read
  3. Remote partitions → ReadFromRemote with _partition_id filter in PREWHERE
  4. Union all results

Stale assignment maps are safe: old replicas keep data for old_parts_lifetime after reassignment (delayed cleanup), so in-flight queries don't break.
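The local/remote split in steps 1-3 can be sketched as below. Partition pruning is assumed already done; `plan_select` and the pick-first-owner policy are illustrative assumptions, not the real routing code.

```python
# Hypothetical sketch of SELECT planning from the cached (asynchronously
# refreshed) assignment map: needed partitions are split into a local
# read set and per-target remote read sets.
def plan_select(needed_partitions, assignment_map, this_replica):
    local, remote = [], {}               # remote: target replica -> partitions
    for pid in needed_partitions:
        owners = assignment_map[pid]
        if this_replica in owners:
            local.append(pid)
        else:
            target = sorted(owners)[0]   # placeholder; real code would balance
            remote.setdefault(target, []).append(pid)
    return local, remote

assignment_map = {"202401_0": {"replica1", "replica3"},
                  "202401_1": {"replica2", "replica4"}}
local, remote = plan_select(["202401_0", "202401_1"], assignment_map, "replica1")
# local == ["202401_0"]; remote == {"replica2": ["202401_1"]}
```

The remote read sets would then become ReadFromRemote steps carrying a _partition_id filter, and the results are unioned with the local read.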

INSERT Routing

  1. Split block into sub-blocks by partition key
  2. For each sub-block:
    • If this replica owns the partition → local write (normal ReplicatedMergeTreeSink)
    • Otherwise → send to an assigned replica via connection pool
  3. New partition → allocate first, then write
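The split-and-route steps can be sketched as follows, using the example table's partition key (toYYYYMM(date), user_id % 16). The `route_insert` helper and its row format are illustrative assumptions.

```python
# Hypothetical sketch of INSERT routing: split rows into per-partition
# sub-blocks, then decide local write vs. forwarding for each sub-block.
from collections import defaultdict

def partition_id(row):
    yyyymm = row["date"][:4] + row["date"][5:7]   # "2024-01-15" -> "202401"
    return f"{yyyymm}_{row['user_id'] % 16}"

def route_insert(rows, assignment_map, this_replica):
    sub_blocks = defaultdict(list)
    for row in rows:                               # step 1: split by partition key
        sub_blocks[partition_id(row)].append(row)
    plan = {}
    for pid in sub_blocks:                         # step 2: route each sub-block
        owners = assignment_map[pid]               # new partition: allocate first
        plan[pid] = "local" if this_replica in owners else sorted(owners)[0]
    return sub_blocks, plan

rows = [{"date": "2024-01-15", "user_id": 16, "event": "click"},
        {"date": "2024-01-16", "user_id": 1,  "event": "view"}]
assignment_map = {"202401_0": {"replica1"}, "202401_1": {"replica2"}}
blocks, plan = route_insert(rows, assignment_map, "replica1")
# plan == {"202401_0": "local", "202401_1": "replica2"}
```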

Online Resharding (Partition Migration)

When nodes are added/removed or partitions are unbalanced, partitions migrate between replicas:

CLONE:  Add new replica to assignment (replication_factor temporarily +1)
        New replica FETCHes all existing parts from source
        New replica starts processing log entries for this partition

SWITCH: Wait for new replica to catch up (no pending entries for this partition)
        CAS-update assignment: remove old replica

CLEANUP: Old replica deletes data after old_parts_lifetime
         In-flight queries on old replica still work during this window
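The CLONE/SWITCH transitions can be sketched as a small state machine over the assignment set; ZK CAS, part fetching, and the deferred CLEANUP are stubbed out, and the `migrate` helper is an illustrative name.

```python
# Hypothetical sketch of the CLONE -> SWITCH transition for one partition.
def migrate(assignments, pid, source, target, pending_entries):
    # CLONE: add the target to the assignment (replication_factor +1);
    # the target then FETCHes existing parts and starts pulling log entries
    assignments[pid] = assignments[pid] | {target}
    # SWITCH: only once the target has no pending entries for this partition
    if pending_entries(target, pid) == 0:
        assignments[pid] = assignments[pid] - {source}
        return "switched"   # CLEANUP happens later, after old_parts_lifetime
    return "cloning"

assignments = {"202401_5": {"replica1", "replica2"}}
state = migrate(assignments, "202401_5", "replica1", "replica3",
                pending_entries=lambda r, p: 0)
# state == "switched"; assignments["202401_5"] == {"replica2", "replica3"}
```

Because the source stays in the assignment throughout CLONE, reads and writes never see fewer than replication_factor live copies.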

Parallel migration: Unlike PR #58132's serial balancer (one partition at a time), multiple partitions should migrate concurrently (different source → same target). With 10 source nodes each contributing 1GB/s, a new node can ingest ~10GB/s, reducing resharding time from ~1 hour to minutes.

Scaling Characteristics

Adding 2 nodes to a 10-node cluster (192 partitions, 50GB each):

  • New data (new partitions): immediately assigned to the new nodes; zero migration needed.
  • Historical data rebalance: ~64 partitions × 50GB = 3.2TB to migrate.
  • Serial migration (PR #58132): ~1 hour.
  • Parallel migration (this RFC): ~3-5 minutes (bounded by target node ingest bandwidth).
  • Query performance during migration: unaffected; traffic stays on the old nodes until SWITCH.
  • Query performance after SWITCH: full speed; data is on local disk, no cold cache.
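A back-of-the-envelope check of the figures above, assuming ~1 GB/s per migration stream and ~10 GB/s aggregate ingest into each new node:

```python
# Sanity-check the rebalance arithmetic: 192 partitions, 50 GB each,
# replication_factor = 2, growing the cluster from 10 to 12 nodes.
partitions, gb_each, rf = 192, 50, 2
old_nodes, new_nodes = 10, 2

replicas_total = partitions * rf                       # 384 partition-replicas
per_node_after = replicas_total / (old_nodes + new_nodes)   # 32 per node
to_migrate = new_nodes * per_node_after                # ~64 partition-replicas
tb = to_migrate * gb_each / 1000                       # ~3.2 TB total

serial_hours = to_migrate * gb_each / 1 / 3600         # one 1 GB/s stream: ~0.9 h
parallel_min = per_node_after * gb_each / 10 / 60      # ~10 GB/s per new node: ~2.7 min
```

The parallel estimate lands inside the ~3-5 minute range quoted in the table once per-stream overhead is accounted for.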

Note: SharedMergeTree achieves "instant" scaling by deferring data locality to query time (cold reads from S3). Both approaches ultimately pay the same cost — moving bytes to local storage — just eager vs. lazy. Selective replication gives predictable, full-speed performance immediately after SWITCH.

Triggers

v1: manual only.

-- Move specific partition to specific replica
ALTER TABLE t REASSIGN PARTITION '202401_5' TO REPLICA 'replica3';

-- Auto-rebalance (select most unbalanced partitions and migrate)
SYSTEM REBALANCE TABLE t;

Future: automatic background rebalancer.

Phased Implementation

Phase 1: Static Selective Replication

Partition assignment is permanent. No online resharding.

  • 1a: Settings (selective_replication, replication_factor) + ZK selective/ subtree creation
  • 1b: Partition assignment on INSERT (least-loaded-first, CAS)
  • 1c: Filtered log processing in ReplicatedMergeTreeQueue
  • 1d: SELECT routing (assignment map cache + ReadFromRemote)
  • 1e: INSERT routing (split by partition, send to assigned replica)
  • 1f: system.selective_assignments table + tests

Phase 2: Online Resharding

  • 2a: Migration state machine (CLONE → SWITCH → CLEANUP)
  • 2b: Parallel migration support
  • 2c: ALTER TABLE REASSIGN PARTITION / SYSTEM REBALANCE

Phase 3: Advanced (as needed)

  • Anti-affinity tags for fault-domain-aware placement
  • Partition-key-aware query optimization (skip replicas when WHERE implies partition)
  • Automatic background rebalancer
  • Capacity-aware allocation (disk space, IO load)

Compatibility

  • selective_replication = 0 (default): 100% unchanged behavior, zero overhead
  • Once enabled, all replicas under the same ZK path must run a version supporting it
  • Not compatible with allow_remote_fs_zero_copy_replication (Phase 1)
  • Cannot be enabled on existing tables via ALTER (requires new table)
  • No new replication log entry types needed (uses existing FETCH mechanism)

Open Questions

  1. Assignment storage: Separate selective/ subtree (this RFC) vs. reuse block_numbers/ (PR #58132). Trade-off: cleaner separation vs. fewer ZK nodes.
  2. Parallel migration concurrency limit: How many partitions to migrate simultaneously? Needs to be tunable to avoid saturating network/IO.
  3. Interaction with MUTATE/MERGE during migration: Should mutations be blocked for a partition during CLONE phase, or applied on both old and new replica?