
Lexicographic Range Query Store

Build an ordered store that supports inclusive lexicographic range scans across billions of keys.

Tags: Partitioning · Consistency · Streaming · Cost / efficiency

1 Problem Restatement & Clarifying Questions #

Restatement: Design a storage system indexed by byte-string keys that, given an inclusive [lo, hi] range, returns all stored keys (and optionally values) in lexicographic order, scaling from one laptop to billions of keys across thousands of nodes. The primary read path is ordered range scan, not point lookup — this one constraint eliminates several otherwise reasonable designs.

Clarifying Questions [LIVE: ask 5-6, note the rest]

  1. Key distribution & size: Uniformly random byte-strings, ASCII identifiers, timestamp-prefixed (Bigtable row-keys), or adversarial (all share a long common prefix like /user/<uuid>/...)? → Drives whether prefix compression and prefix bloom filters are worth it, and whether naive range partitioning produces hotspots.
  2. Key/value size: avg key length (50B? 200B?), avg value length (0B — index only? 1KB rows? 1MB blobs?). → Drives block size, memory budget, decision on inline vs BlobStore (WiscKey-style KV separation).
  3. Inclusive/exclusive bounds & empty bounds: [lo, hi] inclusive on both ends confirmed. Is lo = "" legal (scan from start)? Is hi = nil legal (scan to end)? Prefix scan (key.startsWith(p)) a first-class op or client-synthesized as [p, p + 0xFF…]?
  4. Values attached? Keys-only (think: Google's pre-indexed Bloom set / a giant std::set<string>) vs KV (Bigtable) vs KV + secondary indexes. Assume KV with no secondary indexes for this design.
  5. Read:write ratio & QPS: 10:1 read-heavy? 1:10 ingest-heavy (observability)? Peak vs sustained? → Drives compaction strategy (leveled favors reads, tiered favors writes) and cache sizing.
  6. Consistency: strong linearizable (Spanner), bounded-staleness (Bigtable follower reads), or eventual (Cassandra ONE)? Do range scans need a consistent snapshot across shards, or may they splice per-shard snapshots?
  7. Durability & RPO/RTO: sync WAL-fsync per write (RPO=0) or group-commit batched (RPO ~10ms)? Multi-region sync (loses latency) or async (loses durability)?
  8. Deletes, TTL, multi-region: tombstones with compaction GC, TTL per row/column-family, active-active across regions?

For the rest of this doc I assume: arbitrary UTF-8 keys up to 1 KB, values up to 16 KB, 10 B (10¹⁰) rows sized per the BOE below, 10:1 read:write, bounded-staleness default + optional strong-read flag, inclusive bounds, tombstones + TTL, single region first (multi-region in §10).


2 Functional Requirements #

FR (numbered, testable):

  1. FR-1 Put(key, value) — idempotent on client_token; last-writer-wins on (key, ts).
  2. FR-2 Get(key) — exact-match point lookup; returns value or NOT_FOUND.
  3. FR-3 Delete(key) — writes a tombstone; future reads return NOT_FOUND until GC'd.
  4. FR-4 RangeScan(lo, hi, limit, cursor, direction) — returns ≤ limit entries in strictly ascending (or descending) lex order within [lo, hi] inclusive; returns opaque next_cursor if more data exists.
  5. FR-5 Batch(ops) — up to N mixed Put/Delete in one RPC, atomic per row (not cross-row). Multi-row atomicity is out-of-scope (see below).
  6. FR-6 Prefix scan — Scan(prefix) sugar == RangeScan(prefix, prefix_upper_bound); see the sketch after this list.
  7. FR-7 TTL — per-row TTL honored by reads (filtered out) and reclaimed by compaction.
  8. FR-8 Server-side filter (optional) — simple predicate pushdown (value regex, column filter) to reduce network bytes on scans.

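FR-6's prefix_upper_bound is worth pinning down: naive padding like p + 0xFF… misses keys longer than the padding. A minimal sketch (Python; the function name is ours, and since the wire API's hi is inclusive, the synthesized bound must be treated as exclusive by the server or decremented by the client):

def prefix_upper_bound(prefix: bytes) -> bytes | None:
    """Smallest byte-string strictly greater than every key that starts
    with `prefix`. None means +inf (the prefix was all 0xFF bytes)."""
    p = bytearray(prefix)
    while p:
        if p[-1] < 0xFF:
            p[-1] += 1           # bump the last byte: b"app" -> b"apq"
            return bytes(p)
        p.pop()                  # trailing 0xFF carries into the prior byte
    return None                  # every key >= prefix matches: open-ended scan

# Scan(b"app") == RangeScan(lo=b"app", hi=prefix_upper_bound(b"app")), hi exclusive
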
Out of scope [LIVE: explicitly call out so scope doesn't balloon]

  • Full-text / inverted-index search (that's a different system — Elasticsearch / Vespa).
  • Secondary indexes on value fields.
  • Cross-row / cross-shard transactions (requires 2PC + TrueTime à la Spanner — a different interview).
  • SQL-like joins, aggregations, GROUP BY.
  • Schema management / strong typing (this is bytes-in, bytes-out).

3 NFRs + Capacity Estimate #

3.1 Non-Functional Requirements

| NFR | Target | Method |
|---|---|---|
| Availability | 99.99% region-wide (4.38 min/month down) | Multi-replica Raft per shard; failover < 10 s |
| Latency (point Get) | p50 < 2 ms, p99 < 10 ms | In-memory index + bloom + SSD block cache |
| Latency (1K-key scan) | p50 < 8 ms, p99 < 20 ms | Range-partition → single-shard; no cross-shard fan-out for most scans |
| Latency (10K-key cross-shard scan) | p99 < 80 ms | Parallel fan-out, ordered merge |
| Write throughput | 500 K writes/s/cluster sustained | Async group commit + LSM buffered writes |
| Durability | 11 nines of object durability | 3× replication across 3 racks/AZs + fsync-on-commit WAL |
| Consistency | Bounded-staleness (~100 ms) default; strong (Raft read-index) opt-in | Per-shard Raft; leader reads for strong |
| Scale | 10 B keys, 10 TB replicated, 3 K nodes | Range-partitioned tablets, 100-1000/node |

3.2 Back-of-Envelope Math (showing work)

Logical data size

  • 10 B keys × (avg key 100 B + avg value 200 B + ~40 B overhead per row: row header, ts, seq#, 2× length-prefix) ≈ 10¹⁰ × 340 B ≈ 3.4 TB logical.

On-disk size after LSM compression (Zstd level 3 ≈ 2.5× on text-ish data)

  • 3.4 TB / 2.5 ≈ 1.36 TB compressed; call it ~1.5 TB to leave headroom.
  • Replicated ×3 → 4.5 TB physical across cluster.

Shard (tablet) sizing

  • Target tablet size: 256 GB compressed (Bigtable default 100-200 GB; Spanner splits above ~1 GB; we pick 256 GB to amortize overhead but keep split time reasonable).
  • 4.5 TB / (256 GB × 3 replicas) = ~6 tablets minimum for just data. That's laughably small — in reality you want many more tablets than nodes for load balancing.
  • Rule of thumb: 50-200 tablets/node. With 30 nodes that's ~3000 tablets; at 256 GB each that is ~750 TB of pre-replication capacity for ~1.5 TB of data, so 256 GB tablets are wildly oversized here.
  • Resolve: start with tablet size 1 GB (matches Spanner/TiKV), giving 1500 tablets × 3 replicas = 4500 replica-shards. Spread across ~30 nodes (each node holds ~150 replica-shards), each node ~150 GB of data. This is the right granularity: splits are fast, rebalance is cheap, per-tablet Raft group has a manageable quorum.

Write QPS and WAL/memtable math

  • 500 K writes/s × 340 B/write = 170 MB/s cluster ingest.
  • Per node (30 nodes): ~5.7 MB/s — trivial SSD-wise.
  • Hot tablet at peak (assume it absorbs 10× a node's average share): ~57 MB/s → leader fsync budget.
  • Memtable size 64 MB → at uniform load a tablet sees ~38 KB/s (5.7 MB/s / 150 tablets) and flushes roughly every half hour; the hot tablet at ~57 MB/s flushes every ~1.1 s (that's a bad sign — see §7b).

Read QPS and disk IOPS

  • Read QPS 5 M/s cluster (10:1), i.e. 167 K/s/node.
  • Each Get = (memtable probe) + (N × bloom check) + ≤1 SSD read with cache. Bloom FP rate of 1% × N SSTables (say N=10 on a tablet) means ~10% of misses hit disk unnecessarily → either accept that at 10 bits/key or push FP to ~0.1% with ~15 bits/key (sizing math in §10.3).
  • Effective IOPS/node: 167 K × (block-cache miss rate ~0.3 + bloom-FP overhead ~0.1) ≈ 67 K IOPS/node → fits comfortably on modern NVMe (500 K IOPS/drive).

Memory budget per node

  • Bloom filters: compute per key-copy: ~10⁹ row-copies/node (10¹⁰ keys × 3 replicas / 30 nodes) × 10 bits ≈ 1.25 GB for blooms.
  • Sparse index (1 entry per 64 KB block): 150 GB / 64 KB × ~40 B/entry ≈ 93 MB.
  • Block cache: target 8 GB/node (keeps hot working set).
  • Memtable + immutable memtables awaiting flush: 64 MB × 150 tablets × 2 = 19 GB — too much. This is a real concern: with 150 tablets/node, per-tablet memtables dominate RAM. Mitigation: shared global memtable budget (RocksDB db_write_buffer_size), force flush of the largest memtable when the global cap is hit. Realistic budget: 4 GB shared.
  • Total RAM/node: ~128 GB commodity (plenty of headroom).

Network

  • Cross-shard scan fan-out: ~10 shards × 1 MB result each = 10 MB → on 10 Gbps NIC is 8 ms transfer (non-trivial, matters for p99).

These numbers justify our 30-node, 1-GB-tablet, NVMe, 128 GB RAM baseline.
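
The whole §3.2 envelope fits in a few runnable lines, handy for re-checking with different assumptions (Python; constants are the assumptions above, and the doc's 4.5 TB figure includes the 1.36 → 1.5 TB headroom rounding):

KEYS         = 10**10            # rows
ROW_B        = 100 + 200 + 40    # key + value + per-row overhead, bytes
ZSTD_RATIO   = 2.5
REPLICAS     = 3
NODES        = 30
WRITES_PER_S = 500_000

logical  = KEYS * ROW_B                        # 3.4 TB
physical = logical / ZSTD_RATIO * REPLICAS     # ~4.1 TB (4.5 TB with headroom)
print(f"logical  {logical / 1e12:.1f} TB")
print(f"physical {physical / 1e12:.1f} TB")
print(f"per node {physical / NODES / 1e9:.0f} GB")
print(f"ingest   {WRITES_PER_S * ROW_B / NODES / 1e6:.1f} MB/s/node")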


4 High-Level API #

// gRPC service — v1
service RangeStore {
  rpc Put(PutRequest)       returns (PutResponse);
  rpc Get(GetRequest)       returns (GetResponse);
  rpc Delete(DeleteRequest) returns (DeleteResponse);
  rpc Batch(BatchRequest)   returns (BatchResponse);

  // Streaming scan — server-side flow-controlled.
  rpc RangeScan(ScanRequest) returns (stream ScanChunk);
}

message PutRequest {
  bytes  key         = 1;
  bytes  value       = 2;   // opaque bytes
  string client_token = 3;  // idempotency; dedupe window ~10 min
  int64  ttl_seconds = 4;   // 0 = no TTL
  ConsistencyLevel  consistency = 5; // default: LEADER_COMMIT
}

message ScanRequest {
  bytes  lo            = 1;   // inclusive; empty = -inf
  bytes  hi            = 2;   // inclusive; empty = +inf
  int32  limit         = 3;   // per response chunk or total; see semantics
  bytes  cursor        = 4;   // opaque; replaces lo when resuming
  Direction direction  = 5;   // ASC | DESC
  Snapshot  snapshot   = 6;   // HLC timestamp; empty = latest bounded-stale
  Filter    filter     = 7;   // optional predicate pushdown
  int32     max_bytes_per_chunk = 8;
}

message ScanChunk {
  repeated Entry entries = 1;  // strictly ordered
  bytes          next_cursor = 2;  // empty => exhausted
  bool           is_last_chunk = 3;
  int64          read_hlc     = 4; // snapshot the server served
}

Design decisions and why

  • Streaming RPC for scans, not unary: a 100 K-row scan must not buffer 100 MB in the coordinator. gRPC server streaming lets us pipe rows as they emerge from the merge iterator and apply flow-control backpressure (HTTP/2 WINDOW_UPDATE) — the client controls pace.
  • Opaque cursor, not last_key_seen (encoding sketch after this list):
    • Encodes (shard_version, last_key, per_shard_hlc[], filter_hash); sealed with an HMAC so clients can't forge it.
    • Shard-aware — survives tablet splits: cursor pins the read to snapshot_hlc, coordinator re-derives which (possibly newly-split) shards cover the remaining range.
    • Never leak the raw last key alone: if a split happens between chunks, a naive lo = last_key + 0x00 can produce dupes or skips because shard boundaries moved.
  • client_token per write → the server dedupes within a time-boxed LRU (e.g. 10 min / 1 M entries per leader), avoiding duplicate writes when clients retry on timeout. This is the standard exactly-once delivery on at-least-once transport pattern.
  • Batch is atomic per row, not cross-row. Explicitly documented. Callers wanting cross-row atomicity must layer 2PC or use transactions (out of scope).
  • Consistency level per-call: LEADER_COMMIT (Raft read-index), BOUNDED_STALE_100MS (follower reads), QUORUM (if we pick Cassandra-style later — not chosen here).
  • Error codes: NOT_FOUND, ABORTED_SHARD_SPLIT_RETRY, UNAVAILABLE_SHARD, INVALID_CURSOR, DEADLINE_EXCEEDED, SNAPSHOT_TOO_OLD (for bounded reads past the GC horizon).
  • Idempotency: writes by token, reads by cursor. Scans are idempotent under the same cursor + snapshot.
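
A minimal sketch of the cursor seal/verify path (Python; the JSON body, field layout, and in-process key are stand-ins: a real implementation would use a versioned proto with base64'd key bytes and a KMS-managed HMAC key):

import base64, hashlib, hmac, json

SECRET = b"rotate-me-via-kms"     # assumption: per-deployment signing key

def encode_cursor(state: dict) -> bytes:
    """state = {snapshot_hlc, per_shard_last, tablet_map_version, filter_hash}
    (bytes values pre-encoded as base64 strings for this JSON sketch)."""
    body = json.dumps(state, sort_keys=True).encode()
    mac = hmac.new(SECRET, body, hashlib.sha256).digest()
    return base64.urlsafe_b64encode(mac + body)

def decode_cursor(cursor: bytes) -> dict:
    raw = base64.urlsafe_b64decode(cursor)
    mac, body = raw[:32], raw[32:]
    if not hmac.compare_digest(mac, hmac.new(SECRET, body, hashlib.sha256).digest()):
        raise ValueError("INVALID_CURSOR")    # forged, truncated, or corrupted
    return json.loads(body)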

4.1 Scan semantics: the dupes-and-skips trap

A correct scan under concurrent splits must guarantee exactly-once delivery per key and strict ordering. The coordinator must:

  1. Fix snapshot_hlc at the start of the scan (HLC timestamp).
  2. Resolve the tablet map at that HLC (metadata service serves historical tablet maps within a bounded window, e.g. last 1 hour).
  3. Any split that occurs during the scan is invisible at snapshot_hlc; we keep reading the old tablet's frozen SSTables until done, then let GC reclaim.
  4. When the cursor resumes, it carries snapshot_hlc. If the snapshot is older than the GC horizon (SNAPSHOT_TOO_OLD), the client must restart.

This is the earned-secret difference between "works in the happy path" and "works in production during a repartitioning storm."


5 Data Schema #

5.1 On-disk layout (SSTable format, RocksDB/LevelDB lineage)

┌────────────────────────── SSTable file ──────────────────────────┐
│ Data Block 0   (~32 KB, shared-prefix-compressed, Zstd-framed)   │
│ Data Block 1                                                     │
│ ...                                                              │
│ Data Block N                                                     │
│ Filter Block (bloom, or ribbon filter — see §7a)                 │
│ Index Block  (one entry per data block: first_key → block_offset)│
│ Properties Block (min_key, max_key, num_entries, seqno range,    │
│                   bloom_bits_per_key, compression, creation_time)│
│ Footer (fixed size: index_block_offset, filter_offset,           │
│         format_version, magic_number, CRC32C)                    │
└──────────────────────────────────────────────────────────────────┘

Inside a data block:

┌─── Data Block (restart-interval-compressed) ───────────┐
│ Entry: [shared_prefix_len][unshared_len][val_len]      │
│        [unshared_bytes][value_bytes]                   │
│ ... every 16 entries, a restart point resets prefix    │
│ Restart Point Array [offset0, offset1, ...]            │
│ Num Restarts                                           │
│ Block CRC32C                                           │
└────────────────────────────────────────────────────────┘
  • Shared-prefix compression inside a block: since keys are sorted, consecutive keys often share long prefixes. Bigtable row-keys like com.example.user/abc/... share 20+ bytes. We store shared_len + the suffix only. Every 16 entries we insert a restart point (full key) so binary search within a block is still O(log (block_size / restart_interval)).
  • Block checksum (CRC32C): hardware-accelerated on modern CPUs; ~1 GB/s, negligible cost.
  • Compression: Zstd level 3 per block (2.5× ratio, ~500 MB/s decode). We compress per-block (not whole file) so a single block read decompresses independently. Snappy is a cheaper alternative (1.5× ratio, ~2 GB/s decode) — chosen historically for Cassandra but Zstd has won on the ratio/speed Pareto frontier since ~2018.
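
A sketch of the restart-interval encoder for one data block (Python; fixed-width u32 lengths instead of the varints a real format uses, and no compression or CRC):

import struct

RESTART_INTERVAL = 16

def shared_len(a: bytes, b: bytes) -> int:
    n, i = min(len(a), len(b)), 0
    while i < n and a[i] == b[i]:
        i += 1
    return i

def encode_block(sorted_entries):
    """sorted_entries: [(key: bytes, value: bytes), ...] in ascending key order."""
    buf, restarts, prev = bytearray(), [], b""
    for i, (key, val) in enumerate(sorted_entries):
        if i % RESTART_INTERVAL == 0:
            restarts.append(len(buf))       # restart point stores the full key
            shared = 0
        else:
            shared = shared_len(prev, key)  # sorted input → long shared prefixes
        unshared = key[shared:]
        buf += struct.pack("<III", shared, len(unshared), len(val))
        buf += unshared + val
        prev = key
    for off in restarts:                    # trailer: restart array + count
        buf += struct.pack("<I", off)
    buf += struct.pack("<I", len(restarts))
    return bytes(buf)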

5.2 In-memory components

  • Memtable: concurrent lock-free skiplist keyed by (key, seqno desc). Why skiplist over B-tree? Lock-free writes (CAS-based insert), O(log n) ordered iteration, same cache locality as a B-tree for this size (64 MB). LevelDB/RocksDB canonical choice. Arena-allocated so we can free it with a single pointer flip at flush.
  • Immutable memtables: once memtable hits 64 MB, it's sealed and a new one is allocated; a background thread flushes the immutable one to L0 SSTable. We keep up to 2-3 immutable memtables in RAM to absorb write bursts.
  • Block cache: LRU (or ClockPro / S3-FIFO) over decompressed data blocks. Shared across all tablets on a node. Rows fetched via Get/Scan also populate a row cache (optional, only helpful if read-mostly hot keys; else it's dead weight — mostly skipped in LSM-heavy workloads).

5.3 Write-ahead log (WAL)

  • Per-tablet WAL is one append-only file, synced on every commit (fsync) or group-committed (10 ms window, better throughput).
  • Format: [seqno][tablet_id][op_type][key_len][val_len][key][value][crc32c].
  • On crash recovery: replay WAL into a fresh memtable, discard up through the last flushed SSTable's max seqno.
  • Log rotation: once the corresponding memtable is flushed and fsynced to SSTable, the WAL segment is deletable.
  • Group commit trick: batch many writes, one fsync — this is the difference between 5 K writes/s (per-write fsync) and 100 K+ writes/s (group commit). The latency cost is bounded by the commit window (default 10 ms).
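
Group commit reduced to its essence (Python sketch; it batches until a quiet gap of one commit window, a simplification of the bounded-delay logic a production WAL needs):

import os, queue, threading

class GroupCommitWAL:
    """Many writers, one background committer, one fsync per batch."""
    def __init__(self, path: str, window_s: float = 0.010):
        self.f = open(path, "ab", buffering=0)
        self.q: queue.Queue = queue.Queue()
        self.window_s = window_s
        threading.Thread(target=self._committer, daemon=True).start()

    def append(self, record: bytes) -> None:
        """Blocks until the record is durable (batched with concurrent writers)."""
        done = threading.Event()
        self.q.put((record, done))
        done.wait()

    def _committer(self) -> None:
        while True:
            batch = [self.q.get()]                 # block for the first record
            try:                                   # drain until a quiet window
                while True:
                    batch.append(self.q.get(timeout=self.window_s))
            except queue.Empty:
                pass
            self.f.write(b"".join(rec for rec, _ in batch))
            os.fsync(self.f.fileno())              # ONE fsync covers the batch
            for _, done in batch:
                done.set()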

5.4 Shard / tablet metadata

Stored in a separate metadata table (itself a range-partitioned LSM, smaller; its own metadata lives in Chubby/etcd — classic bootstrap trick, Bigtable does exactly this: user tablets → METADATA tablets → root tablet, whose location lives in Chubby).

TABLET META ROW
  tablet_id (UUID)          PK
  range_start (bytes, inclusive)
  range_end   (bytes, exclusive)
  leader_node_id
  replica_node_ids[]
  raft_group_id
  version (monotonic per tablet; incremented on split/merge/leader-change)
  state (ACTIVE | SPLITTING | MERGING | MIGRATING)
  parent_tablet_id (nullable; set during split until commit)
  last_heartbeat_hlc
  size_bytes_estimate
  bloom_fp_rate_rolling
  read_qps, write_qps (for balancer)

5.5 Tombstones and TTL

  • Tombstone: a row entry with op_type = DELETE and no value. Written to memtable → WAL → SSTable like any other write, with a sequence number.
  • Reads must scan until they either find a live version with seqno > tombstone's, or hit the tombstone → return NOT_FOUND.
  • GC horizon: tombstones are preserved until we can prove no older SSTable can still "resurrect" the deleted key, i.e. until the tombstone has been compacted down to the bottom level and no older readers hold snapshots before it. Compaction drops the tombstone at that point.
  • TTL: stored as expire_hlc per row. Reads filter out expired rows. Compaction drops expired rows (not tombstones, just the row itself — cheaper than delete-then-GC).
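
Point-read resolution over versions, tombstones, and TTL as a single helper (Python sketch; the `Version` shape is an assumption, standing in for merged memtable + SSTable entries for one key):

from dataclasses import dataclass
from typing import Iterable, Optional

@dataclass
class Version:
    seqno: int
    is_tombstone: bool
    expire_hlc: int          # 0 = no TTL
    value: bytes = b""

def resolve(versions: Iterable[Version], now_hlc: int) -> Optional[bytes]:
    """Returns the live value, or None for NOT_FOUND."""
    newest = max(versions, key=lambda v: v.seqno, default=None)
    if newest is None or newest.is_tombstone:
        return None          # absent, or deleted (newest tombstone wins)
    if newest.expire_hlc and newest.expire_hlc <= now_hlc:
        return None          # TTL expired; compaction will reclaim the row
    return newest.value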

5.6 Why LSM beats B+ tree and trie here — quantified

| Structure | Write amp | Read amp (point) | Read amp (range) | Space amp | Range scan cost | Notes |
|---|---|---|---|---|---|---|
| Sorted array + binary search | rewrite all on insert | 1 | 1 | 1.0 | excellent | Only viable for read-only bulk-loaded data (think: immutable index). Still useful inside LSM as the SSTable itself. |
| B+ tree | 1-3× (in-place page writes; worse w/ SMOs) | 1 (leaf read) | 1-2 | 1.5-2× (page fragmentation, ~66% fill factor) | good | Writes random-IO: update in-place → ~3 IOPS per write (leaf + ancestors w/ COW, or WAL + in-place + checkpoint). Great for OLTP with high random read ratio (Postgres, MySQL InnoDB). |
| Trie / Radix tree | 2-4× (node splits, path compression rebalances) | O(key_len) node hops | O(key_len + range_size) | 1.5-3× (node overhead per char) | ok — random pointer chase | Only wins if keys share massive prefixes and you need in-memory ops (e.g. IP prefix matching in DPDK). Poor disk locality. |
| LSM (leveled) | 10-30× (writes copied across levels) | log_F(N) ≈ 3-4 (worst case: 1 read per level, filtered by bloom) | 1 per level × range length | 1.1× (low space amp post-compaction) | excellent — sequential disk IO once positioned | Batched sequential writes absorb write bursts. SSTables are immutable → trivial to snapshot, replicate, and cache. Chosen. |
| LSM (tiered) | 3-5× | up to 10-20× (many SSTables per level) | bad (N-way merge across many files) | 1.5-3× (duplicate keys across SSTables) | middling | Chosen by Cassandra for write throughput; poor for us because reads and scans suffer. |

Decision: leveled LSM. Range scan cost dominates our NFR target, leveled compaction keeps each level non-overlapping so a scan hits ~log_F(N) ≈ 4 SSTables per tablet instead of potentially hundreds. Write amp 10-30× is acceptable given NVMe endurance (3 DWPD for enterprise NVMe × 4 TB × 30 nodes = 360 TB writes/day budget vs our 15 TB/day ingest × 20× amp = 300 TB/day; tight but fits).


6 System Diagram (ASCII) — Centerpiece #

6.1 Full cluster view

                                       CLIENT (SDK)
                                            │
                                            │ gRPC (TLS, HTTP/2, keepalive, streaming for scans)
                                            ▼
           ┌─────────────────────────────────────────────────────────────────────┐
           │                      ROUTER / COORDINATOR TIER                       │
           │  (stateless, horizontally scaled, L4 LB in front, anycast)           │
           │  Caches tablet-map from METADATA svc (TTL 30s, push-invalidated)     │
           │  Responsibilities:                                                   │
           │    - Auth & rate-limit                                               │
           │    - Resolve key / range → tablet(s)                                 │
           │    - Fan-out scan; ordered merge results                             │
           │    - Cursor encode/decode + HMAC                                     │
           │    - Retry on ABORTED_SHARD_SPLIT_RETRY                              │
           └─────┬───────────────────────────────────┬───────────────────────────┘
                 │                                   │
                 │ (for single-tablet op)            │ (for multi-tablet scan;
                 │                                   │  N parallel RPCs, ordered merge)
                 ▼                                   ▼
     ┌──────────────────────┐             ┌─────────────────────────────────────┐
     │   METADATA SERVICE   │◄────────────┤ Parallel RPCs to tablet leaders    │
     │  (own range-partit.  │  tablet-map │ for every tablet in [lo, hi] under │
     │   LSM; its root      │  queries    │ snapshot_hlc                       │
     │   pointer in         │             └─────────────────────────────────────┘
     │   Chubby/etcd)       │
     │  Stores TABLET META  │                          │
     │  Multi-paxos,        │                          │
     │  5 replicas          │                          │
     └──────────────────────┘                          ▼
                                        ┌────────────────────────────────┐
                                        │   STORAGE NODES (30× N1-highmem)│
                                        │   ~150 tablet-replicas each     │
                                        └────────────────────────────────┘
                                                     │
                                                     ▼
   ┌───────────────── ONE TABLET (Raft group of 3) ─────────────────────┐
   │                                                                    │
   │        ┌──────────────┐            ┌──────────────┐                │
   │        │ REPLICA L    │◄──Raft────►│  REPLICA F1  │                │
   │        │ (leader)     │   AppendE. │              │                │
   │        └──────┬───────┘            └──────────────┘                │
   │               │                                                    │
   │               │   Raft AppendEntries  ┌──────────────┐             │
   │               └────────────────────►  │  REPLICA F2  │             │
   │                                       └──────────────┘             │
   │                                                                    │
   │   Each replica local engine:                                       │
   │   ┌──────────────────────────────────────────────────────┐         │
   │   │ Client write path:                                    │         │
   │   │   1. WAL fsync  →  2. Memtable insert (skiplist)      │         │
   │   │   3. Return ACK                                       │         │
   │   │                                                       │         │
   │   │  ┌──────────┐  flush  ┌─────────────────────────────┐ │         │
   │   │  │ Memtable │────────►│ L0  (may overlap)           │ │         │
   │   │  │ (64 MB,  │         │ SSTable_1, SSTable_2, ...   │ │         │
   │   │  │  mutable)│         ├─────────────────────────────┤ │         │
   │   │  └──────────┘         │ L1  (non-overlapping, 10×L0)│ │         │
   │   │                       ├─────────────────────────────┤ │         │
   │   │  ┌──────────┐         │ L2  (10×L1)                 │ │         │
   │   │  │ Immut    │         ├─────────────────────────────┤ │         │
   │   │  │ Memtable │         │ L3  (10×L2)                 │ │         │
   │   │  └──────────┘         ├─────────────────────────────┤ │         │
   │   │                       │ L4  (base, largest)         │ │         │
   │   │                       └─────────────────────────────┘ │         │
   │   │                          ▲                            │         │
   │   │                Background compaction threads          │         │
   │   │                picks L_k overlap → L_{k+1}, merges,   │         │
   │   │                drops tombstones if at bottom          │         │
   │   │                                                       │         │
   │   │  Read path:                                           │         │
   │   │    Memtable → Immut → L0 (all, filtered by bloom)     │         │
   │   │             → L1 (1 file) → L2 (1) → L3 (1) → L4 (1)  │         │
   │   │                                                       │         │
   │   │  Block cache (shared) + Bloom filters per SSTable     │         │
   │   └──────────────────────────────────────────────────────┘         │
   │                                                                    │
   │   Local disks: NVMe for hot tiers (L0-L2), larger NVMe or           │
   │   SATA SSD for L3-L4 (tiered storage, optional)                    │
   │                                                                    │
   │   Durability: WAL on local NVMe + each Raft replica on              │
   │   a different rack; commit on Raft quorum (2/3) + fsync             │
   └────────────────────────────────────────────────────────────────────┘

Arrow legend (protocol · QPS · payload)

| # | From → To | Protocol | Peak QPS | Payload |
|---|---|---|---|---|
| 1 | Client → Router | gRPC/HTTP2 TLS | 5 M ops/s | req ~500 B, resp up to 1 MB/chunk |
| 2 | Router → Metadata | gRPC | 100 K/s (cache-miss) | ~1 KB (tablet map slice) |
| 3 | Router → Tablet leader | gRPC streaming | 5 M/s (fan-out multiplies) | same as client |
| 4 | Leader ↔ Follower | Raft gRPC / custom | ~500 K AppendEntries/s | ~1-50 KB batched log entries |
| 5 | Leader → WAL (local fsync) | direct IO | ~500 K/s batched | 64-256 B/entry |
| 6 | Memtable → SSTable flush | local seq write | ~0.1 ops/s per tablet | 64 MB file |
| 7 | Compactor reads/writes | local seq IO | ~5 MB/s/tablet avg | GB-scale files |

6.2 Hot-shard split pathway (section 7b in diagram form)

             ┌──────── Tablet T (range [A, Z], 4 GB, 200 Kqps, HOT) ─────────┐
             │        ┌──────────┐ ┌──────────┐ ┌──────────┐                 │
             │        │   L      │ │   F1     │ │   F2     │                 │
             │        └──────────┘ └──────────┘ └──────────┘                 │
             └──────────────┬────────────────────────────────────────────────┘
                            │
                    ① Balancer observes T size > 2 GB OR qps > 150 K
                    ② Leader computes mid-key "M" (approx median
                       from block index; no full scan required)
                            │
                            ▼
                    ③ PROPOSE SPLIT(T, mid=M) via Raft
                       consensus; all replicas flush and
                       seal SSTables for T at seqno S*
                            │
                            ▼
                    ④ Metadata service adds T1=[A,M), T2=[M,Z]
                       as SPLITTING children in METADATA
                       (atomic CAS on tablet map version)
                            │
                            ▼
                    ⑤ Physical split: children share same
                       SSTable files (hardlinks / ref-counted);
                       on next compaction, each child rewrites
                       its slice only (split point propagates
                       down the tree lazily)
                            │
                            ▼
                    ⑥ Router cache invalidation pushed;
                       in-flight scans get ABORTED_SHARD_SPLIT_RETRY
                       *only if they lacked a pinned snapshot*;
                       snapshot-pinned scans continue against
                       the parent's sealed SSTables until done
                            │
                            ▼
                    ⑦ Possibly migrate one child to another node
                       if this node is overloaded

6.3 Cross-shard range scan fan-out

  Client Scan([lo="alpha", hi="zulu"], limit=10000)
        │
        ▼
  Router: lookup tablet map @ now() → 4 tablets cover the range
        T1=[alpha, d),  T2=[d, m),  T3=[m, tango),  T4=[tango, zulu]
        │
        ├─── gRPC stream ──► Leader(T1) → scan iterator → chunks stream
        ├─── gRPC stream ──► Leader(T2) → scan iterator → chunks stream
        ├─── gRPC stream ──► Leader(T3) → scan iterator → chunks stream
        └─── gRPC stream ──► Leader(T4) → scan iterator → chunks stream
                             │
  Router: K-WAY MERGE iterator (heap of size 4)
          - buffers a bounded # of rows per source (e.g. 1 MB / source)
          - pulls from source with lex-smallest head key
          - applies flow control: pauses sources whose buffers are full
          - emits up to `limit` rows total, strictly ordered
          - if a source's stream closes without error, drop from heap
          - if a source returns ABORTED_SHARD_SPLIT_RETRY:
               re-resolve that range slice @ same snapshot_hlc,
               fan-out to children, **deduplicate at head-key boundary**
                 (last emitted key from that range slice + 1)
          - on `limit` reached: encode cursor {snapshot_hlc,
               per-shard last-seen-key map, filter_hash, shard_map_version}
        │
        ▼
  Client receives ordered, non-duplicating stream

Why bounded buffers — If we let one slow shard starve fast shards' memory, we OOM the coordinator. A per-source high-watermark of ~1 MB applies gRPC flow control: once a fast shard's buffer fills, the coordinator stops reading that stream, HTTP/2 WINDOW_UPDATE frames stop flowing, and the shard pauses, while the slow shard keeps feeding the merge.

6.4 Is this SRE-pager-carryable?

Yes — every layer is runbook-addressable: metadata outage → fallback to cached tablet map up to TTL; tablet replica down → Raft re-elects in <10s; hot shard → autosplit; compaction storm → throttle via write-stall. See §8.


7 Deep-Dive #

7a. LSM Compaction Strategies — Leveled vs Tiered vs FIFO

Why critical: Compaction is the single operator dial that moves the Pareto frontier between write-amp, read-amp, and space-amp. Pick wrong and your cluster either melts under write load (write-amp explodes, disks saturate) or dies on reads (too many SSTables per level, bloom cascade misses, long tail latency).

Alternatives with quantified trade-offs

Let N = total data size, T = size ratio between levels (fanout, typically 10), L = number of levels, B = memtable size.

| Strategy | Write Amp | Read Amp | Space Amp | When used |
|---|---|---|---|---|
| Leveled | ~L × T / 2 ≈ 10-30× | ≤ L + 1 point, ≤ L range | ~1.11× (steady state; T=10 gives 1/(T−1) overhead) | RocksDB default; read-heavy; our choice |
| Tiered (size-tiered) | ~L ≈ 4-7× (each level rewritten once) | up to O(L × T) point due to many SSTables/level; range gets all of them | ~2× (duplicate keys across SSTables in same level) | Cassandra default; write-heavy |
| FIFO | no effort (delete-only) | works only for TTL-ed time-series | ~1× | Metrics DBs (OpenTSDB-like) where data naturally ages out |
| Universal (tiered w/ bounded file count) | ~log N | moderate | 1.5× | RocksDB option for mixed loads |

Derivation of leveled write amp: Data enters L0 (size B). Level L_k is T × L_{k-1}. To compact L_k into L_{k+1}, we rewrite all overlapping L_{k+1} data — in worst case each byte of L_k contributes to rewriting T bytes of L_{k+1}. Summed over all levels: write-amp ≈ T × L ≈ 10 × log_T(N/B). For our setup N/B = 150 GB / 64 MB ≈ 2400, log_10 ≈ 3.4, so 34× worst-case, 10-15× typical under skewed workloads.
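
The same derivation, runnable (Python; it rounds the level count up, which is why it lands slightly above the 34× in the prose):

import math

def leveled_write_amp(data_bytes: float, memtable_bytes: float, fanout: int = 10):
    levels = math.ceil(math.log(data_bytes / memtable_bytes, fanout))
    return levels, fanout * levels     # worst case: ~T rewrites per level

print(leveled_write_amp(150e9, 64e6))  # (4, 40): ceil'd vs the prose's 3.4 → 34×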

Pathological case (the L7 bit you don't read in LevelDB docs): compaction can't keep up with ingest → L0 grows unbounded → point reads degrade O(number of L0 files) because L0 files overlap and each must be consulted. Mitigations:

  1. Write stall: at L0_slowdown_threshold (default 20 files), throttle client writes (sleep N μs per write). At L0_stop_threshold (default 36), block writes entirely. This is backpressure as a feature — better than OOM.
  2. Subcompaction parallelism: split a single large compaction into parallel subranges; uses multiple cores/SSDs. RocksDB max_subcompactions.
  3. Partitioned compaction: ScyllaDB's "incremental compaction" reduces temporary disk space during compaction (usually 2× target file size).

Our choice: Leveled, with pluggable override per column family. Reasoning:

  • Range scans dominate — leveled's single SSTable per level above L0 gives strictly bounded scan amplification (L+1 iterators).
  • Write amp 10-30× on 15 TB/day cluster ingest = 150-450 TB/day of physical writes, i.e. 5-15 TB/day/node across 30 nodes. NVMe with 3 DWPD × 4 TB = 12 TB/day endurance budget per drive — borderline. We allocate 2 drives per node (RAID0 or independent tablet assignment) → 24 TB/day budget, safe.
  • Allow tiered as a per-column-family override for write-heavy cold data (logs).

Failure modes

  • Compaction storm: many tablets compacting simultaneously after a mass flush (e.g. after recovering from a long WAL). Mitigation: global compaction rate limiter (bytes/s across all tablets on node), randomized jitter in flush scheduling.
  • Compaction stuck: a corrupt SSTable or a bug. Detection: compaction progress not advancing for 5 min. Mitigation: alert, quarantine SSTable, trigger repair via Raft catch-up from peers.
  • Write amp exceeding endurance: detected by SMART metrics. Mitigation: move low-SLA column families to tiered compaction (cuts write amp to ~L). Note that raising fanout T does not help here: write amp scales as T × L ≈ T × log_T(N), which grows with T once T > e, so fanout trades read/space amp, not endurance.

Real systems named: RocksDB (leveled default, tunable), Cassandra/Scylla (size-tiered default), HBase (tiered), LevelDB (leveled, origin), TokuDB (fractal tree — different regime entirely, write-amp 2-3× at cost of more complex reads), InfluxDB TSM (custom tiered for time-series).

7b. Dynamic Range Repartitioning — Hot-Tablet Split Without Freezing the Range

Why critical: Range-partitioning gives us locality (scans stay on one shard) but at the cost of load skew: a hot key prefix (e.g. every user wrote to timestamp/2026-04-19/* today) slams one tablet while others idle. Without automatic mid-key splits à la Bigtable tablets, the cluster melts. The earned-secret part is doing this without dropping in-flight range scans on the floor.

The L7 detail — three-phase split:

Phase 1 (Prepare): cheap, non-blocking

  • Leader of hot tablet T samples median key from in-memory block-index structure. Block index already has ~100 K entries for a 1 GB tablet (one per 10 KB block), so the median is approx-median in O(index size) with no SSTable reads.
  • Leader proposes SPLIT(T, mid=M, S*) via Raft where S* = current seqno. All replicas agree at log position.

Phase 2 (Commit): metadata atomic

  • Leader flushes memtable so the split point is clean (all pre-S* writes are in SSTables). Reason: we don't want an active memtable straddling a boundary.
  • Leader CAS on tablet-map in Metadata service: T (state=ACTIVE) → T (state=SPLITTING), T1=[start,M) ACTIVE, T2=[M,end) ACTIVE. Version bumps atomically.
  • Metadata service bumps a tablet_map_version. Routers notice stale cache → refresh.
  • All future writes/reads go to T1 or T2 by range.

Phase 3 (Physical split): lazy

  • T1 and T2 initially share T's SSTable files via OS-level hardlinks (or ref-counted handles in the engine). They share the SSTables read-only.
  • Each child has its own WAL, new memtable, new L0 flushes.
  • On next major compaction for T1 (or T2), the child rewrites only its slice of the shared files. Ref-count of old file drops; when 0, unlink.
  • Savings: we avoid rewriting 1 GB of data during the split. Split is O(metadata) in hot path.

In-flight scan handling — the earned-secret mechanism:

  • Every scan carries snapshot_hlc. Metadata service retains tablet-map versions for at least max_snapshot_age (e.g. 1 hour).
  • A scan that started at snapshot_hlc = T0 sees the pre-split tablet map. It pins T's replica's SSTables (reference count), continues reading. The replica keeps T's SSTables around (ref-counted) until all snapshots < S* drain.
  • A scan whose coordinator cache is stale and it tries to hit T after split: leader returns ABORTED_SHARD_SPLIT_RETRY with current tablet-map version. Coordinator refreshes, re-fans-out to T1/T2 for the remaining range (post-cursor).
  • Deduplication: cursor's last_key_emitted_per_range + new sub-ranges' lower bound = strict boundary. We continue from last_key + 0x00 in the correct child(ren). Because children's ranges are disjoint and cover T exactly, no dupes, no skips.
  • Ordering: children's ranges are lex-disjoint and adjacent, so merging output from T1 then T2 preserves order trivially if we finish T1 first. With fan-out parallelism we use a k-way merge heap with children as sources, maintaining global order.

Alternatives considered:

  1. Freeze-and-split (naive): stop writes to T, flush, split, restart. Latency spike for duration of flush. Rejected: unacceptable in a 99.99% SLA world; a busy tablet might take 10 s to flush 64 MB through fsync queue contention.
  2. Consistent hashing ring (no splits): no splits possible, rebalance via virtual node reassignment (Cassandra). Rejected earlier: loses range locality (see §3 on data structure choice).
  3. Manual splits only: operator runs a tool. Rejected: cannot meet hot-shard SLO reactively.
  4. Pre-splitting at table creation: useful complement, but doesn't solve dynamic hotspots. We adopt it as a feature (allow user to hint salt prefixes or initial split points at table create).

Failure modes

  • Split aborted mid-flight (leader crash between Phase 2 and Phase 3): Raft guarantees Phase 2 is either committed or not. If committed, new leader resumes Phase 3 (physical split lazily). If not committed, new leader sees T still ACTIVE and might re-propose.
  • Split ping-pong (tablet splits, then immediately merges back because load drops): hysteresis — require load thresholds above split + below merge with a minimum dwell time (e.g. 1 h).
  • Too many tiny tablets: global ceiling on tablet count; reject splits when over threshold; alert operator.
  • Client cache poisoning: router caches an old tablet map, hammers wrong leader. Leader returns current version; router refreshes. Cap cache TTL at 30 s as a safety net even without push invalidation.

Real systems named: Bigtable (tablets, this is literally the Bigtable model), Spanner (splits but with Paxos + TrueTime for commit), HBase (region splits, originally blocking — since HBase 0.94 online splits via reference files == our hardlink approach), CockroachDB (range splits at 512 MB default), TiKV (region splits, multi-Raft), FoundationDB (different — shard-server layer does this for KV; built-in), YugabyteDB (tablet splits).

7c. Cross-Shard Range-Scan Coordination Under Concurrent Splits

Why critical: The one NFR that is uniquely stressed by range scans — ordered delivery across an elastic set of shards — lives here. Get/Put can get away with "route to current leader; retry on not-leader". Scan can't — it has state (cursor, ordering) that must be preserved across shard-boundary changes and leader changes.

The algorithm (implement-from-this-description)

SCAN(lo, hi, limit, cursor):
    if cursor is empty:
        snapshot_hlc   = now_hlc()
        per_shard_last = {}      # shard_id → last key emitted to the client
    else:
        decode cursor → {snapshot_hlc, per_shard_last,
                         tablet_map_version, filter_hash}

    tablet_map = MetadataSvc.get_map(snapshot_hlc)  # historical view
    shards = tablet_map.tablets_covering([lo, hi]).sorted_by_range()
    shard_by_id = {s.id: s for s in shards}

    # k-way merge setup
    heap = MinHeap()  # keyed by head_entry.key
    streams = {}
    for s in shards:
        effective_lo = max(lo, s.range_start)
        if s.id in per_shard_last:   # resume strictly after the last emit
            effective_lo = max(effective_lo, per_shard_last[s.id] + 0x00)
        effective_hi = min(hi, s.range_end)
        if effective_lo > effective_hi: continue
        streams[s.id] = s.leader.OpenScan(effective_lo, effective_hi,
                                          snapshot_hlc,
                                          buffer_high_watermark=1MB)
        first_entry = streams[s.id].peek()
        if first_entry: heap.push((first_entry.key, s.id))

    emitted = 0
    while heap and emitted < limit:
        (key, s_id) = heap.pop()
        entry = streams[s_id].next()   # the entry we peeked: entry.key == key
        yield entry
        per_shard_last[s_id] = entry.key
        emitted += 1
        next_entry = streams[s_id].peek()
        if next_entry is ABORTED_SHARD_SPLIT_RETRY:
            # s_id split during our scan and our snapshot was lost/GC'd
            close streams[s_id]
            remaining_lo = per_shard_last[s_id] + 0x00
            remaining_hi = min(hi, shard_by_id[s_id].range_end)
            new_shards = MetadataSvc.get_map(now_hlc())
                          .tablets_covering([remaining_lo, remaining_hi])
            for ns in new_shards:
                open new stream @ latest_hlc,
                     from (max(remaining_lo, ns.range_start),
                           min(remaining_hi, ns.range_end)),
                     push its peek into heap
            # NOTE: snapshot isolation is now violated for these new shards;
            #       mark response with `snapshot_downgraded=true`
        elif next_entry is None:
            close streams[s_id]
        else:
            heap.push((next_entry.key, s_id))

    if heap still has entries AND emitted == limit:
        next_cursor = encode({snapshot_hlc, per_shard_last,
                              tablet_map_version, filter_hash})
    else:
        next_cursor = empty

Ordering proof sketch: Shards have disjoint ranges. Within each shard, the scan iterator yields keys in ascending order (memtable + SSTables k-way merged). At every pop from the heap we emit the globally-smallest unemitted key (heap property, over shard head pointers). No shard can later emit a key smaller than what's currently at heap head because the shard's next key ≥ current head. QED for happy path.

Under a split: New shards' ranges are ⊂ old shard's range and disjoint from other shards still in the heap (because other shards never overlapped old-shard's range). So injecting them into the heap preserves the invariant. The snapshot is downgraded for new shards (we lost the historical map for them) — flagged in response; clients needing strict snapshot reject.

Buffered pull + flow control: each stream is a gRPC server stream with a ~1 MB high-watermark; coordinator's heap only pulls when it needs the next element. This naturally pauses shards ahead of their peers. Without this, a fast shard floods coordinator memory waiting for the slow shard's lex-smaller keys.

Alternatives considered:

  • Sequential shard visit: visit shards in range order, exhaust one before moving to next. Works, simple. Cost: total latency = sum of per-shard latencies, ~N× slower than parallel. Rejected.
  • No snapshot, read latest everywhere: avoids historical metadata complexity; scan sees writes landing during scan (inconsistent across shards). Some systems do this (Bigtable is technically not snapshot-consistent across rows — snapshot is per-row). Rejected here because we promised "consistent snapshot option" in API.
  • Scatter-gather with client-side merge: client does the merge. Rejected: the client SDK would need the tablet map, auth to many leaders, and retry logic — a massive surface. Coordinator centralizes it.

Failure modes

  • Coordinator crash mid-scan: cursor is client-side; client retries on a fresh coordinator. Same snapshot_hlc; same results.
  • Shard leader fails mid-stream: replica promoted via Raft; stream RPC returns UNAVAILABLE; coordinator re-opens stream to new leader at per_shard_last[s_id]+0x00 within same snapshot (leader has access to the same frozen SSTables since Raft log contains the SSTable references).
  • Snapshot too old: metadata map at snapshot_hlc has been GC'd. Return SNAPSHOT_TOO_OLD; client restarts.
  • Pathological map churn: splits happen faster than scan progresses. Snapshot is the answer — it freezes the map, and GC horizon is generous enough for realistic scan durations.

Real systems named: Spanner read-only transactions (consistent snapshot via TrueTime + Paxos), Bigtable (NOT snapshot-consistent across rows — simpler model; we're doing more), CockroachDB (snapshot via HLC + MVCC), FoundationDB (5-second read version window), TiKV (percolator-style MVCC).


8 Failure Modes & Resilience #

| Component | Failure | Detection | Blast radius | Mitigation | Recovery |
|---|---|---|---|---|---|
| Storage node HW | Disk fail (one NVMe) | SMART + IO error | ≤ 150 tablets on that node lose 1 replica | Other 2 replicas serve; Raft continues | Replace disk; bootstrap new replica via Raft snapshot; minutes-to-hours |
| Storage node HW | Node crash | Heartbeat miss 5 s | 150 tablets lose 1 replica | Still quorum | Raft replication catch-up; new node fills in |
| Network | Rack partition | BGP/L3 alerts | Entire rack's tablets lose 1 replica (often leader) | Raft re-elects leaders in other racks | Partition heals → stale replicas catch up |
| Split-brain | Network partition splits Raft group 2-1 | Raft term increments on minority side but can't commit | Writes stall in minority | Raft safety → no two leaders at same term with quorum | Partition heals → minority learns new term, truncates log |
| Compaction | Storm (many concurrent) | Compaction bytes/s spikes; write stalls | Write latency spike (tail) | Rate-limiter kicks in; deprioritize minor compactions | Self-resolves as backlog clears |
| Compaction | Stuck compaction | Progress == 0 for 5 min | One tablet slows reads | Alert; manual or automatic restart of compaction thread; if corrupt SST, repair from replica | Re-ship SSTable from Raft peer |
| Bloom filter | FP-rate hot loop (keys sharing bits) | Per-SSTable bloom_miss_rate > 5% | Read amp doubles on that SSTable | Rebuild bloom with more bits or prefix bloom (see §10) | Next compaction fixes |
| WAL | Corruption (partial write on crash) | CRC mismatch during recovery | One tablet can't recover latest writes | Truncate at last-valid entry (durability is from Raft quorum, not local disk alone) | Catch up from peer |
| WAL | Disk full | Monitored | Writes fail | Reject writes with UNAVAILABLE; alert | Operator adds disk / throttles ingest |
| Memtable | OOM (many tablets × large memtables) | Node RSS spike; OOM-killer | Node down | Shared global memtable budget (RocksDB-style); flush largest first | |
| Metadata svc | Outage (Paxos group down) | Health check | No tablet-map refreshes; routers use cached map | Routers continue w/ cached map until TTL; splits paused; cluster keeps serving | Restore Paxos quorum |
| Metadata svc | Slow / cache-miss storm | Metadata latency tail | Cold routers stall on first request | Per-router local cache + jittered refresh | |
| Raft | Leader isolated | No AppendEntries ack | Tablet reads OK via followers if allowed; strong reads stall | Quick re-election (150-300 ms randomized timeout) | New leader in <1 s |
| Coordinator/router | Pod crash mid-scan | Client timeout | Scan aborts | Client retries with cursor; stateless router | <100 ms to new pod |
| Hot shard | qps spike 10× | Per-tablet qps monitor | Node saturates | Autosplit (§7b); shed to replicas for reads | Minutes |
| Bloom storm | Misses cascading across levels | Read amp per tablet | Read latency tail | Tune bloom bits/key higher; consider prefix bloom | Next compaction regenerates |
| Time skew | NTP failure | chrony alert | HLC drifts; bounded-staleness may be wrong | HLC caps drift + uses logical counter; strong reads still safe via Raft read-index | Fix NTP |

Runbook rule of thumb for SRE: 90% of pages resolve in "wait for Raft/compaction/split to converge". 10% are disk-fail bootstrap and metadata-service outage — these are the drills.


9 Evolution Path #

v1: Single-node sorted-file + B+ tree

  • Use case: 10 M keys, 10 GB, one box, you're a startup.
  • Data structure: a single SSTable (sorted flat file) + in-memory B+ tree index for block-offset lookup. Writes append to a tail + occasional bulk-rebuild of the SSTable.
  • API: local library (no RPC).
  • Pros: dead simple; great read performance; zero ops.
  • Cons: writes stall during rebuild; no crash recovery beyond what OS gives you; no scale-out.

When to move on: write rate exceeds bulk-rebuild tolerance, or dataset > single disk.

v2: Single-node LSM (RocksDB-like)

  • LSM with memtable + WAL + tiered/leveled SSTables + block cache + bloom filters.
  • Embeds in the application as a library, or served via a single-node gRPC wrapper.
  • Pros: fast writes, reasonable reads, crash recovery via WAL, proper compaction.
  • Cons: still single-node. One disk loss = all data lost (unless you snapshot to blob storage).

When to move on: one box can't hold the data; or you need HA.

v3: Sharded range-partitioned LSM with Raft replication and dynamic splits (the centerpiece design)

  • Everything above + range-partitioned tablets, Raft replication per tablet, dynamic splits, metadata service, coordinator tier.
  • Bigtable / HBase / Spanner layer 1. Serves billions of keys, billions of ops/day.
  • Evolvable: multi-region follower reads → active-active with conflict resolution → transactions with 2PC+TrueTime (Spanner).

Rough timelines I've seen in practice: v1 → v2 is a 2-week project (adopt RocksDB). v2 → v3 is 2-4 engineer-years; this is "build a database team" territory. Most orgs should buy (Cloud Bigtable, Cloud Spanner, FoundationDB, TiKV) rather than build. Reasons to build: proprietary access patterns (e.g. Meta's RocksDB-based ZippyDB, MyRocks), cost at extreme scale, regulatory/sovereignty.


10 Out-of-1-Hour Notes (solo-study depth) #

10.1 Compression

  • Block-level Zstd level 3 default. Per-block so block cache holds decompressed; decompression amortized across queries.
  • Dictionary compression for small values and repetitive keys (Zstd trained dict per column family): 2-4× additional ratio for small payloads < 1 KB, where per-block context is too little. Cost: dict management (train offline, version, distribute).
  • Compression per level: L0-L1 uncompressed (hot, frequently rewritten), L2+ Zstd (cold, stable). Tunable per CF.

10.2 Compaction-aware TTL

  • TTL rows are dropped at compaction. Key earned-secret: don't wait for natural compaction; schedule TTL-aware compactions to pick SSTables whose min_expire_hlc < now first (selection sketch after this list). RocksDB's "periodic compaction" does this. Saves space for aggressive-TTL data (metrics, logs).
  • Corollary: Split by time prefix for time-series (make it so an entire SSTable ages out at once → FIFO compaction for that CF).
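
Candidate selection for TTL-aware compaction is a short sort over per-file properties (Python sketch; min/max_expire_hlc as per-file properties is an assumption extending §5.1's Properties Block):

def ttl_compaction_candidates(sstables, now_hlc: int, max_picks: int = 4):
    """Prefer files that may already be fully expired: they free the most
    space per byte of compaction IO."""
    expirable = [s for s in sstables
                 if s.min_expire_hlc and s.min_expire_hlc < now_hlc]
    expirable.sort(key=lambda s: s.max_expire_hlc)   # fully-aged files first
    return expirable[:max_picks]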

10.3 Prefix bloom vs key bloom

  • Key bloom: FP < 1% with 10 bits/key (sizing math after this list). Perfect for point lookups.
  • Does NOT help prefix scans (startsWith(p)) — bloom tests an exact key. For Scan(p, p_upper_bound) the engine must consult every SSTable whose key range overlaps [p, p_upper_bound].
  • Prefix bloom: bloom over the first K bytes of each key (e.g. K=8). If prefix p (length ≥ K) is not in the bloom, skip this SSTable even for scans. Saves IO when most SSTables don't match the prefix.
  • Ribbon filter (Dillinger/Walzer 2021, merged into RocksDB): same FP rate as bloom with ~30% fewer bits, and faster queries. Modern replacement.
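
The bits/key ↔ FP relation behind these numbers, for an optimally tuned bloom (FP ≈ 0.6185^(bits/key)); a runnable check (Python):

import math

def bloom_fp(bits_per_key: float) -> float:
    """FP rate with the optimal hash count k = bits_per_key × ln 2."""
    return 0.5 ** (bits_per_key * math.log(2))    # == 0.6185 ** bits_per_key

def bits_for_fp(target_fp: float) -> float:
    return math.log(target_fp) / math.log(0.6185)

print(bloom_fp(10))        # ~0.0082 → the "FP < 1% with 10 bits/key" claim
print(bits_for_fp(0.001))  # ~14.4 bits/key for 0.1% (cf. §3.2)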

10.4 Write-stall back-pressure

  • Thresholds: soft_pending_compaction_bytes, L0_file_count, memtable_count.
  • When exceeded: client-visible latency rises (by design) — this is the system telling the client "slow down or we'll fall over."
  • Alternative: reject with UNAVAILABLE. Explicit; clients must implement token bucket. Worse UX but easier tuning.
  • Observability: export write_stall_us_per_op as a first-class SLI.
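
The stall ladder as a pure function (Python sketch; the file-count thresholds mirror the RocksDB defaults quoted in §7a, the byte limits and delay are illustrative):

def write_delay_us(l0_files: int, pending_compaction_gb: float) -> int | None:
    """Per-write sleep in microseconds; None means reject/block the write."""
    if l0_files >= 36 or pending_compaction_gb >= 256:
        return None            # hard stop threshold: stop accepting writes
    if l0_files >= 20 or pending_compaction_gb >= 64:
        return 1000            # soft stall: ~1 ms/write throttles ingest
    return 0                   # healthy: no delay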

10.5 Unified cache (row cache + block cache)

  • Block cache (decompressed data blocks): universal win.
  • Row cache (whole rows keyed by key): helps repeated point Gets on the same row. Hurts range scans (row cache is useless there). Enable per-CF when read pattern warrants.
  • Unified cache (RocksDB cache_index_and_filter_blocks): puts index + filter blocks in the same LRU as data blocks, subject to the same memory budget. Prevents index/filter blocks from pinning RAM forever; avoids OOM with many CFs.

10.6 NVMe vs HDD placement

  • Tiered storage: L0-L2 on NVMe (hot, small), L3-L4 on SATA SSD or even HDD (cold, large). Bigtable did this with SSD/HDD; HBase has HFile archive tiers.
  • Cost math: 1 PB NVMe ≈ $150 K; 1 PB HDD ≈ $20 K. If 90% of reads hit top levels (empirically true for Zipfian workloads), HDD tail tier saves ~$120 K/PB.
  • Caveat: compaction reads/writes span the boundary → ensure enough NVMe headroom for compaction IO or it becomes HDD-bound.

10.7 Geo-distributed consistency

  • TrueTime (Spanner): GPS+atomic-clock synchronized clocks with bounded error ε (~7 ms p99); commit waits out ε → strict serializable. Requires hardware.
  • HLC (Hybrid Logical Clock): combines wall-clock + logical counter; safe under skew, monotonic, small (64 bits). Doesn't give external consistency like TrueTime but is enough for causal consistency, snapshot reads, stale reads with bounded-staleness guarantees. (Update rules sketched after this list.)
  • Multi-region strategies:
    • Follower reads in other regions: cheap for bounded-staleness reads; leader stays in one region. CockroachDB "follower reads."
    • Leader per range, placed near hot writer: CRDB's locality-aware leaseholder assignment.
    • Active-active: per-region Raft groups; cross-region async replication + LWW or CRDT-on-value merge. Loses linearizability, gains local-write latency.
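
HLC's update rules in code (Python sketch of the standard send/receive algorithm; the 48/16-bit packing is an illustrative choice):

import time

class HLC:
    """64-bit hybrid logical clock: (physical ms << 16) | logical counter.
    Counter overflow and drift capping are omitted from this sketch."""
    def __init__(self) -> None:
        self.pt = 0     # max physical time observed, ms
        self.l = 0      # logical counter breaks ties within one ms

    def now(self) -> int:                     # local event or message send
        wall = int(time.time() * 1000)
        if wall > self.pt:
            self.pt, self.l = wall, 0
        else:
            self.l += 1                       # wall clock stalled or stepped back
        return (self.pt << 16) | self.l

    def observe(self, remote: int) -> int:    # message receive
        rpt, rl = remote >> 16, remote & 0xFFFF
        wall = int(time.time() * 1000)
        if wall > self.pt and wall > rpt:
            self.pt, self.l = wall, 0
        elif rpt > self.pt:
            self.pt, self.l = rpt, rl + 1
        elif rpt == self.pt:
            self.l = max(self.l, rl) + 1
        else:
            self.l += 1
        return (self.pt << 16) | self.l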

10.8 Observability (what to graph / alert on)

Must-have dashboards per-shard and per-node:

  • p50/p95/p99/p99.9 for Get, Put, Scan.
  • Write stall microseconds (key SLI).
  • Compaction bytes/s, compaction queue depth.
  • L0 file count (alert > 15).
  • Bloom FP rate rolling (alert > 2%).
  • Memtable flush latency.
  • WAL fsync p99.
  • Per-tablet qps histogram (identify hot-shard candidates).
  • Raft leader elections per minute (alert on flapping).
  • Metadata service lookup rate + cache hit rate at routers.
  • NVMe wear (% endurance used / day).
  • Cross-shard scan fan-out size distribution (p99 of shards per scan).
  • ABORTED_SHARD_SPLIT_RETRY rate (alert: > 10/s signals too-aggressive splits).

10.9 Security / isolation (bonus)

  • Per-tenant column families with separate encryption keys (envelope encryption with a KMS).
  • Per-key encryption at rest would break prefix locality — stick to block-level.
  • Row-level ACLs via a separate auth service; enforce at coordinator.
  • Rate-limit per tenant at coordinator (token bucket keyed by tenant-id + op-type).

10.10 Testing

  • Jepsen-style invariants: no dupes/skips across scans during induced splits and partition + leader-kill chaos.
  • Compaction fuzz: random SSTable corruption, ensure recovery from peer.
  • Cursor compatibility tests: old cursor served by new server version must not break (cursor schema is a compatibility surface).

Appendix: Quick-reference WHY-table #

| Choice | WHY | Rejected |
|---|---|---|
| LSM over B+ tree | Write amp batched to sequential IO; compaction gives near-1× space amp; SSTables are immutable → easy replication | B+ tree: in-place updates → random IO; page fragmentation → 1.5-2× space amp |
| Leveled over tiered | Bounded read amp = L+1; crucial for range scans | Tiered: 10-20× scan amp, unacceptable for our primary op |
| Range over hash partitioning | Preserves lex order end-to-end — single-shard scans stay on single shard | Hash: range scan becomes a full cluster fan-out |
| Raft over Paxos | Understandability, log consistency, battle-tested impls | Multi-Paxos: equivalent in theory, harder to debug |
| Per-tablet Raft group | Independent parallelism; failure isolated | Single global Raft: doesn't scale |
| Coordinator tier stateless | Scale horizontally; no recovery state | Stateful router: extra failure mode |
| HLC default, TrueTime optional | No special hardware; strong-read via Raft read-index | TrueTime: best external consistency but hardware-dependent |
| Skiplist memtable | Lock-free concurrent writes; simple | B-tree: write contention under concurrent insert |
| Opaque HMAC'd cursor | Survives splits; clients can't forge | last_key: breaks on split |
| Hardlink-based split | O(metadata) split; no heavy IO on hot path | Full-copy split: latency bomb on hot tablet |

End of document.
