Q6 Real-Time Analytics & Monitoring

Ad Click / Hashtag Aggregator — L7 System Design

Ingest a high-volume event stream, serve realtime dashboards, and still produce audit-grade historical counts.

Streaming · Partitioning · Consistency · Observability · Cost / efficiency

1 Problem Restatement & Clarifying Questions #

Restatement

Design a system ingesting a high-volume event stream (ad clicks or hashtag mentions) at 10K events/sec baseline, 100K/sec peak, serving two consumers with very different contracts:

  1. Near-real-time dashboards — operators and advertisers watching clicks-per-minute, top-K trending ads, CTR curves; tolerate ~1% approximation, want <2s p99 freshness.
  2. Billing-grade historical totals — daily per-advertiser invoices, audited, must be exact to the cent, SOX/GDPR reconcilable.

These contracts drive a Lambda architecture (not Kappa) for v1: the correctness requirement on billing is not negotiable, but the latency requirement on dashboards is. A single pipeline cannot optimize both simultaneously without sacrificing one.

Why ad-click (not hashtag) as the primary framing

  • Money flows through it — a 0.1% miscount = regulatory exposure, not a cosmetic bug.
  • Forces exactly-once effective semantics, which is the L7 discriminator on this problem.
  • Fraud/bot filtering is a first-class concern, not an afterthought.
  • Hashtag is recoverable by declaring "we showed approximate counts; refresh shows true-up" — ad-click billing is not.

If interviewer wants hashtag: same pipeline, drop the billing reconciler, relax dedup window, accept HLL/CMS everywhere.

Clarifying questions (ask 4-5, don't drown the interviewer)

  1. Latency SLO split: Dashboards p99 <2s for trailing-hour aggregates; billing totals reconciled within 24h with <0.01% error vs raw log? → assume yes.
  2. Dedup semantics: Is a client-generated event_id available and stable across retries? Dedup window (24h? 7d?) → assume client SDK emits UUIDv4 event_id; 24h dedup window in streaming, extended to 7d in batch.
  3. Fraud scope: Do we filter bot clicks inline (billing excludes them) or tag them and let a downstream fraud service decide? → assume tag-in-stream, subtract-in-batch. Inline filtering adds a synchronous dependency we can't afford at 100K eps.
  4. Tenant count & isolation: 10K advertisers? 100K? Per-tenant hard isolation or soft quotas? → assume ~100K advertisers, soft quota (rate limiting at edge), shared Kafka topic partitioned by ad_id.
  5. Replay / backfill: Must support re-aggregating last 30 days after a schema or attribution bug? → yes, batch layer is the system of record for this.
  6. Geo / regulatory: Multi-region? GDPR right-to-erasure reaches pre-aggregated cubes? → single-region v1, multi-region v3; GDPR handled via user_id hashing + tombstone replay in batch (Section 10).
  7. Top-K granularity: Trending = top-100 hashtags in last 5 min globally, or per-country? → global in v1, dimensional top-K in v3.

Assumptions locked

  • Client SDK (web pixel, mobile SDK) generates event_id + timestamp at emit.
  • Clocks within ~5 min skew (handled by watermarks).
  • Dashboard consumers are internal advertisers & ops — ~10K concurrent queriers, not end-users.
  • Billing is per-day, closed T+1.

2 Functional Requirements #

In-scope

  1. F1. Ingest a click event: {event_id, ad_id, user_id_hash, timestamp_ms, dims: {country, device, placement, campaign_id}, client_ts, server_ts}.
  2. F2. Deduplicate by event_id within a configurable window (24h stream, 7d batch).
  3. F3. Near-real-time dashboard query: "Clicks for ad_id X in last 5m / 1h / 24h, sliced by dim." p99 <2s, freshness <10s.
  4. F4. Top-K trending: "Top 100 ads by clicks in last 5m", "Top 100 hashtags in last 15m". p99 <1s.
  5. F5. Billing-grade daily totals: "Total billable clicks for advertiser A on date D, excluding fraud." Exact, reconciled, immutable once closed.
  6. F6. Backfill / replay from raw event log — idempotent, re-runnable without double-counting.
  7. F7. Fraud scoring hook — every event annotated with fraud_score; billing uses threshold; dashboards optionally show raw vs net.
  8. F8. Audit trail — ability to go from a billing total back to the raw events that contributed to it (an often-missed L7 requirement; regulators ask).

Out-of-scope (explicit)

  • Attribution logic (last-click, multi-touch) — separate attribution service consumes our clean event stream.
  • Payment processing, invoice rendering, tax — billing totals is our output; invoicing is another system.
  • Ad serving / click URL redirect — we consume the emitted event, we don't cause it.
  • Full-fraud ML model training — we expose the feature store and scoring hook; ML team owns the model.
  • Real-time per-user personalization.

3 Non-Functional Requirements + Capacity Estimate #

NFRs

| Dimension | Target | Rationale |
|---|---|---|
| Ingest availability | 99.9% (8.7h/yr) | Lost events are lost money; but 99.99% would require multi-region active-active from day 1 — defer to v3 |
| Billing-total availability | 99.99% (52 min/yr) | Read path, batch-produced, can be cached |
| Dashboard freshness (p99) | <10s end-to-end | Watermark lag + OLAP ingest + query |
| Dashboard query latency (p99) | <2s | Pre-aggregated cubes, not scans |
| Billing reconciliation SLA | T+24h, <0.01% drift vs raw | Auditors accept this; anything looser voids audit |
| Exactly-once semantics | Effective (idempotent sinks) | True exactly-once is a lie; see §7a |
| Durability | 0 lost events within replication window | Kafka RF=3, min.insync.replicas=2, acks=all |
| Max event-time lateness tolerated | 24h (stream) / 7d (batch) | Mobile offline events |
| Tenant isolation | Soft (rate limits, quota) in v1; hard in v3 | Cost vs complexity |

Back-of-envelope capacity (100K eps peak — the design-to number)

Event size:

  • event_id (16B UUID) + ad_id (8B) + user_hash (16B) + ts (8B) + 5 dims × avg 16B (80B) + fraud_features (~200B) + proto overhead
  • ~500 bytes wire-format protobuf

Ingest bandwidth:

  • Peak: 100K × 500B = 50 MB/s = 400 Mb/s
  • After Kafka RF=3: ~150 MB/s aggregate network into the brokers and ~150 MB/s aggregate disk write across the broker fleet

Daily volume:

  • Average ≈ 30K eps (peak 100K, off-peak 10K, diurnal) × 86,400s = ~2.6B events/day
  • Assume peak-hour concentration: 100K × 3600 = 360M events/hour × 4 peak hours + avg elsewhere = ~2–3B events/day, round to 3B
  • Raw storage: 3B × 500B = 1.5 TB/day raw, 4.5 TB/day with RF=3 in hot store
  • Compressed (zstd 4×): **380 GB/day/replica hot, ~95 GB/day cold (S3 Parquet + column compression)**
  • Annual cold: 95 GB/day × 365 = ~35 TB/year across the cluster (Parquet)

Kafka partition math:

  • Per-partition sustainable throughput: ~5K eps (conservative; Kafka can do 10–20K/partition but headroom for rebalance, spikes, consumer lag)
  • Partitions needed for ingest: 100K / 5K = 20 partitions minimum
  • Plus headroom for hot-key skew (celebrity ads with 10× traffic of p99): 64 partitions (power of 2, room for re-key + salt scheme)
  • Parallel consumers (dashboard Flink job, billing-archiver Flink job, fraud Flink job) reuse the same partitions via separate consumer groups; partition count stays at 64
  • Replication: RF=3, min.insync.replicas=2
  • Retention: 72h hot (for Flink replay + reconciliation comparison) = 3B × 3 × 500B = ~4.5 TB × 3 RF = 13.5 TB across Kafka cluster (zstd-compressed ~3.5 TB)

Flink job sizing (streaming dashboard aggregator):

  • Parallelism = partition count = 64 slots minimum, typically 1.5× for catchup: 96 slots
  • Per slot: ~2 GB heap (window state) + 2 GB off-heap (RocksDB state backend)
  • Cluster: 96 slots / 8 slots-per-TM = 12 TaskManagers × 32 GB = 384 GB total
  • Checkpoint state size: rough estimate 100M distinct (ad_id, window) keys × 100B state = 10 GB checkpoint, to S3, every 60s

OLAP (ClickHouse) sizing:

  • Pre-aggregated 1-minute tumbling windows, 1M distinct ad_ids × 1440 min/day × ~40B/row (ad_id, window, count, HLL sketch) = **60 GB/day pre-agg**
  • 90-day retention hot: 5.4 TB, RF=2, compressed 5× → ~2 TB disk
  • QPS: 10K concurrent dashboards, ~1 query/5s = 2K qps; with Redis cache hit rate 80% → 400 qps to ClickHouse
  • Nodes: ClickHouse handles ~1K qps/node for pre-agg → 3 nodes sufficient, deploy 4 for HA

Top-K structure memory:

  • Per Flink slot: min-heap size K=1000 (10× over-provisioning of top-100) × 32B/entry = 32 KB per slot per window — negligible
  • Global reduce: 64 slots × 1000 = 64K candidate entries; even a 100K-entry merge heap is only ~3.2 MB total — trivial

Cost (rough, AWS-ish, monthly, for sanity):

  • Kafka: MSK 6× m5.2xlarge = ~$8K/mo
  • Flink: EMR/EKS 12 TMs + 2 JMs = ~$6K/mo
  • ClickHouse: 4× r5.4xlarge = ~$4K/mo
  • S3 raw + Parquet cold: 35 TB × $0.023 = ~$800/mo + egress
  • Redis cluster: ~$1K/mo
  • Total ~$20K/mo for 3B events/day (~90B events/mo) = ~$220 per billion events (rough; real production 2–4× with redundancy, observability, staging)

These numbers drive every downstream choice.


4 High-Level APIs #

Two ingest paths (producer), three query paths (consumer).

Ingest (producer contract)

Preferred: client SDK → edge proxy → Kafka (trusted server-side SDKs may produce to Kafka directly). HTTP fallback for environments without a Kafka client.

# HTTP fallback (edge proxy)
POST /v1/events
Content-Type: application/x-protobuf  (or application/json)
Idempotency-Key: <event_id>   # client-generated UUIDv4
Authorization: Bearer <sdk_token>

Body (proto):
  ClickEvent {
    string event_id = 1;        // REQUIRED, dedup key
    string ad_id = 2;           // REQUIRED, partition key
    string user_id_hash = 3;    // SHA-256(user_id, daily_salt)
    int64 client_ts_ms = 4;     // event-time
    int64 server_ts_ms = 5;     // filled at edge
    map<string,string> dims = 6; // country, device, placement, ...
    string sdk_version = 7;
    bytes signature = 8;        // HMAC for anti-tampering
  }

Response 202 Accepted:
  { "event_id": "...", "accepted_at_ms": ... }
Response 409 Conflict (duplicate in edge LRU):
  { "event_id": "...", "first_seen_at_ms": ... }
Response 429 Too Many Requests (per-SDK-token rate limit)
Response 400 Bad Request (schema / signature failure — dropped, alerted)

Why client event_id + Idempotency-Key? Foundation of exactly-once. Without it, the whole pipeline becomes best-effort. (See §7a.)

Why proto? Schema evolution without JSON breakage; 3× smaller on wire; schema registry integration.

Why HMAC? Billing means financial incentive to forge. Signature verified at edge; forged drops are logged for fraud team.
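To make the signing contract concrete, a minimal sketch of HMAC-SHA256 over the serialized ClickEvent bytes, assuming a per-SDK-token secret; the class and method names are illustrative, not part of the production SDK.

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public final class EventSigner {
    private final byte[] sdkSecret;   // provisioned alongside the SDK token

    public EventSigner(byte[] sdkSecret) { this.sdkSecret = sdkSecret; }

    // Client: sign the serialized ClickEvent (signature field left empty) before the first send;
    // retries re-send the identical bytes, so event_id and signature stay stable across attempts.
    public byte[] sign(byte[] eventBytes) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(sdkSecret, "HmacSHA256"));
        return mac.doFinal(eventBytes);
    }

    // Edge proxy: recompute and compare in constant time; mismatch -> 400, logged for the fraud team.
    public boolean verify(byte[] eventBytes, byte[] claimedSignature) throws Exception {
        return java.security.MessageDigest.isEqual(sign(eventBytes), claimedSignature);
    }
}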

Dashboard query

GET /v1/metrics/{entity_type}/{entity_id}?range=1h&window=1m&agg=count&group_by=country
  entity_type: ad | campaign | hashtag | advertiser
  range: 5m | 15m | 1h | 24h | 7d
  window: 1m | 5m | 1h | 1d  (bucket granularity)
  agg: count | unique_users (HLL-backed) | sum_bid
  group_by: optional dim (country, device, placement, hour_of_day)

Response:
  {
    "entity_id": "ad_123",
    "series": [
      {"ts": 169..., "count": 4321, "unique_users_approx": 3910},
      ...
    ],
    "data_freshness_ms": 8400,   // L7: expose watermark lag to client
    "is_approximate": true,      // L7: truthful
    "source": "speed_layer"
  }

L7 nuance: data_freshness_ms and is_approximate are first-class. Clients can choose to wait for batch-reconciled numbers or accept the speed layer's approximation — hiding this breaks trust. (Google's internal monitoring systems do exactly this: stale_at_micros on every datapoint.)

Top-K trending

GET /v1/trending?entity_type=hashtag&window=5m&k=100&country=US

Response:
  {
    "window_start_ms": ..., "window_end_ms": ...,
    "entries": [
      {"entity_id": "halloween", "count_approx": 12840, "rank": 1, "error_bound": 128},
      ...
    ],
    "algorithm": "count-min-sketch+min-heap",
    "is_approximate": true
  }

The error_bound is published (see §7 CMS math) — L7 systems publish their own uncertainty.

Billing totals (separate service, separate SLA)

GET /v1/billing/daily_totals?tenant=advertiser_42&date=2026-05-03

Response:
  {
    "tenant": "advertiser_42", "date": "2026-05-03",
    "status": "CLOSED",             // OPEN | PROVISIONAL | CLOSED | DISPUTED
    "closed_at_ms": ...,
    "raw_clicks": 1234567,
    "fraud_excluded": 12345,
    "billable_clicks": 1222222,
    "reconciliation_version": "v3",  // bumped on re-run
    "source_batch_run_id": "dt=2026-05-03/run=2026-05-04T06:00Z",
    "checksum": "sha256:...",         // audit integrity
    "dispute_allowed_until_ms": ...   // T+7d
  }

Distinct service from dashboards. Different SLO, different storage, different team owns on-call. Billing outage = money paused, not dashboards dark.

Replay / backfill (internal)

POST /v1/ops/replay
  { "source_topic": "clicks-raw",
    "from_offset": ..., "to_offset": ...,
    "target_job": "daily_reconciler_v4",
    "reason": "schema_fix_PROJ-1234" }

Authenticated to on-call only. Must produce a new reconciliation_version.

gRPC variants

All of the above exposed via gRPC too (typed protos, streaming for /metrics watch-mode). HTTP+JSON for external advertiser dashboards, gRPC internal.


5 Data Schema #

Five storage engines, each chosen for its specific access pattern. Justification mandatory.

5a. Kafka topic clicks.raw.v1

  • Partitions: 64
  • Replication factor: 3, min.insync.replicas=2
  • Key: hash(ad_id) :: salt — salt is 0 for cold keys, small int for hot keys (see §7b)
  • Value: ClickEvent proto (see §4)
  • Retention: 72h (Kafka as replayable buffer, not durable log — batch layer owns durable copy)
  • Compaction: No — event log, not keyed state
  • Compression: zstd at producer, level 3
  • Why Kafka (not Pulsar, Kinesis, Pub/Sub)? Flink's Kafka connector with exactly-once producer (idempotent + transactional) is the most mature streaming contract. Pulsar has better multi-tenancy but Flink integration lags. Kinesis caps at 1K records/sec/shard — would need 100 shards, 2× expensive and no compacted topic. Pub/Sub is at-least-once only, no partition ordering guarantee per key — kills our watermark story.

5b. Cassandra table clicks_raw_by_day

  • Role: Source-of-truth durable log for billing reconciliation, queryable by ad_id + day for debugging.
  • Schema:
    CREATE TABLE clicks_raw_by_day (
      ad_id text,
      event_day date,
      event_id uuid,
      client_ts_ms bigint,
      server_ts_ms bigint,
      user_id_hash blob,
      dims map<text,text>,
      fraud_score float,
      PRIMARY KEY ((ad_id, event_day), event_id)
    ) WITH compaction = {'class':'TimeWindowCompactionStrategy', 'compaction_window_unit':'HOURS', 'compaction_window_size':1};
    
  • Partition key (ad_id, event_day) — bounds partition size, enables single-partition read for audit queries.
  • Clustering key event_id — dedup via INSERT with IF NOT EXISTS? No — too slow (Paxos). Instead write unconditional; downstream batch dedupes. Cassandra's LWW semantics are fine here because event_id + content are immutable once emitted.
  • Why Cassandra (not Bigtable, ScyllaDB, HBase, just-S3)?
    • LSM tree tuned for write-heavy (100K eps × RF=3 = 300K writes/s — Cassandra sustains this on ~9 nodes).
    • Per-partition row cache covers hot ad lookups in audit.
    • Multi-DC replication without external tooling — needed for v3 geo.
    • Rejected Bigtable: would work, locks us to GCP. If interview is Google, mention: "On GCP I'd use Bigtable with row key ad_id#rev(day)#event_id; the design is isomorphic."
    • Rejected just-S3: daily Parquet is our cold path (§5e). Cassandra is for the ~24h hot recent lookup where S3 latency (seconds) breaks audit UX.
    • Rejected ScyllaDB: better per-node throughput but smaller ecosystem; reasonable swap at scale.

5c. ClickHouse table clicks_agg_1m (pre-aggregated cube)

  • Role: Powers dashboard queries. Pre-aggregated to 1-minute tumbling windows, multiple dimensional rollups.
  • Schema:
    CREATE TABLE clicks_agg_1m (
      window_start DateTime,
      ad_id String,
      campaign_id String,
      country FixedString(2),
      device Enum8('mobile','desktop','tablet','tv','other'),
      placement String,
      clicks_count UInt64,
      unique_users_hll AggregateFunction(uniq, UInt64),
      fraud_flagged UInt64,
      ingest_version UInt32  -- for reconciliation; see §7c
    ) ENGINE = ReplacingMergeTree(ingest_version)
      PARTITION BY toDate(window_start)
      ORDER BY (ad_id, window_start, country, device);
    
  • Why ReplacingMergeTree? Enables idempotent upsert by Flink. Flink commits (window, ad_id, dims, count, version=chkpt_id); on retry same key, higher version wins on merge. Dedup happens lazily in background merges. This is the L7 answer to "how do you get exactly-once into ClickHouse despite it having no transactions" — see §7a.
  • Rollup variants: Also maintain clicks_agg_5m, clicks_agg_1h, clicks_agg_1d as materialized views → cheaper dashboard queries for longer ranges. Daily pre-agg serves 30-day trend without scanning 1m granularity.
  • Why ClickHouse (not Druid, Pinot, BigQuery, ES)?
    • Sub-second aggregations on pre-cubed data at our row scale (60 GB/day).
    • Columnar, vectorized — p99 <500ms for SELECT sum(count) GROUP BY country WHERE ad_id=X AND window BETWEEN ....
    • Rejected Druid: comparable; operationally heavier (6 process types vs ClickHouse's 2). Druid's real-time ingest with Kafka indexing service is strong — reasonable alternative, mentioned as "could swap."
    • Rejected Pinot: LinkedIn-origin, strong at LinkedIn scale; operationally similar to Druid. Slight edge on ingest freshness. Also reasonable.
    • Rejected BigQuery: query latency (1–3s) too slow for p99<2s dashboards with 80% cache miss; cost per TB scanned punishes frequent small queries. Great for batch analytics (§5e alternative).
    • Rejected Elasticsearch: not columnar, aggregations on billions of rows don't beat ClickHouse 5×.

5d. Redis dash:{metric_key} → value + top-K ZSET

  • Role: Hot cache for dashboard API, top-K materialized as sorted set.
  • Key design:
    • dash:ad:{ad_id}:1m:{window_start} → {count, hll_bytes, fraud_count}, TTL 10m
    • topk:hashtag:5m:{window_start} → ZSET {member=hashtag, score=count}, TTL 15m
  • Populated by: Flink sink writes to both ClickHouse (source of truth) and Redis (cache). Cache miss falls back to ClickHouse.
  • Why Redis (not Memcached, no-cache)? Need ZSET for efficient top-K (ZRANGE 0 99 WITHSCORES in O(log N + K)); Memcached doesn't. Read-through from ClickHouse at 400 qps sustained would work without cache, but p99 suffers during Flink catch-up.
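A minimal sketch of the ZSET write and top-K read under the key layout above, assuming a Jedis client; the key format and 15-minute TTL mirror this section.

import redis.clients.jedis.Jedis;

public class TrendingTopK {
    // Writer side (Flink sink): bump the per-window ZSET and keep the TTL from the key design above.
    public static void record(Jedis jedis, long windowStartMs, String hashtag, long count) {
        String key = "topk:hashtag:5m:" + windowStartMs;
        jedis.zincrby(key, count, hashtag);
        jedis.expire(key, 15 * 60);
    }

    // Reader side (dashboard API): top-100 members by score, O(log N + K).
    public static void printTop100(Jedis jedis, long windowStartMs) {
        String key = "topk:hashtag:5m:" + windowStartMs;
        int rank = 1;
        for (String tag : jedis.zrevrange(key, 0, 99)) {
            System.out.printf("#%d %s score=%.0f%n", rank++, tag, jedis.zscore(key, tag));
        }
    }
}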

5e. S3 + Parquet data lake s3://clicks-lake/raw/dt=YYYY-MM-DD/hr=HH/

  • Role: Durable, immutable source of truth for billing reconciliation and arbitrary backfill.
  • Format: Parquet, partitioned by dt and hr, Snappy compression, row group 128 MB.
  • Schema: superset of Kafka proto, flattened.
  • Lifecycle: Hot 30d (SSD-backed S3 Standard), warm 90d (Standard-IA), cold 2y (Glacier).
  • Written by: A separate "archiver" Flink job reading Kafka with its own consumer group, writing to S3 via Flink's StreamingFileSink with exactly-once via checkpoint-coordinated commits (on S3 this is a multipart-upload completion rather than a rename, since S3 has no atomic rename). (Or, simpler: Kafka → MSK Connect → S3 Sink — but then exactly-once depends on sink config.)
  • Why Parquet on S3 (not HDFS, Iceberg-only, Hive)? Parquet columnar + predicate pushdown for billing queries ("all clicks for advertiser X on day D"). Add Apache Iceberg as table format layer — gives ACID table operations, schema evolution, time travel. v1 can be plain Parquet + Hive metastore; v2 upgrade to Iceberg — it's the direction. HDFS rejected: ops burden, no v3 multi-region story.

5f. Billing snapshot store (Postgres or DynamoDB)

  • Role: Final closed daily totals. Small, immutable-after-close, queried by invoicing system.
  • Rows: ~100K advertisers × 1 row/day = 36M rows/year — trivial.
  • Schema:
    daily_totals(tenant_id, date, status, raw_clicks, fraud_excluded, billable_clicks,
                 reconciliation_version, source_batch_run_id, closed_at, checksum)
    PRIMARY KEY (tenant_id, date, reconciliation_version)
    
  • Why Postgres (not Cassandra, DynamoDB)? Read-heavy, low write rate, need ACID for "close the day" transaction. Plus joins for operator UIs. DynamoDB also fine; Postgres wins on ecosystem (auditor SQL access, BI tools).

5g. Fraud feature store (Feast / Redis-backed)

  • Role: Stream-enriched features per (user_hash, ad_id, ip) — click velocity, geo, device fingerprint. Consumed by fraud scorer.
  • Storage: Redis hot (last 1h rolling counters via Flink), Cassandra warm (last 30d).
  • Why separate from main path? Fraud model iterates faster than billing pipeline. Isolation means retraining doesn't risk billing correctness.

Schema registry

  • Confluent Schema Registry or equivalent — proto-schema versioned, compatibility checks (BACKWARD_TRANSITIVE) on every producer deploy.
  • Why? Schema drift is a top-5 production incident source in streaming systems. L7 answer is always: schema is a contract, versioned, validated in CI.

6 System Diagram (ASCII) — Centerpiece #

Master architecture diagram

                                  ╔═══════════════════════════════════════╗
                                  ║         CLIENT TIER                   ║
                                  ║  Web SDK / Mobile SDK / Server SDK    ║
                                  ║  emits event_id (UUIDv4) + client_ts  ║
                                  ║  + HMAC signature                     ║
                                  ╚════════════════╦══════════════════════╝
                                                   │ HTTPS POST /v1/events
                                                   │ 500B/event × 100K/s = 50MB/s
                                                   │ (or direct Kafka producer)
                                                   ▼
  ┌─────────────────────────────────────────────────────────────────────────────┐
  │                             EDGE INGEST TIER                                │
  │  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐                │
  │  │ Envoy + auth   │  │ Envoy + auth   │  │ Envoy + auth   │   global ALB   │
  │  └───────┬────────┘  └───────┬────────┘  └───────┬────────┘                │
  │          │                   │                   │                          │
  │          ▼                   ▼                   ▼                          │
  │  ┌────────────────────────────────────────────────────────┐                 │
  │  │  Ingest proxy (Go, stateless, horizontally scaled N=20)│                 │
  │  │   - HMAC verify                                         │                 │
  │  │   - schema validate (proto)                             │                 │
  │  │   - per-SDK rate limit (token bucket, Redis)            │                 │
  │  │   - LRU dedup cache (event_id, last 5 min, 10M entries) │                 │
  │  │       hit → 409, no Kafka write                         │                 │
  │  │   - assign server_ts_ms                                 │                 │
  │  │   - Kafka produce (acks=all, idempotent=true,           │                 │
  │  │       enable.idempotence=true, max.in.flight=5)         │                 │
  │  └───────────────────────────┬─────────────────────────────┘                 │
  └────────────────────────────────┼─────────────────────────────────────────────┘
                                   │ Kafka protocol, 50MB/s × RF=3 = 150MB/s
                                   │ transactional producer
                                   ▼
  ┌──────────────────────────────────────────────────────────────────────────────┐
  │                         KAFKA CLUSTER  (topic: clicks.raw.v1)                │
  │    ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ... 64 partitions  │
  │    │ P0   │ │ P1   │ │ P2   │ │ P3   │ │ P4   │ │ P5   │      RF=3, ISR=2   │
  │    └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘   zstd, 72h ret.   │
  │    Key: hash(ad_id) :: salt       Value: ClickEvent proto                   │
  └───────┬─────────────────────┬──────────────────────┬──────────────────────────┘
          │                     │                      │
          │ consumer group      │ consumer group       │ consumer group
          │ "dashboard-rt"      │ "billing-archiver"   │ "fraud-enrich"
          │ 64 parallelism      │ 16 parallelism       │ 32 parallelism
          ▼                     ▼                      ▼
  ┌───────────────────┐  ┌──────────────────┐   ┌──────────────────────┐
  │ SPEED LAYER       │  │ ARCHIVE PATH     │   │ FRAUD SCORER         │
  │ Flink Streaming   │  │ Flink → S3       │   │ Flink + ML model     │
  │                   │  │ StreamingFile    │   │  (features from §5g) │
  │ Operators:        │  │ Sink, Parquet,   │   │                      │
  │  ├ key_by(ad_id)  │  │ hourly rollover  │   │ emits fraud_score    │
  │  ├ assign_        │  │ EXACTLY-ONCE     │   │ back to enriched     │
  │  │   watermark    │  │ via 2PC rename   │   │ topic                │
  │  │   (max_lag=5m) │  │                  │   │ clicks.enriched.v1   │
  │  ├ tumble(1m)     │  │ writes:          │   └──────────┬───────────┘
  │  ├ aggregate:     │  │  s3://.../       │              │
  │  │   count, HLL,  │  │   dt=YYYY-MM-DD/ │              ▼
  │  │   fraud_flag   │  │   hr=HH/         │        ┌─────────────┐
  │  ├ allowed_       │  │   part-*.parquet │        │ Redis: per- │
  │  │   lateness=10m │  └────────┬─────────┘        │ user/IP     │
  │  ├ side_output:   │           │                  │ velocity    │
  │  │   late events  │           │                  │ counters    │
  │  │   → Kafka      │           │                  └─────────────┘
  │  │   "late-clicks"│           │
  │  └ 2PC sink to    │           │
  │    ClickHouse +   │           │
  │    Redis          │           │
  │                   │           │
  │ Checkpoint: S3    │           │
  │ every 60s, 10GB   │           │
  │ RocksDB backend   │           │
  └─────┬──────┬──────┘           │
        │      │                  │
        ▼      ▼                  │
  ┌─────────────────┐             │
  │  CLICKHOUSE     │             │
  │  ReplacingMerge │             │
  │  clicks_agg_1m  │             │
  │  + MV to 5m,1h, │             │
  │    1d           │             │
  │                 │             │
  │  4 nodes, RF=2  │             │
  │  60GB/day hot   │             │
  └────────┬────────┘             │
           │                      │
           ▼                      │
  ┌─────────────────┐             │
  │ REDIS CACHE     │             │
  │ dash:*, topk:*  │             │
  │ TTL 10–15 min   │             │
  └────────┬────────┘             │
           │                      │
           ▼                      ▼
  ┌────────────────────────┐  ┌─────────────────────────────────────────────┐
  │ DASHBOARD API (Go)     │  │           BATCH LAYER (Spark / Trino)       │
  │ Kubernetes HPA N=10    │  │                                             │
  │ GET /v1/metrics/...    │  │  Daily reconciler (Spark on EMR):           │
  │ GET /v1/trending       │  │   1. Read s3://clicks-lake/dt=D-1           │
  │ p99<2s                 │  │   2. Dedup by event_id (window=7d)          │
  │                        │  │   3. Join fraud scores (final, not stream)  │
  │ returns source +       │  │   4. Aggregate per (tenant, day, ad_id)     │
  │ data_freshness_ms      │  │   5. Diff vs ClickHouse speed-layer totals  │
  └────────────────────────┘  │   6. Alert if drift > 0.01%                 │
                              │   7. Write CLOSED totals to Postgres        │
                              │   8. Write audit log to separate S3 bucket  │
                              └────────┬────────────────────────────────────┘
                                       │
                                       ▼
                              ┌────────────────────────┐
                              │ BILLING POSTGRES       │
                              │ daily_totals           │
                              │ (immutable once CLOSED)│
                              └────────┬───────────────┘
                                       │
                                       ▼
                              ┌────────────────────────┐
                              │ BILLING API            │
                              │ GET /v1/billing/...    │
                              │ 99.99% SLO             │
                              └────────────────────────┘

Sub-diagram A: Edge dedup (LRU vs distributed)

  Event arrives at ingest proxy
           │
           ▼
  ┌────────────────────────────────┐
  │ Local LRU (per proxy pod)      │    catches retries from same client
  │ 10M entries × (16B id + 8B ts) │    going to same pod (sticky routing
  │ ≈ 240 MB                       │    not guaranteed → high miss rate
  │ hit rate ~60% (sticky routing  │    without it; accept 40% downstream)
  │  not enforced — can't reliably │
  │  shard by event_id because     │
  │  clients don't know to do so)  │
  └────────┬───────────────────────┘
           │ miss (40%)
           ▼
  ┌────────────────────────────────┐
  │ OPTIONAL: Redis bloom filter   │    would catch cross-pod dups
  │ shared, 100M entries,          │    COST: extra RTT (~1ms p50, ~5ms p99)
  │ bloom fp=0.1% → ~150MB         │    → ingest p99 +5ms
  │                                │    VERDICT: skip at ingest, rely on
  │                                │    Flink dedup (§7a). Edge LRU is only
  │                                │    a load-shedding optimization for
  │                                │    client-retry storms, not correctness.
  └────────┬───────────────────────┘
           │
           ▼
  Kafka produce (idempotent by producer_id + sequence)
           │
           ▼
  Flink dedup via KeyedState<event_id, bool> TTL=24h
  (correctness dedup — edge is just optimization)
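A minimal sketch of the per-pod LRU shown above, assuming a bounded, access-ordered LinkedHashMap; the 10M-entry cap comes from the diagram, and a production proxy would shard this across worker threads rather than use a single synchronized map.

import java.util.LinkedHashMap;
import java.util.Map;

// Per-proxy-pod opportunistic dedup: correctness still lives in the Flink dedup state (§7a).
public class EdgeDedupLru {
    private static final int MAX_ENTRIES = 10_000_000;

    private final Map<String, Long> firstSeenMs =
        new LinkedHashMap<String, Long>(1 << 20, 0.75f, true) {   // access-order LRU
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                return size() > MAX_ENTRIES;
            }
        };

    // Returns the first-seen timestamp if this event_id is a duplicate (caller answers 409), else null.
    public synchronized Long checkAndRecord(String eventId, long nowMs) {
        Long prior = firstSeenMs.get(eventId);
        if (prior != null) return prior;
        firstSeenMs.put(eventId, nowMs);
        return null;
    }
}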

Sub-diagram B: Watermark semantics and allowed lateness

 Event-time axis  ───┬───┬───┬───┬───┬───┬───┬───►
                     T0  T1  T2  T3  T4  T5  T6
                         ▲watermark(T0)     ▲current watermark(T4)
                             window [T1,T2)
                             window [T2,T3) ← fires when watermark reaches T3

  Watermark strategy:
    assignTimestampsAndWatermarks(
       WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(5))
         .withTimestampAssigner((e,ts) -> e.client_ts_ms)
         .withIdleness(Duration.ofMinutes(1))     // handle empty partitions
    )

  Window: TumblingEventTimeWindows.of(1 min)
  AllowedLateness: 10 min
  SideOutputLateData: tag "late-clicks" → Kafka "clicks.late.v1"

  Consequence chain:
   T = event's client_ts
   W(t) = watermark at processing-time t = max(client_ts seen)−5min
   Window [A, A+1min) fires at W = A+1min, accepts late events until W = A+1min+10min,
     after which late events go to side output and flow to batch layer ONLY.

   Dashboard shows "count for window W" that MAY update within 10min after W closes
     → dashboard query response includes is_provisional:true until W+10min.

   Billing: never reads speed-layer totals directly. Reads s3://clicks-lake/dt=D-1
     which contains ALL events (including late) replayed with event-time bucketing.
     → Billing bucketing depends on client_ts; speed layer matches via allowed_lateness;
       beyond 10min late, only batch sees it.
       → drift between speed and batch totals bounded by: events arriving >10min late
         typically <0.05% at 95% confidence for web, ~0.5% for mobile (poor connectivity).
         → drives our reconciliation SLA of <0.01% for billing (batch is ground truth).
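Pulling the watermark strategy, 1-minute tumbling window, allowed lateness, and late-event side output into one place: a sketch of the speed-layer topology, assuming ClickEvent is the POJO/proto type from §4 with clientTsMs and adId fields, and that the real aggregate also carries HLL and fraud counts.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class SpeedLayerTopology {

    // Events later than watermark + 10 min land here and flow to clicks.late.v1 (batch-only).
    static final OutputTag<ClickEvent> LATE = new OutputTag<ClickEvent>("late-clicks") {};

    public static SingleOutputStreamOperator<Long> build(DataStream<ClickEvent> clicks) {
        return clicks
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                    .withTimestampAssigner((e, ts) -> e.clientTsMs)
                    .withIdleness(Duration.ofMinutes(1)))
            .keyBy(e -> e.adId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.minutes(10))
            .sideOutputLateData(LATE)
            .aggregate(new CountAggregate());    // production emits (ad_id, window, count, HLL, fraud)
    }

    // Minimal per-(ad_id, window) counter; stands in for the richer aggregate described above.
    public static class CountAggregate implements AggregateFunction<ClickEvent, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(ClickEvent e, Long acc) { return acc + 1; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }
}

The late stream is drained with build(clicks).getSideOutput(SpeedLayerTopology.LATE) and produced to clicks.late.v1; the batch layer is the only consumer that ever re-buckets those events.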

Sub-diagram C: Reconciliation flow

  ┌────────────┐                                       ┌─────────────────┐
  │ Flink RT   │ writes clicks_agg_1m (ClickHouse)     │ Spark Batch     │
  │ speed      │  + ingest_version=chkpt_id            │ daily reconciler│
  │            │                                       │                 │
  └─────┬──────┘                                       └────────┬────────┘
        │                                                        │
        │   publishes speed-layer totals:                         │
        │    POST /internal/reconcile/speed                       │
        │    { date: D, ad_id: X, speed_total: N }                │
        │                                                        │
        └──────────────┬─────────────────────────────────────────┘
                       ▼
              ┌─────────────────────┐
              │ RECONCILER          │
              │  1. Batch total B   │
              │  2. Speed total S   │
              │  3. Diff = |B − S|  │
              │  4. If Diff/B ≤ ε   │
              │     (ε = 0.01%)     │
              │     → CLOSE         │
              │     else → INVEST.  │
              │  5. Write CLOSED    │
              │     to Postgres     │
              │  6. Replace         │
              │     speed-layer     │
              │     numbers in      │
              │     ClickHouse with │
              │     batch numbers   │
              │     (new ingest_ver)│
              │     → next dashboard│
              │     query gets      │
              │     reconciled      │
              │     value (speed    │
              │     becomes batch)  │
              └──────────┬──────────┘
                         ▼
                ┌────────────────┐
                │ Billing POST-  │
                │ gres CLOSED    │
                └────────────────┘
                         │
                         ▼
                ┌────────────────┐
                │ Invoicing sys  │
                └────────────────┘

L7 detail in reconciliation: The ClickHouse numbers get overwritten by batch once reconciled — that is, the speed layer's purpose expires at T+24h. Dashboards reading a 2-day-old window see the reconciled number. ReplacingMergeTree with ingest_version=batch_run_id (strictly greater than any stream checkpoint id) ensures merges prefer batch. This is Lambda done right.


7 Deep-Dive (3 critical topics at L7 depth) #

7a. Exactly-once semantics: the guarantee gradient

Why critical

The whole design collapses if we lose or double-count events. "Exactly-once" as a marketing term is a lie — what exists is a guarantee gradient enforced by each hop having an idempotency key + either transactional commit or idempotent sink. Miss one link → silent duplication or loss. At 100K eps, a 0.01% dup rate = 10 dups/sec = 864K/day of bogus billable clicks. That's the L7 trap to avoid.

Alternatives considered

| Approach | End-to-end guarantee | Cost | Failure mode |
|---|---|---|---|
| At-most-once | Events lost on any failure | Cheapest | Under-billing, angry advertisers |
| At-least-once + no dedup | Dups guaranteed | Cheap | Over-billing, regulatory risk |
| At-least-once + downstream dedup | Exactly-once effective | Medium (dedup state) | Dedup state TTL bug → dup leak |
| Transactional end-to-end (Flink 2PC + Kafka txns + transactional sink) | True exactly-once at transaction boundary | High (coordination, throughput hit) | Sink must support XA / atomic commit |
| Chosen: Hybrid per-hop | Exactly-once effective | Medium | See failure modes below |

Chosen approach — the guarantee chain

Hop 1: Client → Edge. Client generates event_id (UUIDv4) before first attempt. Retries use same id. Edge LRU gives 60% catch; Flink state gives 100%.

Hop 2: Edge → Kafka. Producer config:

enable.idempotence = true        # producer_id + sequence per partition
acks = all
max.in.flight.requests.per.connection = 5   # safe with idempotence
retries = INT_MAX
delivery.timeout.ms = 120000

→ Kafka broker-level dedup via producer_id + sequence. On producer restart, a configured transactional.id lets the broker fence the old producer epoch and retain the producer's identity.

Hop 3: Kafka → Flink. FlinkKafkaConsumer with isolation.level=read_committed, offsets checkpointed with Flink state. On recovery, resume from last checkpoint.

Hop 4: Flink processing. KeyedState for dedup: ValueState<Boolean> seen keyed by event_id, TTL 24h (RocksDB backend). New event → check state, if seen → drop + metric. State TTL is correctness-critical.
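A sketch of that dedup operator, assuming the stream is keyed by event_id just before it, e.g. clicks.keyBy(e -> e.eventId).process(new DedupByEventId()); the 24h TTL is the stream dedup window from F2, and ClickEvent is the assumed event type.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Drops any event whose event_id was already seen within the last 24h (RocksDB-backed keyed state).
public class DedupByEventId extends KeyedProcessFunction<String, ClickEvent, ClickEvent> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Boolean.class);
        desc.enableTimeToLive(
            StateTtlConfig.newBuilder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build());
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(ClickEvent e, Context ctx, Collector<ClickEvent> out) throws Exception {
        if (seen.value() != null) {
            // Duplicate within the dedup window: drop it (dedup-dropped metric omitted here).
            return;
        }
        seen.update(Boolean.TRUE);
        out.collect(e);
    }
}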

Hop 5: Flink → ClickHouse. ClickHouse has no XA/transactions. Solution: ReplacingMergeTree + idempotent event_id upsert pattern.

  • Flink sink writes aggregates keyed by (ad_id, window_start, ingest_version=checkpoint_id).
  • On failure, Flink replays from last checkpoint, re-writes same key with same version.
  • ReplacingMergeTree collapses duplicates in background merges (eventually consistent — this is the subtle bit).
  • Failure mode: Dashboard reads before merge completes see both versions → over-count.
  • Fix: Dashboard queries use FINAL modifier on ReplacingMergeTree: SELECT sum(count) FROM clicks_agg_1m FINAL WHERE ... — forces merge-on-read. Costs ~2× query time. Acceptable at our qps. Without this, silent duplicates show up in daily totals at ~0.01% rate — enough to void billing audit. (SQL sketch after this list.)
  • Alternative we considered: Switch to ClickHouse CollapsingMergeTree with sign=-1/+1 for retractions. More complex for writers; harder to reason. Stuck with ReplacingMergeTree + FINAL.
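A sketch of the two statements that make this pattern work, shown as the SQL the Flink sink and the dashboard API would issue (table and columns from §5c; the HLL column, JDBC wiring, and sink batching are omitted, and the exact row batching is an assumption).

public final class ClickHouseStatements {
    // Sink side: on replay the same (ad_id, window_start, dims) row is re-written with the same
    // ingest_version (= Flink checkpoint id); ReplacingMergeTree keeps the max version at merge time.
    public static final String UPSERT =
        "INSERT INTO clicks_agg_1m " +
        "(window_start, ad_id, campaign_id, country, device, placement, " +
        " clicks_count, fraud_flagged, ingest_version) " +
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";

    // Read side: FINAL forces merge-on-read so pre-merge duplicate versions never leak into
    // dashboard totals (~2x query cost, acceptable at ~400 qps to ClickHouse).
    public static final String DASHBOARD_QUERY =
        "SELECT toStartOfMinute(window_start) AS ts, sum(clicks_count) AS clicks " +
        "FROM clicks_agg_1m FINAL " +
        "WHERE ad_id = ? AND window_start BETWEEN ? AND ? " +
        "GROUP BY ts ORDER BY ts";
}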

Hop 6: Flink → S3 (archive path). StreamingFileSink achieves exactly-once via a checkpoint-aligned two-phase commit: data is written as in-progress part files, and only when the covering checkpoint completes are those parts committed to their final paths; on recovery, uncommitted parts are discarded. On HDFS the commit is an atomic rename; on S3 (which has no atomic rename) it is a multipart-upload completion tracked in checkpoint state. Either way, Flink owns the pending-file inventory, which is what makes the 2PC sound.

Hop 7: Flink → Kafka (enriched topic for fraud). FlinkKafkaProducer with Semantic.EXACTLY_ONCE — 2PC via Kafka transactional producer. Transaction ids scoped per Flink subtask.

End-to-end guarantee summary

| Hop | Mechanism | Guarantee |
|---|---|---|
| Client → Edge | Idempotency-Key header | At-least-once (client retry) |
| Edge dedup | LRU | Opportunistic |
| Edge → Kafka | Idempotent producer | Exactly-once per partition |
| Kafka → Flink | Checkpointed offsets | Exactly-once on consume |
| Flink dedup | KeyedState TTL 24h | Drops dups within window |
| Flink → ClickHouse | Idempotent key + ReplacingMergeTree + FINAL | Eventually exactly-once-on-read |
| Flink → S3 | Checkpoint-committed 2PC | Exactly-once |
| Flink → Kafka enriched | Kafka txns | Exactly-once |
| Batch reconciler | event_id dedup + 7d window | Exactly-once for billing |

The L7 answer: Billing never trusts speed layer. It trusts S3 (exactly-once archived) + batch dedup (7d window covers all late arrivals). Speed layer is best-effort-correct within ε; batch is audit-correct. The pipeline is two gradients stitched at reconciliation.

Failure modes

  1. Flink state backend corruption → restore from last checkpoint, replay ≥1min of Kafka, dedup KeyedState catches dups.
  2. Kafka producer_id rotation (app restart with lost transactional.id) → new producer_id, old epoch invalidated, but dedup window in Flink still catches; event_id idempotency is the true anchor.
  3. Checkpoint corruption → Flink retries previous successful checkpoint, re-processes gap — safe because downstream sinks are idempotent.
  4. Clock skew >5min at client → events bucket into wrong event-time window; allowed_lateness covers 10min; beyond that, only batch layer corrects.
  5. Dedup state TTL too short (<24h) → client retries from 25h ago double-count in stream. Batch catches. Bump TTL if needed, but watch state size.
  6. ClickHouse replica desync during merge → query routing must pin to primary or use FINAL; else different replicas return different counts during merge window.

Real systems that do this

  • Google Ads (internal): F1/Spanner-backed billing ledger with idempotent upserts keyed by click_id; dataflow pipelines for real-time dashboards; truth lives in batch.
  • Facebook Scuba: approximate, speed-layer-only analytics; explicitly does not claim exactness — shows is_sampled in every query.
  • Kafka Streams: alternative to Flink with similar semantics; weaker on windowing ergonomics, stronger on op simplicity. Rejected for this problem because Flink's event-time + allowed-lateness + side-outputs are more ergonomic for our watermark story.
  • Flink @ Alibaba Singles' Day: ran peak ~1 billion events/sec with exactly-once end-to-end — proof the pattern scales. Their case study describes ReplacingMergeTree + idempotent upsert into HBase/ClickHouse variants.

7b. Hot-key partitioning (celebrity ad / viral hashtag)

Why critical

A single celebrity ad with 10× traffic collapses one partition. Kafka's ordering guarantee is per-partition, so you can't just scale by adding partitions — existing keys stick to their hash. At 100K eps peak with one hot ad at 10K/sec, that one partition saturates at 5K/sec nominal → broker CPU pegged, consumer lag grows, dashboards stale, billing backs up.

Alternatives considered

| Approach | Throughput gain | Ordering | Complexity |
|---|---|---|---|
| Static partitioning by ad_id | (baseline) | Per-ad ordering | Trivial |
| Increase partition count broadly | Linear but doesn't fix skew | Same | Low |
| Consistent hashing with virtual nodes | Smoother distribution, still sticks hot key to one node | Per-key | Low |
| Key salting (random prefix) | Linear for hot keys | Lost per-ad ordering | Medium |
| Two-stage aggregation (salt then reduce) | Linear | Per-ad at global reduce | Medium-high |
| Chosen: Adaptive key salting + two-stage aggregation for top-K hot keys | Linear | Per-ad restored at global stage | Medium |

Chosen approach

Detect hot keys dynamically. A monitor job reads Kafka metrics + Flink operator state, identifies keys with >5% of partition throughput as "hot." List pushed to producers via config service (Zookeeper / etcd).

Salt at producer. For hot keys, producer prepends salt 0..N-1 (N = degree of hot-key split, start at 16):

effective_key = (ad_id, salt)   where salt = random(0,N) if ad_id ∈ hot_keys else 0

Kafka partition = hash(effective_key) % 64 → hot ad distributes across N partitions instead of 1.
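A sketch of the producer-side salting, assuming the hot-key set is pushed from the config service and N=16; the salted string key is what Kafka's default partitioner hashes, so it stands in for the hash(ad_id) :: salt scheme from §5a.

import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SaltedKeyHelper {
    private volatile Set<String> hotKeys = Set.of();   // refreshed from the config service (etcd/ZooKeeper)
    private static final int SALT_FANOUT = 16;         // N-way split for hot ads

    public void refreshHotKeys(Set<String> latest) { this.hotKeys = latest; }

    public ProducerRecord<String, byte[]> record(String topic, String adId, byte[] eventBytes) {
        int salt = hotKeys.contains(adId)
                ? ThreadLocalRandom.current().nextInt(SALT_FANOUT)
                : 0;
        // Effective key (ad_id, salt): a hot ad spreads across up to N partitions,
        // a cold ad keeps per-ad ordering on its single partition.
        return new ProducerRecord<>(topic, adId + "#" + salt, eventBytes);
    }
}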

Two-stage aggregation in Flink.

stream
  .keyBy(e -> e.ad_id + "#" + salt(e))        // stage 1: salted key
  .window(Tumble 1m)
  .aggregate(local_count)                     // partial sums
  .keyBy(e -> e.ad_id)                        // stage 2: restore true key
  .window(Tumble 1m)
  .reduce((a,b) -> a+b)                       // global sum

Stage 1 runs on N×original parallelism (e.g., 16 slots for a 16-way split); Stage 2 runs on fewer slots but with small data (already pre-aggregated — ~N rows per ad per window).

Top-K with hot keys. Same two-stage pattern:

  • Stage 1: each partition maintains local min-heap of top-K×2 (over-sample to catch near-boundary entries).
  • Stage 2: global reduce maintains true top-K by merging heaps.
  • Why over-sample 2×? An entity that's #101 locally everywhere might be #50 globally if it's evenly distributed; top-K×1 local would miss it. K×2 covers 99th-percentile skew cases; K×10 covers everything but wastes bandwidth.
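A sketch of the stage-2 merge, assuming each partition ships its local top-2K candidates as (entity, partial count) maps that have already been pre-summed per window:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopKMerge {
    // Merge per-partition candidate lists into a global top-K using a size-K min-heap.
    public static List<Map.Entry<String, Long>> merge(List<Map<String, Long>> partials, int k) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((entity, count) -> totals.merge(entity, count, Long::sum));
        }
        Comparator<Map.Entry<String, Long>> byCount = Map.Entry.comparingByValue();
        PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(byCount);
        for (Map.Entry<String, Long> e : totals.entrySet()) {
            heap.offer(e);
            if (heap.size() > k) heap.poll();   // evict the current smallest
        }
        List<Map.Entry<String, Long>> topK = new ArrayList<>(heap);
        topK.sort(byCount.reversed());          // rank 1 first
        return topK;
    }
}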

Alternative considered: Count-Min Sketch + heavy hitters. Instead of heap, use CMS with "heavy hitters" algorithm (space-saving, frequent). Bounded memory (e.g., 10K buckets → ~80 KB per partition), approximate counts.

  • Trade-off: CMS overestimates frequencies (never underestimates) by at most ε·N with probability 1−δ. For top-K over N=1B events with ε=0.001 and δ=0.01, w=2719 buckets and d=5 hash functions suffice, roughly 54 KB with 32-bit counters (see §10a). Top-K accuracy >99% for heavy items.
  • Why not use CMS as primary? Top-K heap is already cheap (K≤1000 entries). CMS shines when cardinality is hundreds of millions and exact counts matter less than ranking. We use CMS as a backup for extreme cardinality (trending hashtags with millions of distinct tags), not for ad_ids (~1M cardinality, heap fits).

Back-pressure signals. Flink emits backpressure if downstream slow. Producer SDK watches Kafka's per-partition throttle metric + consumer lag exposed via control topic → throttles hot-key emission at source when lag >30s. Shed load on producer before broker dies.

Failure modes

  1. Hot-key detector lags (new celebrity not yet flagged) → few seconds of saturation; Kafka absorbs via disk buffer, Flink catches up. Auto-heal.
  2. Salt cardinality too high (N=64) → stage 2 reduce explodes for cold keys that got over-sharded. Keep N small (8-16) and per-key.
  3. Stage 2 becomes bottleneck → if one ad is >50% of all traffic (rare; big brand Superbowl ad), stage-2 reducer for that key is single-threaded. Mitigation: hierarchical tree-reduce, 3 stages.
  4. Order lost within salted partitions → acceptable for count aggregations (commutative); problematic for sequence-sensitive logic (e.g., session windows by user). Fix: for session windows, key by user_id not ad_id, orthogonal partition scheme.

Real systems

  • LinkedIn Brooklin + Samza documented the salt-then-reduce pattern at scale.
  • Flink @ Pinterest for trending pins: explicit two-stage aggregation with hot-key sampling.
  • Google's Dataflow recommends the same; Beam's Combine.perKey does hot-key detection in their HotKeyLogger and suggests withHotKeyFanout() API.

7c. Reconciliation: when to trust the speed layer, when to fall back to batch (Lambda vs Kappa)

Why critical

Lambda costs 2× (two pipelines, two codebases, two operational burdens). Kappa is seductive ("just one pipeline, replay Kafka for corrections"). But choosing wrong for billing = audit failure. L6 answer: "Use Kappa, simpler." L7 answer: why that's a trap here.

Alternatives

| Architecture | Billing correctness | Operational cost | Use case |
|---|---|---|---|
| Lambda (speed + batch) | Batch is ground truth; speed approximate | 2× code, 2× pipelines | When billing-grade & dashboards share source but differ in SLA |
| Kappa (stream-only, replay from Kafka) | Correct IF stream is exact + Kafka retains forever | 1× code, but long Kafka retention | When stream can be made exactly-once end-to-end |
| Kappa+ / Kappa with versioned re-processing | Stream-only, but re-process from data lake | 1× code, data lake as log | Modernized Kappa, emerging |
| Chosen: Lambda in v1, convergence toward Kappa+ in v3 | Both | Decreasing over time | See reasoning |

Why Lambda for v1

  1. Billing TTL conflict. Kappa requires Kafka retention ≥ the longest possible correction window. Legal/audit requires 7-year retention. Kafka at 3TB/day × 7yrs = 7.7 PB cluster. Possible but insane cost. S3 Parquet at 95GB/day × 7yrs = ~240 TB + Glacier pricing — 50× cheaper.

  2. Fraud model iteration. Fraud scoring changes monthly. Replaying last 30 days through stream every month burns compute. Batch replays a daily slice on-demand, order of magnitude cheaper.

  3. Different SLAs. Billing's 24h SLA means we can wait; dashboards' 10s means we can't. Decoupling means billing pipeline outages don't dark dashboards and vice versa.

  4. Different correctness bars. Speed layer accepts HLL (unique users approx) because dashboards show ±1%. Billing needs exact unique-user counts for per-user frequency cap enforcement — can't HLL.

Why not pure batch (no streaming)

Dashboards would be stuck on T-1h staleness minimum — operator can't fight a bot storm with data that's an hour old.

Convergence to Kappa+ in v3

As the ecosystem matures:

  • Iceberg / Delta Lake as unified log. Both speed and batch read the same source — a transactional data lake. Flink writes, Spark reads, same table, same schema, no drift.
  • Flink as the only compute engine. Streaming jobs + batch jobs in the same code via Flink's unified Table API; RUNTIME_MODE=BATCH for daily reconciler.
  • Then: reconciler is not "compare two codebases" but "re-run the same Flink job in batch mode, diff vs streaming output, alert on drift."

This halves the codebase surface area. V3.

The reconciliation mechanics (L7 detail)

Speed layer writes to ClickHouse with ingest_version = checkpoint_id (monotonically increasing). When batch reconciliation runs at T+6h (next day 06:00 UTC for previous day's data):

  1. Spark reads Parquet for day D.
  2. Dedups by event_id (window = D-7d to D+1d to cover late arrivals).
  3. Applies final fraud scores.
  4. Aggregates per (ad_id, window_1m, dims).
  5. For each (ad_id, window) computes batch_count and compares against speed_count from ClickHouse.
  6. If |batch_count − speed_count| / batch_count > 0.01%:
    • Flag for investigation.
    • Common causes: late events beyond allowed_lateness, fraud re-scoring, schema bug.
  7. Writes to ClickHouse: INSERT INTO clicks_agg_1m (..., ingest_version=batch_run_id) where batch_run_id = 10^15 + day_epoch (always greater than any checkpoint_id).
  8. ReplacingMergeTree takes max(ingest_version) → next merge collapses speed-layer row, batch row wins.
  9. Dashboard SELECT ... FINAL reads reconciled value.
  10. Writes CLOSED total to Postgres.
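A sketch of the per-(tenant, day) close-or-hold decision at step 6, assuming speed and batch totals have already been aggregated; thresholds mirror the drift budget below, and info-level alerting below 0.01% is omitted.

public class DailyCloseDecision {
    enum Action { CLOSE, AUTO_INVESTIGATE, PAGE_ONCALL }

    // epsilon = 0.01% is the billing reconciliation SLA from §3; batch is the ground truth total.
    public static Action decide(long batchTotal, long speedTotal) {
        if (batchTotal == 0) return speedTotal == 0 ? Action.CLOSE : Action.PAGE_ONCALL;
        double drift = Math.abs(batchTotal - speedTotal) / (double) batchTotal;
        if (drift <= 0.0001) return Action.CLOSE;             // write CLOSED row, bump ingest_version
        if (drift <= 0.001)  return Action.AUTO_INVESTIGATE;  // check the known-causes runbook first
        return Action.PAGE_ONCALL;                            // hold the billing close
    }
}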

Drift budget & investigation

| Drift | Interpretation | Action |
|---|---|---|
| <0.001% | Normal (floating-point, late tail) | Log only |
| 0.001–0.01% | Monitor, likely late events | Alert info |
| 0.01–0.1% | Yellow | Auto-investigate: is reason in known list? |
| >0.1% | Red | Page on-call, hold billing close |
| >1% | Critical | Rollback reconciliation, escalate |

Known causes list (runbook):

  • Fraud model retrained → expected delta
  • Schema migration → version mismatch
  • Kafka truncation past 72h (extremely long Flink outage)
  • Time zone / DST edge case

Failure modes

  1. Batch job fails overnight → billing stays OPEN until reconciled. Paging. v3 mitigation: batch redundancy in second region.
  2. Fraud model change produces large delta → reconciliation flags, but "expected." Runbook allows operator to accept after review.
  3. Silent schema drift → speed layer version A, batch version B, counts diverge consistently. Schema registry enforcement + CI catch this.

Real systems

  • Google Ads: two-path reconciliation with batch (MillWheel → Dataflow) as source of truth, speed layer as serving cache. Their "guaranteed delivery" model.
  • Meta's Scuba: explicit speed-layer-only, no reconciliation — different product (operator analytics, not billing).
  • Netflix Keystone: Kafka + Flink for Lambda until 2019, then migrated to Iceberg-backed Kappa+. Case study confirms the v1→v3 arc.
  • Uber's Hudi: similar convergence, used Hudi as the transactional log.
  • Kappa Architecture (Kreps, 2014 post): original proposal. Notable that Kreps now acknowledges billing/compliance workloads as Lambda's persistent niche.

8 Failure Modes & Resilience #

Per-component failure matrix. SRE pager-runbook style.

Component Failure Detection Blast radius Mitigation Recovery
Client SDK Network loss SDK error metrics Single client session Local batch + retry w/ backoff, persist to disk queue Replay on reconnect via same event_id
Ingest proxy pod Crash / OOM K8s liveness, prometheus alerts ~5% of traffic (load balanced), recovery <30s Stateless, N+2 pods, HPA, memory limits, LRU bounded K8s restart, traffic rebalance
Ingest proxy total outage Region down ALB health All new events HTTPS returns 503; client SDK persists locally Clients replay when region up; event_id catches re-sends
Kafka broker loss Disk fail / node crash Broker metrics, URP 1/3 of write path per-partition RF=3 + min.insync.replicas=2 → writes continue on 2 of 3 URP replica catches up or replaced; rebalance
Kafka all-broker outage AZ failure Cluster metrics All new events (regional) Multi-AZ broker placement; producer buffer (128MB / pod) absorbs 30–60s v3: second region failover
Kafka disk full Retention mis-set Disk alerts Eventual produce failures Alert at 70%; retention tuned Expand disk or shorten retention
Flink TaskManager crash JVM OOM, node fail JM detects, Prometheus 1/12 of streaming throughput Checkpoint every 60s; TM restart re-reads from last checkpoint <5 min; side effect: dup in state TTL — dedup catches
Flink JobManager crash HA ZK detects Whole streaming job stalls JM HA (2 JMs + ZK); checkpoints persisted to S3 <2 min failover, job resumes from checkpoint
Flink checkpoint corruption Storage fail Checkpoint metrics Can't recover from latest Retain N=5 checkpoints; fall back to N-1 Small data re-processed; idempotent sinks swallow
Flink watermark stuck (idle partition) Watermark metric Monitoring Window never fires withIdleness on watermark strategy handles empty partition; alert if stuck >5min Kick consumer; upstream producer issue
ClickHouse replica out Replica health Keeper + metrics Queries degrade RF=2; queries auto-route; writes buffer via Distributed engine Auto-heal via replication; <10 min if disk intact
ClickHouse merge backlog Background merge slow system.parts Dashboard reads see dupes FINAL on query forces merge-on-read; raise merge threads Bump cluster; shard by day partition
ClickHouse write amplification Too many small parts system.merges Ingest bottleneck Flink batches rows (100K / 1s whichever first) before sink Tune sink; insert buffer engine
Redis cache outage Redis sentinel Dashboard metrics Cache miss → more ClickHouse load (5×) Dashboard API tolerates cache miss; graceful degradation Redis auto-failover; rebuild cache lazy
S3 write fail SDK error Flink sink metrics Batch layer input lost Retries with backoff; Flink fails task, replays from checkpoint Few minutes; at-least-once via checkpointed offsets + idempotent commit
Spark batch reconciler fail Airflow task Alert Billing stays OPEN, dashboards OK Retry with smaller date range; fallback to previous day On-call page; manual replay
Postgres billing fail Replica lag / down pg_stat Billing reads degraded Read replicas; sync writes via RDS Multi-AZ Failover <2 min
Schema drift (proto field added non-backward-compat) Deserialize errors in Flink Metric + DLQ New events dropped Schema registry BACKWARD_TRANSITIVE; CI gate; DLQ for rescue Rollback producer; rescue DLQ events
Bot storm / DDoS Ingest qps spike 10× Rate metrics Real events drowned Per-SDK-token rate limit at edge; fraud scorer flags; billing excludes Deploy block-list; advertiser refund if needed
Late-arriving events (>10min) Side-output metric Speed layer misses them Side output → separate Kafka topic → batch layer picks up Batch reconciliation trues up at T+24h
Clock skew in clients (>5min) Watermark regression Event-time vs server-time delta Windows mis-bucket Watermark uses max not min; server_ts fallback for billing Clamp future timestamps; fraud flag
Celebrity hot key Partition lag metric One partition saturates Salt + two-stage agg (§7b) Auto-detect + deploy salt list
"Thundering herd" after outage recovery Clients all replay buffered events Edge qps spike Ingest saturates Edge rate limit + Kafka producer backpressure; clients use jittered retry Ingest scales via HPA; absorb via Kafka buffer
Data loss from simultaneous double failure N-2 critical infra SEV page Potentially events lost RF=3 + producer buffers + client disk persistence = 3 layers of redundancy Root-cause analysis; client replay for anything with event_id
Double-counting due to reconciliation version bug Drift alarm Reconciler diff > threshold Billing over-charges Drift budget alarms; manual gate before CLOSE Rollback reconciliation_version, re-run
GDPR erasure request User request Ticketing integration Must purge raw + pre-agg Tombstone user_hash in raw Cassandra; replay batch with filter; update pre-agg See §10; takes ~24h to propagate

Cross-cutting resilience patterns

  • Every sink is idempotent (event_id, ReplacingMergeTree, S3 rename, Kafka producer_id).
  • No component blocks another for >30s (buffers, queues, back-pressure).
  • Every topic has a DLQ (clicks.dlq.v1 for deserialize failures; clicks.late.v1 for lateness; separate for auditability).
  • Observability: every event passes through 4 metric emission points (edge, Kafka, Flink, sink). Count invariant count_at_edge - count_at_sink ≈ 0 (within dedup-dropped budget) is the primary alarm.
  • Exactly-once counter invariant alarm: total_accepted - total_sink_written - total_dedup_dropped - total_late_sidelined = 0 ± ε. Tripped → page on-call.

9 Evolution Path #

v1 — "Get to 100K eps with billing-safe semantics"

Timeline: 0–6 months. Scope:

  • Single region, single Kafka cluster, single Flink cluster.
  • Kafka + Flink + ClickHouse speed layer.
  • Spark daily reconciler to Postgres.
  • S3 Parquet archive.
  • Fraud scorer as tag-only (doesn't filter in stream).
  • Dashboards show is_approximate + data_freshness.
  • Basic observability: Kafka lag, Flink checkpoint, ClickHouse merge backlog.

Unlocked: billing-grade daily totals, near-real-time dashboards, basic top-K trending.

Open risks: single-region blast radius; fraud is eventually-consistent with billing (maybe under-detects on billing close day).

v2 — "Harden billing + accelerate trending + multi-tenant"

Timeline: 6–12 months. Added:

  • Fraud scorer inline filtering for severe-fraud signals (bot farms with >1000 clicks/min/IP auto-dropped at edge).
  • Dimensional top-K (per-country, per-device) via additional pre-aggregated cubes.
  • Per-tenant quota enforcement — each advertiser has ingest rate limit at edge, prevents noisy neighbor.
  • Schema registry + CI gate for proto evolution.
  • Batch reconciliation dashboard — SRE visibility into drift per advertiser per day.
  • Iceberg adoption replacing raw Parquet for ACID operations.
  • Continuous synthetic canary events: inject known event_id/count pairs and verify the end-to-end counts match within 10s (sketched after this list).
  • Audit trail S3 bucket with immutable object-lock for billing evidence.
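
A minimal sketch of the canary probe from the bullet above. The topic (clicks.raw.v1), the pre-aggregated table (clicks_1m), the ClickHouse HTTP endpoint, and the dedicated canary ad_id convention are all assumptions:

```python
# Sketch of a continuous canary probe; names and endpoints are illustrative.
import json, time, uuid
import requests                         # ClickHouse HTTP interface
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092", "enable.idempotence": True})
CLICKHOUSE_URL = "http://clickhouse:8123/"

def run_canary(timeout_s: float = 10.0) -> float:
    """Inject one known click under a unique canary ad_id and wait for it to surface."""
    canary_ad = f"canary-{uuid.uuid4()}"
    event = {"event_id": str(uuid.uuid4()), "ad_id": canary_ad,
             "timestamp_ms": int(time.time() * 1000)}
    producer.produce("clicks.raw.v1", key=canary_ad, value=json.dumps(event))
    producer.flush()

    start = time.monotonic()
    query = f"SELECT sum(clicks) FROM clicks_1m FINAL WHERE ad_id = '{canary_ad}'"
    while time.monotonic() - start < timeout_s:
        resp = requests.get(CLICKHOUSE_URL, params={"query": query}, timeout=2)
        if resp.ok and resp.text.strip() not in ("", "0"):
            return time.monotonic() - start     # end-to-end freshness; feeds the p99 alarm
        time.sleep(0.5)
    raise TimeoutError(f"canary {canary_ad} not visible within {timeout_s}s")
```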

Unlocked: SOX compliance, advertiser SLAs signed, trending dashboard per-country.

v3 — "Multi-region active-active + Kappa+ convergence"

Timeline: 12–24 months. Added:

  • Geo-DNS + regional Kafka clusters — clients route to nearest region.
  • Cross-region Kafka mirror (MirrorMaker 2) for global aggregates; per-region billing for data-residency compliance.
  • Flink running globally via Flink K8s Operator; deduplication state spans regions via shared Redis+Kafka control topic for global event_id seen-set.
  • Kappa+ via Iceberg unified log — Spark batch reconciler rewritten as Flink BATCH_MODE job, same code as streaming; reconciliation becomes "re-run streaming job in batch mode."
  • Hard tenant isolation for top-100 advertisers — dedicated Kafka topics, dedicated Flink slots, per-tenant ClickHouse shards.
  • Per-tenant encryption keys (crypto-erasure path for GDPR: destroy key = erase all data for tenant, no scan needed).
  • ML-based fraud with feature store at scale (Feast, Redis feature serving).
  • Active-active dashboards with read-from-nearest-region; billing still centralized for auditor simplicity.

Unlocked: EU data residency, 99.99% ingest availability (regional outage = <1hr traffic shift), halved codebase (Kappa+).

v4 — speculative

  • Fully serverless (Flink → managed Kinesis Data Analytics / Dataflow).
  • Homomorphic aggregation for advertiser isolation (privacy-preserving analytics).
  • Federated dashboards (advertisers serve own data, we aggregate across).

10 Out-of-1-Hour Notes #

10a. Sketch math recap

HyperLogLog (unique users):

  • Standard error ≈ 1.04 / √m, where m = 2^p buckets.
  • p=14 (16384 buckets) → error ≈ 0.81%, memory ≈ 13 KB per HLL.
  • Per (ad_id, window_1m, country): 100K active ads × 1440 min/day × ~10 countries × 13 KB ≈ 18 TB/day in uncompressed state. Too much. → use HLL++ sparse representation (small n = small memory) and HLL union-on-query only for large buckets. Or roll up hourly: ~300 GB/day.
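
The same arithmetic in a few lines for the whiteboard; ~6.5 bits/register for a dense HLL at p=14 is an assumption behind the ~13 KB figure:

```python
# Back-of-envelope HyperLogLog sizing for the numbers above.
import math

def hll_error(p: int) -> float:
    return 1.04 / math.sqrt(2 ** p)

def hll_bytes(p: int, bits_per_register: float = 6.5) -> float:
    return (2 ** p) * bits_per_register / 8

p = 14
print(f"p={p}: error ~{hll_error(p):.2%}, size ~{hll_bytes(p)/1024:.1f} KB")  # ~0.81%, ~13 KB

# Dense-state blow-up at minute granularity vs hourly rollup.
ads, countries = 100_000, 10
per_minute = ads * 1440 * countries * hll_bytes(p)   # ~18-19 TB/day
per_hour   = ads * 24   * countries * hll_bytes(p)   # ~300 GB/day
print(f"minute cubes ~{per_minute/1e12:.1f} TB/day, hourly rollup ~{per_hour/1e9:.0f} GB/day")
```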

Count-Min Sketch (top-K, frequency):

  • Dimensions: w (columns), d (rows = hash fns).
  • Overestimate bound: ε · N with probability 1 − δ.
  • ε = e/w, δ = 1/e^d.
  • For N=1B/day events, ε=0.001 (1M overestimate OK for ranking), δ=0.01:
    • w = ⌈e / ε⌉ = ⌈2.718 / 0.001⌉ = 2719
    • d = ⌈ln(1/δ)⌉ = ⌈ln(100)⌉ = 5
    • Memory: 2719 × 5 × 4B (uint32) = 54 KB — trivial.
  • Combined with heavy-hitters heap (Misra-Gries / Space-Saving) → accurate top-K with bounded memory.
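
The same sizing plus a minimal, unoptimized update/query loop; blake2b stands in for the d independent hash functions, and the heavy-hitters heap is omitted:

```python
# Count-Min Sketch sizing for the numbers above, plus a minimal update/query loop.
import math, hashlib

eps, delta = 0.001, 0.01
w = math.ceil(math.e / eps)          # 2719 columns
d = math.ceil(math.log(1 / delta))   # 5 rows (hash functions)
print(w, d, w * d * 4, "bytes")      # 2719 5 54380  (~54 KB of uint32 counters)

table = [[0] * w for _ in range(d)]

def _bucket(item: str, row: int) -> int:
    h = hashlib.blake2b(f"{row}:{item}".encode(), digest_size=8).digest()
    return int.from_bytes(h, "big") % w

def cms_add(item: str, count: int = 1) -> None:
    for row in range(d):
        table[row][_bucket(item, row)] += count

def cms_estimate(item: str) -> int:
    # Never under-estimates; over-estimates by at most eps*N with probability 1-delta.
    return min(table[row][_bucket(item, row)] for row in range(d))

cms_add("#worldcup", 42)
cms_add("#cats", 7)
print(cms_estimate("#worldcup"), cms_estimate("#cats"))   # 42 7 (barring hash collisions)
```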

Bloom filter (edge dedup):

  • Optimal bits/element m/n = 1.44 · log2(1/fp).
  • For fp=0.001, m/n ≈ 14.4 bits; for 100M elements that is ~180 MB with k ≈ 10 hash functions.
  • Rejected in favor of an LRU seen-set, because the LRU gives a definite answer (no false positives) and edge dedup doesn't need to be exact anyway.
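
The sizing as a tiny helper, kept only to justify the comparison against the LRU choice:

```python
# Bloom-filter sizing math from the bullets above.
import math

def bloom_params(n: int, fp: float):
    bits_per_elem = 1.44 * math.log2(1 / fp)
    total_mb = n * bits_per_elem / 8 / 1e6
    k = round(bits_per_elem * math.log(2))      # optimal number of hash functions
    return bits_per_elem, total_mb, k

print(bloom_params(100_000_000, 0.001))         # (~14.4 bits/elem, ~180 MB, k=10)
```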

10b. Schema registry & evolution

  • Proto schema versioned in git, pushed to Confluent Schema Registry on deploy.
  • Compatibility mode: BACKWARD_TRANSITIVE: a reader on the newest schema must be able to read data written with every older schema; in practice, upgrade consumers before producers, and never delete or re-type a required field.
  • Breaking changes workflow: bump the topic version (clicks.raw.v1 → clicks.raw.v2), dual-write during the migration window (N days), cut readers over, then decommission v1 (dual-write sketched after this list). This is expensive; use it only for true breaking changes (field semantics change).
  • Common non-breaking changes: add optional field (default null), rename via json_name annotation, deprecate field but retain tag.
  • Flink schema evolution: Flink state backends (RocksDB) support state schema evolution for Avro/POJO; proto requires custom serializer. Keep value format as bytes in state, re-parse on read.
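
A minimal sketch of the dual-write phase of that topic-version bump. Kafka config, the key choice, and the v1-to-v2 translation are assumptions; serializers are elided:

```python
# Sketch of the dual-write phase of a breaking schema migration.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092", "enable.idempotence": True})

def translate_v1_to_v2(payload_v1: bytes) -> bytes:
    # Real code re-encodes the event under the v2 proto schema.
    return payload_v1

def publish_click(key: bytes, payload_v1: bytes) -> None:
    # Phase 1: write both versions until every reader has cut over to v2.
    producer.produce("clicks.raw.v1", key=key, value=payload_v1)
    producer.produce("clicks.raw.v2", key=key, value=translate_v1_to_v2(payload_v1))

# Phase 2: move consumers to clicks.raw.v2, stop the v1 write, decommission the topic.
```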

10c. GDPR right-to-erasure

  • The hard part: pre-aggregated cubes contain user contribution without user id. Retracting one user's events from a count requires reverse-aggregation.
  • Solution:
    1. Tombstone user_hash in raw Cassandra + S3 Parquet (write deletion_request record).
    2. The next daily batch reconciler re-aggregates from scratch, excluding tombstoned users (sketched after this list).
    3. New pre-agg cubes via ReplacingMergeTree with higher ingest_version → supersede old.
    4. Old cubes purged on retention.
  • For the speed layer: best-effort; dashboards may still include the erased user's contribution for up to 24h, comfortably inside the 30-day GDPR deadline.
  • Crypto-erasure alternative (v3): each user has a per-user key; raw events encrypted with it. Erasure = destroy key. Instant, but more complex — requires Flink to decrypt on read + re-encrypt on write (cost), and breaks HLL because we can't union encrypted blobs.
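
A minimal PySpark sketch of step 2, re-aggregating with tombstoned users excluded. Paths, column names, and the ingest_version value are assumptions; the advertiser rollup and billing join are elided:

```python
# PySpark sketch: exclude erased users, then re-aggregate the daily cube.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("daily-reconcile-with-erasure").getOrCreate()

events = spark.read.parquet("s3://clicks-archive/date=2024-06-01/")
tombstones = spark.read.parquet("s3://gdpr-tombstones/")     # one row per erased user_id_hash

# Anti-join drops every event belonging to an erased user before aggregation.
clean = events.join(tombstones, on="user_id_hash", how="left_anti")

daily = (clean
         .groupBy("ad_id", F.to_date("server_ts").alias("day"))
         .agg(F.countDistinct("event_id").alias("clicks")))  # batch-side dedup by event_id

# A higher ingest_version lets ClickHouse ReplacingMergeTree supersede the old cube rows.
daily.withColumn("ingest_version", F.lit(2)).write.mode("overwrite").parquet(
    "s3://daily-totals/")
```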

10d. Fraud / invalid-click separation (vs billing)

  • Tag in stream, subtract in batch. Inline dropping is too risky: false positives would void legitimately billable clicks.
  • Fraud categories (industry standard from IAB IVT spec): SIVT (sophisticated invalid traffic — bot farms), GIVT (general invalid — crawlers), valid.
  • Billing excludes SIVT + GIVT. Dashboards show both gross and net.
  • Advertiser dispute flow: if advertiser disputes fraud classification, new reconciliation_version re-runs with different fraud model version; billing amended.

10e. Tail-end corrections flow

  • Events arriving 1h–24h late: picked up by batch layer only, never reach speed layer. Billing catches them on T+1 close.
  • Events arriving 1d–7d late (mobile app offline, rare): batch layer dedup window is 7d — included in next day's re-aggregation.
  • Events arriving >7d late: dropped with metric. Report to fraud/auditing team.
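
The routing policy above as a small function; the 10-minute and 7-day thresholds come from this section, and names are illustrative:

```python
# Lateness routing: speed+batch, batch-only, or dropped.
from enum import Enum

class LateRoute(Enum):
    SPEED_AND_BATCH = "within allowed lateness: counted by both layers"
    BATCH_ONLY = "late but inside the 7d batch dedup window: trued up at the next close"
    DROPPED = "more than 7d late: dropped with a metric, reported to fraud/audit"

def route_by_lateness(event_ts_ms: int, now_ms: int,
                      allowed_lateness_ms: int = 10 * 60 * 1000,
                      batch_window_ms: int = 7 * 24 * 3600 * 1000) -> LateRoute:
    lag_ms = now_ms - event_ts_ms
    if lag_ms <= allowed_lateness_ms:
        return LateRoute.SPEED_AND_BATCH
    if lag_ms <= batch_window_ms:
        return LateRoute.BATCH_ONLY
    return LateRoute.DROPPED
```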

10f. Billing audit trail

  • Immutable audit log in S3 with Object Lock + MFA delete. Every CLOSED total's source_batch_run_id points to the exact Parquet files + Spark job run log.
  • Auditor query path: given a billing total, reconstruct the contributing events. Path: daily_totalssource_batch_run_id → Spark job manifest → S3 Parquet files → original event_ids. Time: ~1h for a query.
  • Auditor trust criterion: given event_ids, verify against raw Kafka archive (if within 72h) or S3 (always). The chain must be cryptographically attestable (checksum at each stage).
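
One way to make that chain attestable, sketched as a hash over each stage's manifest plus the previous stage's checksum; the manifest fields and identifiers here are illustrative:

```python
# Per-stage checksum chain: parquet files -> spark run -> daily total.
import hashlib, json

def chain_checksum(parent_checksum: str, manifest: dict) -> str:
    payload = json.dumps({"parent": parent_checksum, "manifest": manifest},
                         sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()

parquet_ck = chain_checksum("", {"stage": "s3_parquet", "files": ["part-00000.parquet"]})
run_ck = chain_checksum(parquet_ck, {"stage": "spark_run", "run_id": "example-run"})
total_ck = chain_checksum(run_ck, {"stage": "daily_totals", "advertiser": "adv-42",
                                   "day": "2024-06-01"})
print(total_ck)  # stored next to the CLOSED total; auditors recompute the chain end-to-end
```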

10g. Observability (the list SRE asks for on day 1)

Ingest-side alarms:

  • Edge qps (global + per-tenant).
  • Kafka producer error rate, produce latency p99.
  • Schema validation failure rate.
  • HMAC signature failure rate.

Pipeline-side alarms:

  • Kafka consumer lag per group, per partition — primary staleness signal.
  • Flink watermark skew — max watermark across operators minus min. >5min = stuck.
  • Flink checkpoint duration / failure rate.
  • Flink operator backpressure ratio.
  • Flink state size growth rate — state backend OOM predictor.

Sink-side alarms:

  • ClickHouse merge backlog (parts count per table).
  • ClickHouse insert error rate.
  • S3 sink 2PC rename failures.
  • Redis cache hit rate.

Correctness alarms (the L7 differentiators):

  • Exactly-once counter invariant: ingested_count == (stored_count + dedup_dropped_count + late_sidelined_count) ± small epsilon (<0.01%).
  • Reconciliation drift: |batch_total − speed_total| / batch_total per (advertiser, day). Alert on >0.01% (sketched after this list).
  • Canary event end-to-end latency p99 (synthetic known events injected at edge, should land in ClickHouse within 10s).
  • Billing close timeliness: percentage of tenant-days CLOSED by T+24h.
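
A minimal sketch of the drift alarm, assuming batch and speed totals have already been joined per (advertiser, day) from Postgres and ClickHouse:

```python
# Per-(advertiser, day) reconciliation drift check against the 0.01% budget.
def drift_alerts(rows, threshold: float = 0.0001):
    """rows: iterable of (advertiser_id, day, batch_total, speed_total)."""
    for advertiser_id, day, batch_total, speed_total in rows:
        if batch_total == 0:
            continue
        drift = abs(batch_total - speed_total) / batch_total
        if drift > threshold:
            yield {"advertiser": advertiser_id, "day": day, "drift": drift}

# 0.025% drift exceeds the 0.01% budget -> page-worthy.
print(list(drift_alerts([("adv-42", "2024-06-01", 1_000_000, 1_000_250)])))
```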

Query-side alarms:

  • Dashboard API p99 / p50.
  • Redis hit rate.
  • ClickHouse query queue depth.

10h. Cost per billion events (rough)

| Component | $ / billion events |
| --- | --- |
| Kafka (MSK) | ~$2.70 |
| Flink (EMR) | ~$2.00 |
| ClickHouse | ~$1.30 |
| S3 + Parquet | ~$0.30 |
| Redis | ~$0.30 |
| Spark daily | ~$0.20 |
| Postgres | ~$0.10 |
| Observability (Prom/Grafana) | ~$0.30 |
| Overhead / staging / CI | ~$1.50 |
| Total | ~$9 / billion |

At 3B/day → ~$27/day → roughly $800/month. Per-event cost ≈ $0.000000009. For comparison: at a $1 CPM ad, the pipeline costs ~0.001% of revenue. Comfortable.

10i. Why not just use a warehouse (Snowflake / BigQuery) for everything?

  • Latency. Warehouses optimize throughput (GB/s scanned), not p99 query latency; the <2s dashboard bound is unachievable with ad-hoc queries against raw events. Pre-aggregation is mandatory.
  • Cost per query. At BigQuery's ~$5/TB scanned, even small per-query scans multiplied by dashboard query volume (~10K concurrent queriers) add up to millions per month, versus ClickHouse's fixed cluster cost.
  • Where they DO fit: ad-hoc analyst queries, longer-than-90d retention, ML training data. BigQuery is our "data lake query layer" above S3 Parquet; ClickHouse is our "operational dashboard layer."

10j. What's uniquely L7 about this design

  1. Guarantee gradient as explicit design artifact (§7a) — most designers hand-wave "exactly-once"; L7 owns the per-hop guarantees and the known gaps.
  2. is_approximate exposed to the query caller: most designers bury uncertainty; L7 makes it a first-class response field.
  3. Reconciliation version as a schema primitive — ClickHouse ReplacingMergeTree + ingest_version lets batch retroactively overwrite speed without downtime.
  4. Budget-based drift alerting, not binary — recognizes that 0.001% drift is normal, 0.1% is a page-worthy incident.
  5. Explicit Lambda → Kappa+ evolution with reasoning for why not Kappa in v1 (billing TTL, fraud iteration).
  6. Failure modes crossed by a count invariant (§8 cross-cutting) — one number that tells you the pipeline is correct end-to-end.
  7. Crypto-erasure as v3 GDPR option — most designers just say "tombstone." L7 knows the limits of retention-based erasure and the architectural cost of true instant erasure.
  8. Hot-key detection as a closed loop — detector → producer salt table → auto-heal — not a manual mitigation.

Appendix: quick-ref for interview whiteboard #

The four numbers to memorize:

  • 100K eps peak × 500B = 50 MB/s ingest
  • 3B events/day = 1.5 TB raw / 95 GB Parquet compressed
  • 64 Kafka partitions (hash + salt for hot keys)
  • Reconciliation SLA: T+24h, <0.01% drift, else page

The three deep-dive topics:

  1. Exactly-once guarantee gradient (client event_id → Kafka idempotence → Flink state → ReplacingMergeTree FINAL)
  2. Hot-key salting + two-stage aggregation
  3. Lambda in v1, Kappa+ in v3, billing dictates

The one sentence architecture: "Client-side event_id anchors end-to-end exactly-once; Kafka (64 partitions, salted hot keys) feeds Flink (event-time windows, 10min allowed lateness) which writes idempotently to ClickHouse ReplacingMergeTree for dashboards and S3 Parquet for the Spark daily reconciler, whose output is the billing source of truth in Postgres."
