Status: Draft
Based on: #45766, PR #58132
Extend ReplicatedMergeTree so that each replica is responsible for an automatically selected subset of partitions. Replicas and shards are unified — no Distributed table needed. Partition key doubles as sharding key.
Today, scaling ReplicatedMergeTree beyond a single node's capacity requires a Distributed table on top of manually sharded ReplicatedMergeTree tables. This makes resharding (adding/removing nodes) painful and operationally complex. The original RFC (#45766) proposed solving this by making each replica store only the partitions assigned to it while sharing the same replication log.
PR #58132 is a full implementation attempt (~3600 lines). It works but has issues:
- Stores assignment data inside `block_numbers/` (invasive to existing ZK structures)
- Introduces many new classes (6 new files for cluster/balancer/partition/selector/replica/sink)
- Single-partition serial balancer (slow resharding)
- No Analyzer support, needs heavy rebase
This RFC keeps the same design direction but aims for a cleaner, phased implementation.
```sql
CREATE TABLE events ON CLUSTER '{cluster}'
(
    date Date,
    user_id UInt64,
    event String
)
ENGINE = ReplicatedMergeTree(...)
PARTITION BY (toYYYYMM(date), user_id % 16)
ORDER BY (date, user_id)
SETTINGS
    selective_replication = 1,
    replication_factor = 2;
```

10 nodes, 16 buckets, replication_factor=2 → each partition lives on 2 replicas, each replica holds ~3 partitions per month.
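The "~3 partitions per month" figure follows directly from the numbers above; a quick check:

```python
# Back-of-the-envelope check for the example cluster above:
# 10 nodes, user_id % 16 → 16 partitions per month, replication_factor = 2.
nodes = 10
buckets_per_month = 16
replication_factor = 2

# Each monthly partition is assigned to `replication_factor` replicas,
# so the total number of (partition, replica) assignments per month is:
assignments = buckets_per_month * replication_factor  # 32

# Spread evenly across the cluster, each replica holds:
per_replica = assignments / nodes  # 3.2, i.e. "~3 partitions per month"
print(per_replica)
```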
```text
/clickhouse/tables/{shard}/{table}/
├── ...              # all existing nodes unchanged
└── selective/       # new subtree
    ├── config       # {"replication_factor": 2}
    └── assignments/ # partition_id → assigned replicas
        ├── 202401_0 # "replica1,replica3" (znode version for CAS)
        ├── 202401_1 # "replica2,replica4"
        └── ...
```
Key decision: do not reuse `block_numbers/`. PR #58132 embedded assignment info inside `block_numbers/{partition_id}`, which couples assignment with dedup and mutation tracking. A separate `selective/` subtree has zero impact on existing behavior.
When a new partition first appears (on INSERT), the receiving replica allocates it:
- List active replicas (those with an ephemeral `is_active` node)
- Count partitions already assigned to each replica
- Pick the `replication_factor` least-loaded replicas
- CAS-create `/selective/assignments/{partition_id}`
- If `ZNODEEXISTS` → another replica won the race; read the existing assignment
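The steps above can be sketched as follows, with ZooKeeper simulated by an in-memory dict (`allocate_partition` and all names here are illustrative, not from PR #58132; the real CAS is a `create()` on the assignment znode that fails with `ZNODEEXISTS`):

```python
# Illustrative sketch of least-loaded-first allocation with CAS semantics.
def allocate_partition(partition_id, active_replicas, assignments, replication_factor):
    """Assign `partition_id` to the `replication_factor` least-loaded replicas.

    `assignments` maps partition_id -> list of replicas (the simulated
    /selective/assignments subtree). Returns the winning assignment either way.
    """
    # Count partitions already assigned to each active replica.
    load = {r: 0 for r in active_replicas}
    for replicas in assignments.values():
        for r in replicas:
            if r in load:
                load[r] += 1

    # Pick the least-loaded replicas (ties broken by name for determinism).
    chosen = sorted(active_replicas, key=lambda r: (load[r], r))[:replication_factor]

    # CAS-create: if the node already exists, another replica won the race.
    if partition_id in assignments:       # ZNODEEXISTS
        return assignments[partition_id]  # read the existing assignment
    assignments[partition_id] = chosen
    return chosen

assignments = {"202401_0": ["replica1", "replica3"],
               "202401_1": ["replica2", "replica4"]}
active = ["replica1", "replica2", "replica3", "replica4"]
print(allocate_partition("202401_2", active, assignments, replication_factor=2))
```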
This is the core mechanism. In `ReplicatedMergeTreeQueue::pullLogsToQueue()`, each log entry is checked:
| Entry type | Behavior |
|---|---|
| GET_PART, ATTACH_PART, MERGE_PARTS, MUTATE_PART, DROP_RANGE, REPLACE_RANGE | Check partition assignment. Skip if not assigned to this replica. |
| ALTER_METADATA, CLEAR_COLUMN, CLEAR_INDEX | Always process (global schema operations). |
Skipped entries still advance log_pointer — they are simply not enqueued.
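A minimal sketch of this filtering step (pseudo-structure, not the real `ReplicatedMergeTreeQueue` API): global schema entries are always enqueued, data entries only for assigned partitions, and the pointer advances regardless.

```python
# Entry types that are always processed (global schema operations).
GLOBAL_ENTRIES = {"ALTER_METADATA", "CLEAR_COLUMN", "CLEAR_INDEX"}

def pull_logs_to_queue(log_entries, my_partitions):
    queue = []
    log_pointer = 0
    for entry in log_entries:
        log_pointer += 1  # skipped entries still advance the pointer
        if entry["type"] in GLOBAL_ENTRIES or entry["partition"] in my_partitions:
            queue.append(entry)
        # else: partition not assigned to this replica — skip, don't enqueue
    return queue, log_pointer

log = [
    {"type": "GET_PART", "partition": "202401_0"},
    {"type": "MERGE_PARTS", "partition": "202401_1"},
    {"type": "ALTER_METADATA", "partition": None},
]
queue, ptr = pull_logs_to_queue(log, my_partitions={"202401_0"})
print(len(queue), ptr)  # the MERGE_PARTS for 202401_1 is skipped
```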
Each replica maintains an in-memory assignment map, refreshed asynchronously from ZK (no synchronous ZK access on SELECT path — per original RFC).
- Determine which partitions are needed (partition pruning from the WHERE clause)
- Local partitions → local read
- Remote partitions → ReadFromRemote with a `_partition_id` filter in PREWHERE
- Union all results
Stale assignment maps are safe: old replicas keep data for `old_parts_lifetime` after reassignment (delayed cleanup), so in-flight queries don't break.
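A sketch of the routing decision, under the assumption that the pruned partition set and the cached assignment map are already available (`plan_select` is an illustrative name; the real code builds ReadFromRemote steps):

```python
def plan_select(needed_partitions, assignment_map, me):
    """Split pruned partitions into a local read set and per-replica remote sets."""
    local, remote = set(), {}
    for p in needed_partitions:
        replicas = assignment_map[p]
        if me in replicas:
            local.add(p)  # read from local disk
        else:
            # Pick any assigned replica (the real code may balance load here).
            target = sorted(replicas)[0]
            remote.setdefault(target, set()).add(p)
    return local, remote

assignment_map = {
    "202401_0": ["replica1", "replica3"],
    "202401_1": ["replica2", "replica4"],
    "202401_2": ["replica1", "replica2"],
}
local, remote = plan_select(["202401_0", "202401_1", "202401_2"],
                            assignment_map, me="replica1")
print(local, remote)
```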
- Split block into sub-blocks by partition key
- For each sub-block:
- If this replica owns the partition → local write (normal ReplicatedMergeTreeSink)
- Otherwise → send to an assigned replica via connection pool
- New partition → allocate first, then write
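The INSERT path above can be sketched as follows, with forwarding simulated by per-replica outboxes and new-partition allocation collapsed to "assign to self" (`route_insert` and all names are illustrative):

```python
def route_insert(rows, partition_of, assignment_map, me):
    """Split rows by partition key; write owned partitions locally, forward the rest."""
    local_rows, outbox = [], {}
    for row in rows:
        p = partition_of(row)
        replicas = assignment_map.get(p)
        if replicas is None:
            # New partition → allocate first, then write (here: assign to self).
            replicas = assignment_map[p] = [me]
        if me in replicas:
            local_rows.append(row)  # normal ReplicatedMergeTreeSink path
        else:
            # Send to an assigned replica via the connection pool (simulated).
            outbox.setdefault(replicas[0], []).append(row)
    return local_rows, outbox

assignment_map = {"202401_0": ["replica1"], "202401_1": ["replica2"]}
rows = [{"user_id": 0}, {"user_id": 1}, {"user_id": 2}]
partition_of = lambda r: f"202401_{r['user_id'] % 16}"
local_rows, outbox = route_insert(rows, partition_of, assignment_map, me="replica1")
print(len(local_rows), outbox)
```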
When nodes are added/removed or partitions are unbalanced, partitions migrate between replicas:
1. CLONE: add the new replica to the assignment (replication_factor temporarily +1)
   - New replica FETCHes all existing parts from the source
   - New replica starts processing log entries for this partition
2. SWITCH: wait for the new replica to catch up (no pending entries for this partition)
   - CAS-update the assignment: remove the old replica
3. CLEANUP: old replica deletes its data after `old_parts_lifetime`
   - In-flight queries on the old replica still work during this window
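A minimal in-memory sketch of the CLONE → SWITCH → CLEANUP sequence for one partition (real transitions are CAS updates on the assignment znode; `migrate` and `pending_entries` are hypothetical names):

```python
def migrate(assignment_map, partition, old, new, pending_entries):
    # CLONE: add the new replica; replication factor is temporarily +1.
    assignment_map[partition] = assignment_map[partition] + [new]
    # (the new replica FETCHes parts and starts processing log entries here)

    # SWITCH: only once the new replica has no pending entries for this partition.
    if pending_entries(partition, new) == 0:
        assignment_map[partition] = [r for r in assignment_map[partition] if r != old]
        # CLEANUP: the old replica drops data after old_parts_lifetime (not modeled).
        return "switched"
    return "cloning"

assignments = {"202401_5": ["replica1", "replica2"]}
state = migrate(assignments, "202401_5", old="replica1", new="replica3",
                pending_entries=lambda p, r: 0)  # already caught up
print(state, assignments["202401_5"])
```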
Parallel migration: Unlike PR #58132's serial balancer (one partition at a time), multiple partitions should migrate concurrently (different source → same target). With 10 source nodes each contributing 1GB/s, a new node can ingest ~10GB/s, reducing resharding time from ~1 hour to minutes.
Adding 2 nodes to a 10-node cluster (192 partitions, 50GB each):
| Aspect | Behavior |
|---|---|
| New data (new partitions) | Immediately assigned to new nodes. Zero migration needed. |
| Historical data rebalance | ~64 partitions × 50GB = 3.2TB to migrate |
| Serial migration (PR #58132) | ~1 hour |
| Parallel migration (this RFC) | ~3-5 minutes (bounded by target node ingest bandwidth) |
| Query performance during migration | Unaffected — traffic stays on old nodes until SWITCH |
| Query performance after SWITCH | Full speed — data is on local disk, no cold cache |
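The timing figures in the table follow from the ~1 GB/s-per-stream assumption used in the parallel-migration estimate:

```python
# Checking the migration-time figures, assuming ~1 GB/s per source stream.
partitions_to_move = 64
partition_gb = 50
total_gb = partitions_to_move * partition_gb  # 3200 GB = 3.2 TB

serial_minutes = total_gb / 1 / 60    # one 1 GB/s stream: ~53 min ("~1 hour")
parallel_minutes = total_gb / 10 / 60 # ten concurrent sources: ~5 min
print(total_gb, round(serial_minutes), round(parallel_minutes))
```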
Note: SharedMergeTree achieves "instant" scaling by deferring data locality to query time (cold reads from S3). Both approaches ultimately pay the same cost — moving bytes to local storage — just eager vs. lazy. Selective replication gives predictable, full-speed performance immediately after SWITCH.
v1: manual only.
```sql
-- Move specific partition to specific replica
ALTER TABLE t REASSIGN PARTITION '202401_5' TO REPLICA 'replica3';

-- Auto-rebalance (select the most unbalanced partitions and migrate them)
SYSTEM REBALANCE TABLE t;
```

Future: automatic background rebalancer.
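One way the rebalance command could pick migrations is a greedy heuristic: repeatedly move a partition from the most- to the least-loaded replica until the spread is at most one. This is an illustrative sketch, not the implemented algorithm:

```python
def plan_rebalance(load):
    """load: replica -> number of assigned partitions. Returns (src, dst) moves."""
    moves = []
    load = dict(load)
    while max(load.values()) - min(load.values()) > 1:
        src = max(load, key=load.get)  # most-loaded replica
        dst = min(load, key=load.get)  # least-loaded replica
        moves.append((src, dst))
        load[src] -= 1
        load[dst] += 1
    return moves

# Two freshly added, empty nodes in a 4-node example:
print(plan_rebalance({"r1": 6, "r2": 6, "r3": 0, "r4": 0}))
```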
Partition assignment is permanent. No online resharding.
- 1a: Settings (`selective_replication`, `replication_factor`) + ZK `selective/` subtree creation
- 1b: Partition assignment on INSERT (least-loaded-first, CAS)
- 1c: Filtered log processing in `ReplicatedMergeTreeQueue`
- 1d: SELECT routing (assignment map cache + ReadFromRemote)
- 1e: INSERT routing (split by partition, send to assigned replica)
- 1f: `system.selective_assignments` table + tests
- 2a: Migration state machine (CLONE → SWITCH → CLEANUP)
- 2b: Parallel migration support
- 2c: `ALTER TABLE REASSIGN PARTITION` / `SYSTEM REBALANCE`
- Anti-affinity tags for fault-domain-aware placement
- Partition-key-aware query optimization (skip replicas when WHERE implies partition)
- Automatic background rebalancer
- Capacity-aware allocation (disk space, IO load)
- `selective_replication = 0` (default): 100% unchanged behavior, zero overhead
- Once enabled, all replicas under the same ZK path must run a version supporting it
- Not compatible with `allow_remote_fs_zero_copy_replication` (Phase 1)
- Cannot be enabled on existing tables via ALTER (requires a new table)
- No new replication log entry types needed (uses existing FETCH mechanism)
- Assignment storage: separate `selective/` subtree (this RFC) vs. reusing `block_numbers/` (PR #58132). Trade-off: cleaner separation vs. fewer ZK nodes.
- Parallel migration concurrency limit: how many partitions to migrate simultaneously? Needs to be tunable to avoid saturating network/IO.
- Interaction with MUTATE/MERGE during migration: Should mutations be blocked for a partition during CLONE phase, or applied on both old and new replica?