
Q10 Orchestration & Coordination

Distributed Job Scheduler (Airflow-style)

Execute DAGs on schedule, recover from worker failures, and scale to a very large number of task runs per day.

Scheduling · Consistency · Availability · Observability · Partitioning

1 Problem Restatement & Clarifying Questions #

Restatement (30 sec, said aloud): Build a multi-tenant distributed scheduler that accepts user-defined DAGs of tasks, triggers them on cron/event/sensor conditions, dispatches tasks to workers honoring dependencies, retries on failure, provides a UI + audit log, and scales to 100M task executions/day. Source of truth is a metadata DB; workers are cattle; scheduler is HA with leader election; execution is at-least-once with idempotency-by-convention. Think Apache Airflow, Temporal (Uber Cadence), Argo Workflows, AWS Step Functions.

Clarifying questions (I would actually ask, grouped by how much they change the design):

Load-shape questions (change capacity numbers):

  1. "100M/day" — is that task executions (task_instance level) or DAG runs (workflow level)? Airflow-style DAGs have 10-100 tasks each, so 100M task-instances/day implies roughly 1-10M DAG runs/day. I'll assume task_instance-level (the harder number). Avg = 1.16K/sec; peak 5× = 5.8K/sec.
  2. Task duration distribution: p50 30s, p95 5min, p99 1hr, max 24hr? Long tail is what blows up worker pool sizing and poisons queues. I'll assume that distribution.
  3. Hourly/daily skew? Airflow users schedule crons at the top of the hour → 12× spike at :00, 2-3× at :15/:30/:45. Bake this into queue + scheduler sizing.
  4. Peak:avg ratio: 5× nominal, 12× instantaneous at cron-alignment boundaries.

Correctness-contract questions (change the deep-dives):

  5. Execution semantics: at-least-once + user-idempotent is what Airflow ships. Does the user want effective exactly-once (Temporal-style deterministic replay + event sourcing)? That's a v3 feature and a different system.
  6. Idempotency contract: do we require tasks to be idempotent, or do we provide fencing tokens (run_id + task_id + attempt_id) so the platform guarantees no duplicate side-effects? I'll offer both: dumb workers + convention, smart workers + deterministic replay (v3).
  7. Cross-DAG dependencies: sensors or explicit ExternalTaskMarker? Affects the data model.
  8. Backfill behavior: when a DAG is paused for 7 days then resumed, do we run 7 days of missed runs serially (catchup=True) or skip to now (catchup=False)? Both are needed; the backfill queue must not starve the live queue.

Operational questions (change the SLO / ops story):

  9. SLA: scheduling latency = (scheduled_time → task_dispatched_to_worker). Target p99 < 5s. Also end-to-end DAG SLA alerting? Yes.
  10. Multi-tenant isolation: noisy-neighbor quotas, per-tenant queues? Yes — critical at 100M scale.
  11. Task code: runs in containers (k8s), in-process Python (CeleryExecutor-style), or both? Both; but we do not design the container runtime — assume a K8s Job API exists.
  12. Regions: single-region active, multi-region DR, or active-active multi-region? I'll design single-region active + warm-standby DR; mention active-active in v3.
  13. Retention: run logs & metadata — 90 days hot, 1 year cold? Log volume is the #1 cost driver.
  14. Compliance: GDPR right-to-erasure on task logs vs. an immutable audit trail? Address in §10.

Stated assumptions (I'd write these on the whiteboard so the interviewer can push back):

  • 100M task_instances/day, avg 1.16K/sec, peak 5.8K/sec sustained, 14K/sec momentary at top-of-hour
  • Avg task 60s on 1 CPU core
  • At-least-once + idempotent user tasks; v3 offers deterministic replay
  • Single writer metadata DB (Spanner or vertically-scaled Postgres with read replicas)
  • K8s exists as a runtime substrate; we don't design the pod scheduler
  • Web UI is read-mostly; 99% of reads answered from replicas or cache

2 Functional Requirements #

In scope (numbered so I can point at them during the interview):

  1. DAG authoring & registration — users submit a DAG spec (Python/YAML/JSON) declaring tasks, deps, schedule, retries, timeouts, owner, tags. System versions the DAG.
  2. Scheduling triggers — cron (with timezone + DST), interval, event (Kafka/webhook/sensor), manual trigger via API/UI, sensor polling (deferrable).
  3. Dependency resolution — topological order, upstream-must-succeed, branch operators, trigger_rule (one_success / all_done / all_failed), fan-out/fan-in (dynamic task mapping: task.expand(input=[...])).
  4. Dispatch & execution — queue-based, worker pulls, lease-based task ownership; honors priority, pool, concurrency caps (per-DAG, per-task, per-pool, global).
  5. Retries — per-task retry count, exponential backoff with jitter, retry on specific exceptions only, optional task-level circuit breaker.
  6. Timeouts — per-task hard timeout (lease expires → reassign); per-DAG SLA (alert if run > X).
  7. SLA alerting — missed SLA fires webhook / PagerDuty / email. Also: ExecutionTimeout, SensorTimeout.
  8. UI — list DAGs, inspect runs, view logs, trigger/pause/unpause, clear task state, view gantt, code view, audit log. Read-heavy; ~10:1 read:write.
  9. Historical run log — task-level logs streamed to object store; state events immutable in metadata DB; queryable for N days hot, M years cold.
  10. Deferrable tasks (triggerer) — long waits (e.g. "poll S3 for file for 12h") are offloaded from worker slots to a lightweight triggerer process that wakes the task when the async event fires.
  11. Backfill — re-run historical date ranges with throttling.
  12. Idempotency — same (dag_id, logical_date) returns same run_id; trigger_run is idempotent by caller-supplied key.
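
The retry policy in FR 5 (exponential backoff with jitter) can be sketched as a small function. This is a full-jitter variant; the `base`/`cap` defaults are illustrative, not part of the spec above:

```python
import random

def retry_delay(attempt: int, base: float = 5.0, cap: float = 600.0, rng=None) -> float:
    """Full-jitter exponential backoff: delay is uniform in
    [0, min(cap, base * 2**attempt)]. Jitter prevents retry storms
    when many tasks fail at once (e.g. a downstream outage)."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)
```

The scheduler would stamp `next_retry_ts = NOW() + retry_delay(attempt)` when moving a task to `up_for_retry`.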

Out of scope (stated to constrain the design):

  • Container/VM scheduling (assume K8s Job API or similar runtime exists)
  • Data catalog / data lineage (mention hooks for OpenLineage, don't design)
  • In-task resource isolation (CPU/mem limits enforced by runtime, not scheduler)
  • Business-logic workflow semantics (saga compensation, etc. — covered in v3 as Temporal-style)
  • Actual DAG code sandboxing/security (auth'd via IAM; code scanning is a separate system)
  • Secrets management (assume Vault/Parameter Store integration)

3 NFRs + Capacity Estimate (BOE, show the math) #

Non-functional targets:

| NFR | Target | Rationale |
| --- | --- | --- |
| Availability (control plane) | 99.95% (4.4 h/yr) | Scheduler down = nothing runs; a missed cron is an SLA breach. Large internal schedulers typically target 99.95-99.99. |
| Scheduling latency (scheduled_ts → dispatched) | p50 1s, p99 5s, p999 30s | Airflow 2.x achieves roughly 2s p50. Tight enough that cron misalignment isn't perceptible; loose enough that you don't need hard real-time. |
| Worker-start-to-task-exec latency | p99 < 10s for in-process exec; < 60s for a cold k8s pod | Split accountability: scheduler owns dispatch; runtime owns pod start. |
| Execution semantics | At-least-once + idempotency-by-convention (v1-v2); effective exactly-once via event sourcing (v3) | Distributed-systems reality: exactly-once requires deterministic replay of user code, which users won't write unless we force it. |
| Durability (metadata) | RPO ≤ 1 min, RTO ≤ 15 min | Synchronous replica + snapshots. Losing 1 min of state = re-running at most 1 min of dispatches (idempotent, so safe). |
| Log durability | 11 nines via object store (S3/GCS) | Standard. |

Capacity estimate (all numbers derived):

INPUT
  N_tasks       = 100,000,000 / day
  seconds_day   = 86,400
  peak_ratio    = 5× sustained,   12× instantaneous (top-of-hour)
  avg_task_dur  = 60s  (assumed; p95 = 5 min, p99 = 1 hr)
  events_per_ti = 5  (queued, running, success/fail, heartbeats compacted to 1, log-flush)
  retry_rate    = 3%  (industry: 1-5% of tasks retry)
  attempts_avg  = 1 + 0.03 * 1.5 ≈ 1.045  (most retries succeed on 2nd attempt)

DISPATCH QPS
  avg_dispatch  = 100M / 86400 × 1.045 ≈ 1.21K/sec
  peak_dispatch = 1.21K × 5 = 6.05K/sec  sustained
  burst         = 1.21K × 12 = 14.5K/sec at :00

WORKER POOL SIZING (Little's Law: L = λW)
  L = concurrent_tasks = λ × W
  λ = 1.21K/sec dispatch
  W = 60s mean duration
  L_avg  = 1.21K × 60 = 72.6K concurrent tasks (avg)
  L_peak = 6.05K × 60 = 363K concurrent tasks (peak, sustained)
  → need ~350-400K worker slots (cores) at peak
  → at 32 cores/host → ~11K-12K worker hosts at peak
  With p99 = 1hr tail: long-runners occupy ~1% of slots at steady state
    = 0.01 × 100M × 3600 / 86400 = 41,700 slots perpetually tied up by long tails
    → add ~12% headroom

METADATA DB WRITE LOAD
  events/sec avg  = 1.21K × 5 = 6.05K writes/sec
  events/sec peak = 6.05K × 5 = 30K writes/sec
  Each event row ≈ 500-1KB (state transition + metadata)
  Daily write volume = 100M × 5 × 1KB = 500 GB/day raw
  + indexes 2-3× → ~1.5 TB/day rolling WAL
  90-day hot retention → 135 TB hot tier
  Cold archive → Parquet on S3, ~10× compression → 15 TB cold

QUEUE DEPTH
  Steady state at peak: L_peak = 363K tasks "in flight"
  Queue "ready" depth (dispatched-but-not-yet-claimed): tiny, sub-second
  Queue "scheduled but not due" depth: up to 24h of future tasks
    worst case if all DAGs schedule daily at midnight:
    100M tasks sitting in a "scheduled at T" zset/heap at T-ε
    → partition by shard_key (hash(dag_id) % N), N=64-256

LEADER ELECTION / HEARTBEAT
  Scheduler lease: 10s (tight enough for <15s failover, loose enough to absorb GC pauses)
  Worker heartbeat: 10s (miss 3 → reap at 30s)
  With 12K workers, heartbeat rate = 1.2K/sec writes against a worker_liveness
    table (or better: a Redis SETEX with TTL, avoid DB hot row)

LOG VOLUME
  Avg task log = 10 KB (stdout + stderr captured)
  100M × 10KB = 1 TB/day
  90-day hot = 90 TB on S3 STANDARD_IA → ~$2K/month
  Trace/event logs (state transitions, scheduling decisions) = ~500 GB/day
  Shipped to BigQuery / Snowflake for analytics; 90d partition pruning

COST (rough order)
  12K hosts × $400/mo (m5.4xlarge class) = $4.8M/mo compute (dominant)
  Metadata DB (Spanner 3-node HA or RDS r6g.16xlarge × 3) = $30-80K/mo
  Queue (Redis Cluster 16 shards × HA pair + Kafka 32 brokers) = $40-60K/mo
  Logs + traces storage + BigQuery = $20K/mo
  Control plane (scheduler + triggerer + API servers, say 30 hosts) = $15K/mo
  TOTAL ≈ $5M/mo  → $0.0017 per task execution
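
The back-of-envelope numbers above can be reproduced mechanically (same inputs, same Little's Law step):

```python
# Inputs from the BOE block above
N_TASKS = 100_000_000            # task_instances / day
SECONDS_DAY = 86_400
ATTEMPTS_AVG = 1 + 0.03 * 1.5    # 3% retry rate, most succeed on attempt 2
AVG_DUR = 60                     # mean task duration, seconds
EVENTS_PER_TI = 5                # state-transition events per task_instance

avg_dispatch = N_TASKS / SECONDS_DAY * ATTEMPTS_AVG   # ≈ 1.21K/sec
peak_dispatch = avg_dispatch * 5                      # ≈ 6.05K/sec sustained
burst_dispatch = avg_dispatch * 12                    # ≈ 14.5K/sec at :00

# Little's Law: L = lambda * W (concurrent tasks = arrival rate x duration)
l_avg = avg_dispatch * AVG_DUR    # ≈ 72.6K concurrent tasks
l_peak = peak_dispatch * AVG_DUR  # ≈ 363K concurrent tasks -> worker slots

writes_avg = avg_dispatch * EVENTS_PER_TI    # ≈ 6.05K metadata writes/sec
writes_peak = peak_dispatch * EVENTS_PER_TI  # ≈ 30K metadata writes/sec
```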

Key coherence checks (the interviewer will cross-check these):

  • 6K/sec dispatch × 5 events = 30K writes/sec on metadata → a single Postgres WAL tops out at ~10-30K writes/sec; need Spanner, or Postgres with queue events offloaded to Redis/Kafka and only lifecycle events in the DB (the same split Airflow makes: lifecycle state in the metadata DB, queueing delegated to the Celery/Kubernetes executors' brokers).
  • 350K worker slots: that's Borg-scale territory. Community reports suggest single Airflow deployments top out around ~50K concurrent tasks; beyond that you shard by cluster/tenant or move to Temporal (which scales further via sharded task queues on Cassandra).
  • Leader election 10s lease: with 10s TTL and 3s heartbeat, a single scheduler crash = at most 13-15s of lost dispatch → 18-22K tasks backlogged → catchable in <10s of sprint dispatch by the new leader (needs to burst-dispatch at ~3-5K/sec for 10s, which it can because DB SKIP LOCKED scales).

4 High-Level API #

Surface: gRPC (server-to-server, e.g. sensor webhooks from other systems) + REST/JSON (UI, CLI). Auth: OIDC + IAM (per-DAG ACLs). All mutating calls idempotent via client-supplied Idempotency-Key.

service Scheduler {
  // DAG authoring
  rpc RegisterDag(RegisterDagRequest) returns (RegisterDagResponse);
    // payload: dag_spec (YAML/proto), version auto-bumped on content-hash change
    // idempotent on (dag_id, content_hash)

  rpc UpdateDag(UpdateDagRequest) returns (UpdateDagResponse);
    // pause/unpause, owner change, SLA change

  rpc DeleteDag(DeleteDagRequest) returns (google.protobuf.Empty);
    // soft delete; history retained for audit

  // Runs
  rpc TriggerRun(TriggerRunRequest) returns (TriggerRunResponse);
    // request: { dag_id, logical_date, params, run_id (optional, caller-supplied) }
    // idempotent on (dag_id, run_id); if run_id absent, server generates UUIDv7
    // returns run_id + state=queued

  rpc GetRunStatus(GetRunStatusRequest) returns (RunStatus);
    // cacheable 1s server-side; read from replica

  rpc ListRuns(ListRunsRequest) returns (ListRunsResponse);
    // filter by dag_id, state, time window; paginated

  rpc CancelRun(CancelRunRequest) returns (CancelRunResponse);
    // sets run state=cancelling; scheduler fences running tasks via lease revocation

  rpc RetryTask(RetryTaskRequest) returns (RetryTaskResponse);
    // manual retry of failed task_instance; bumps attempt_id
    // idempotent on (ti_id, target_attempt_id)

  // Task-level
  rpc ListTasks(ListTasksRequest) returns (ListTasksResponse);
  rpc GetTaskLogs(GetTaskLogsRequest) returns (stream LogChunk);
    // streamed from object store via signed URL indirection for large logs

  // Sensor / event hooks
  rpc SignalEvent(SignalEventRequest) returns (SignalEventResponse);
    // webhook endpoint external systems call ("file ready at s3://...")
    // triggerer matches waiting deferrables and advances state

  // Worker<->scheduler (internal plane, not user-facing)
  rpc WorkerHeartbeat(stream HeartbeatRequest) returns (stream HeartbeatResponse);
  rpc ClaimTask(ClaimTaskRequest) returns (ClaimTaskResponse);
    // lease-based; returns lease_token (fencing token, monotonic)
  rpc ExtendLease(ExtendLeaseRequest) returns (ExtendLeaseResponse);
  rpc ReportTaskResult(ReportTaskResultRequest) returns (ReportTaskResultResponse);
    // idempotent on (ti_id, attempt_id, lease_token); stale lease => rejected
}

Error codes (gRPC canonical + domain extensions):

  • ALREADY_EXISTS: duplicate DAG version or run_id — return existing resource
  • FAILED_PRECONDITION: DAG paused, upstream dep unmet, lease expired
  • RESOURCE_EXHAUSTED: per-tenant quota hit — client should back off (signaled via retry-after)
  • ABORTED: lease fencing token stale (another worker took over) — worker must abort
  • DEADLINE_EXCEEDED: task hit hard timeout — scheduler reassigns

Idempotency implementation:

  • Idempotency-Key (HTTP header) or client_request_id (gRPC metadata)
  • Server stores (idempotency_key, request_hash, response, expires_at) in a 24h TTL cache (Redis)
  • Replay within TTL returns cached response; replay with same key but different request body → INVALID_ARGUMENT
  • For TriggerRun, the durable idempotency is via (dag_id, run_id) PK in dag_run — the idem cache is just a speed-up for the API layer
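
The idempotency-cache semantics above (replay returns the cached response; same key with a different body is rejected) can be sketched in a few lines. The design uses Redis with a 24h TTL; an in-process dict stands in here for illustration:

```python
import hashlib
import json
import time

class IdemCache:
    """Sketch of the API-layer idempotency cache. Not the durable
    idempotency (that's the (dag_id, run_id) PK) — just the speed-up."""

    def __init__(self, ttl: float = 86_400):
        self.ttl = ttl
        self.store = {}  # key -> (request_hash, response, expires_at)

    def call(self, key, request, handler):
        req_hash = hashlib.sha256(
            json.dumps(request, sort_keys=True).encode()).hexdigest()
        entry = self.store.get(key)
        if entry and entry[2] > time.time():
            cached_hash, response, _ = entry
            if cached_hash != req_hash:
                # same key, different body -> INVALID_ARGUMENT
                raise ValueError("INVALID_ARGUMENT: idempotency key reuse")
            return response  # replay within TTL -> cached response
        response = handler(request)
        self.store[key] = (req_hash, response, time.time() + self.ttl)
        return response
```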

5 Data Schema #

Storage layers & rationale:

| Data | Store | Why | Rejected |
| --- | --- | --- | --- |
| DAG spec, versioning | Postgres (or Spanner) | Small (10K DAGs × 100 KB avg = 1 GB), transactional | S3-only: no txn with state; ZooKeeper: wrong shape |
| DAG runs, task_instance state | Postgres → Spanner at scale | Source of truth; needs ACID + SKIP LOCKED | DynamoDB: no SKIP LOCKED pattern; Cassandra: no transactions |
| Task queue (dispatched, ready-to-claim) | Redis Streams or Kafka (or DB-backed SKIP LOCKED under 5K/sec) | Low-latency claim, consumer groups | SQS: visibility-timeout granularity too coarse for a 5s-p99 SLO |
| Scheduled-but-not-due tasks ("when to dispatch") | Redis ZSET (score = unix_ts) sharded, OR partitioned table with ready_at index | Need O(log N) "pop all due tasks" | Kafka: not a heap |
| Worker liveness | Redis TTL keys | 1.2K heartbeats/sec is a hot row in any RDBMS | Postgres: WAL churn |
| Task logs (stdout/stderr) | Object store (S3/GCS) | 1 TB/day, write-heavy, immutable | DB: wrong cost structure |
| Audit / event stream | Kafka → BigQuery for analytics; mirror to S3 for cold audit | Append-only, high volume | DB: wrong scale |
| Leader election, config | etcd (or ZK) | Native leases, watches | Postgres advisory locks: work at small scale but no watch primitive |

Core tables (Postgres / Spanner flavor):

-- DAG authoring
CREATE TABLE dag_spec (
  dag_id        TEXT NOT NULL,
  version       INT  NOT NULL,
  content_hash  TEXT NOT NULL,  -- sha256 of serialized spec
  spec_blob     BYTEA NOT NULL, -- serialized YAML/proto
  owner         TEXT NOT NULL,
  tenant_id     TEXT NOT NULL,
  schedule_cron TEXT,           -- null for triggered-only DAGs
  timezone      TEXT NOT NULL,  -- IANA tz
  paused        BOOL DEFAULT FALSE,
  sla_seconds   INT,            -- null = no SLA
  catchup       BOOL DEFAULT FALSE,
  max_active    INT DEFAULT 16, -- concurrency cap
  created_ts    TIMESTAMPTZ DEFAULT NOW(),
  PRIMARY KEY (dag_id, version)
);
CREATE INDEX ON dag_spec (tenant_id, dag_id) WHERE NOT paused;

-- Run-level
CREATE TABLE dag_run (
  run_id        UUID NOT NULL,        -- UUIDv7 = time-ordered
  dag_id        TEXT NOT NULL,
  dag_version   INT  NOT NULL,
  logical_date  TIMESTAMPTZ NOT NULL, -- scheduled time (cron)
  state         SMALLINT NOT NULL,    -- queued/running/success/failed/cancelled
  trigger_type  SMALLINT NOT NULL,    -- cron/manual/backfill/event
  started_ts    TIMESTAMPTZ,
  ended_ts      TIMESTAMPTZ,
  tenant_id     TEXT NOT NULL,
  PRIMARY KEY (run_id),
  UNIQUE (dag_id, logical_date, trigger_type)  -- idempotency
);
CREATE INDEX ON dag_run (dag_id, logical_date DESC);
CREATE INDEX ON dag_run (state, logical_date) WHERE state IN (0,1);  -- hot queries

-- Task-instance (the big table)
CREATE TABLE task_instance (
  ti_id         UUID NOT NULL,        -- UUIDv7
  run_id        UUID NOT NULL REFERENCES dag_run(run_id),
  task_id       TEXT NOT NULL,        -- logical id in DAG
  map_index     INT NOT NULL DEFAULT -1,  -- for dynamic fanout; -1 = unmapped
  attempt       INT NOT NULL DEFAULT 1,
  state         SMALLINT NOT NULL,    -- queued/scheduled/running/success/failed/up_for_retry/deferred/skipped
  priority      INT DEFAULT 0,
  queue_name    TEXT NOT NULL,
  worker_id     TEXT,                 -- null until claimed
  lease_token   BIGINT,               -- monotonic fencing token
  lease_expires TIMESTAMPTZ,
  scheduled_ts  TIMESTAMPTZ NOT NULL, -- when to dispatch (after backoff etc)
  started_ts    TIMESTAMPTZ,
  ended_ts      TIMESTAMPTZ,
  exit_code     INT,
  next_retry_ts TIMESTAMPTZ,
  PRIMARY KEY (ti_id),
  UNIQUE (run_id, task_id, map_index, attempt)
);
-- Critical hot index: the dispatch query
CREATE INDEX ti_dispatch ON task_instance (queue_name, scheduled_ts)
  WHERE state = 1 /* scheduled-ready */;
-- Partition by hash(run_id) for write spread (Postgres declarative partitioning or Spanner interleaving)
CREATE INDEX ti_lease ON task_instance (lease_expires)
  WHERE state = 2 /* running */ AND lease_expires IS NOT NULL;

-- Queue partitions (if using DB-backed queue)
-- Actually a view over task_instance + queue metadata; physical queue often in Redis/Kafka.

-- Worker pool state (in Redis, mirrored periodically to DB for history)
-- Redis: SET worker:<id> {host, capacity, tags, last_hb} EX 30
-- DB (historical):
CREATE TABLE worker_event (
  worker_id   TEXT NOT NULL,
  event_ts    TIMESTAMPTZ NOT NULL,
  event_type  SMALLINT,  -- registered/unregistered/drained/died
  capacity    INT,
  PRIMARY KEY (worker_id, event_ts)
);

-- Triggerer state for deferred/async waits
CREATE TABLE trigger_row (
  trigger_id    UUID NOT NULL,
  ti_id         UUID NOT NULL REFERENCES task_instance(ti_id),
  triggerer_id  TEXT,                -- which triggerer process owns it
  classpath     TEXT NOT NULL,        -- user-provided async fn
  kwargs_json   JSONB NOT NULL,
  created_ts    TIMESTAMPTZ DEFAULT NOW(),
  next_poll_ts  TIMESTAMPTZ,
  PRIMARY KEY (trigger_id)
);
CREATE INDEX ON trigger_row (triggerer_id, next_poll_ts) WHERE triggerer_id IS NOT NULL;

-- Immutable event log (append-only, for audit + UI timeline)
CREATE TABLE ti_event (
  ti_id       UUID NOT NULL,
  event_ts    TIMESTAMPTZ NOT NULL,
  event_type  SMALLINT NOT NULL,   -- queued/dispatched/started/heartbeat/succeeded/failed/retried/cancelled
  worker_id   TEXT,
  attempt     INT,
  message     TEXT,
  PRIMARY KEY (ti_id, event_ts, event_type)
) PARTITION BY RANGE (event_ts);  -- daily partitions, drop after 90d

Why this schema is defensible:

  • UUIDv7 for run_id and ti_id: time-ordered so BTREE inserts are append-mostly, not random. Snowflake-style IDs also fine; UUIDv7 avoids centralized ID service.
  • UNIQUE (run_id, task_id, map_index, attempt): this is the core idempotency key. A worker reporting success for attempt=1 cannot conflict with attempt=2's retry.
  • lease_token monotonic: fencing. If worker A is partitioned and lease expires, worker B takes over with a higher token. A's late "I succeeded" write is rejected because token_old < token_new. This is Martin Kleppmann's fencing pattern — see §7.1.
  • Partial index on dispatch query: WHERE state=1 keeps the BTREE tiny (only "ready" rows), so SKIP LOCKED scan stays O(k) where k=batch size, not O(total_task_instances).
  • Rejected DynamoDB: Airflow-style needs SELECT tasks WHERE state=scheduled AND scheduled_ts<=NOW() ORDER BY priority LIMIT 100 FOR UPDATE SKIP LOCKED. Dynamo has no SKIP LOCKED analog; you can build one with conditional writes + retry storms, but it's worse than a well-tuned Postgres up to ~5-10K/sec.
  • Rejected single Postgres beyond v2: at 30K writes/sec peak, single-writer Postgres WAL bottlenecks around 10-30K/sec depending on row size. Spanner or Aurora with multi-master for v2→v3. Or shard the metadata by tenant_id (Airbnb did this with multiple Airflow deployments + a federation layer).
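
The contract SKIP LOCKED gives the dispatch query — every ready row handed to exactly one claimer, with no claimer blocking on another's locked rows — can be simulated in memory. Postgres provides the atomicity via row locks; a single mutex stands in here:

```python
import threading

class ReadyTable:
    """In-memory stand-in for the ti_dispatch partial index plus
    FOR UPDATE SKIP LOCKED: concurrent claimers take disjoint batches."""

    def __init__(self, ti_ids):
        self._lock = threading.Lock()
        self._ready = list(ti_ids)

    def claim_batch(self, limit: int):
        # Postgres makes this atomic per row via locks; one mutex here.
        with self._lock:
            batch, self._ready = self._ready[:limit], self._ready[limit:]
            return batch

table = ReadyTable(range(1000))
claimed = []

def claimer():
    while True:
        batch = table.claim_batch(100)
        if not batch:
            return
        claimed.extend(batch)  # list.extend is atomic under the GIL

threads = [threading.Thread(target=claimer) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After the run, every ti_id appears exactly once in `claimed` — no double dispatch, no lost rows.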

6 System Diagram (CENTERPIECE) #

6.1 Overall architecture

                                       +-----------------------------------+
                                       |             USERS                 |
                                       |  Web UI  |  CLI  |  API clients  |
                                       |  Sensors/Upstream Systems        |
                                       +------+-----------+-------+-------+
                                              |           |       |
                                       HTTPS  |  gRPC/TLS |  webhook (event)
                                              v           v       v
   +------------------------------------------+-----------+-------+----------------------------+
   |                               EDGE / API LAYER  (stateless, N=8)                          |
   |    Envoy / GFE  ->  API Server  (auth, rate limit, idem cache in Redis)                   |
   |    OpenAPI/gRPC;  reads from metadata replicas; writes via leader-aware routing           |
   +-----+---------------+--------------------------+---------------------+--------------------+
         |  write path   |  read path              |  webhook            |  WS for UI push
         v               v                         v                     v
    +----+-----+   +-----+------+             +----+-----+        +------+------+
    | DAG Reg  |   | Read Cache |             | Triggerer|        |   UI SSR    |
    | (parser, |   |  (Redis +  |             |  Pool    |        |  (Next.js)  |
    |  validator|  |  Postgres  |             |  N = 12  |        +-------------+
    |  compiler)|  |  replicas) |             | async I/O|
    +----+-----+   +------------+             +----+-----+
         |                                         |
         v                                         v
   +-----+-------------------------------------------------------------------------------------+
   |                           CONTROL PLANE  (leader-elected via etcd)                         |
   |                                                                                            |
   |   +-----------------+      etcd cluster (5 nodes, Raft) <------------+                     |
   |   |  SCHEDULER POOL |      - leader lease (10s TTL, 3s renew)        |                     |
   |   |   (N=5, 1 lead, |      - /sched/leader, /sched/shards/*         |                     |
   |   |    4 standby)   |      - compare-and-swap for shard claims      |                     |
   |   |                 |                                                |                     |
   |   | DagFileProcessor+->  SHARDED SCHEDULER LOOPS (64 shards)  <------+                     |
   |   | (parse every 30s)    shard_id = hash(dag_id) % 64                                     |
   |   |                                                                                        |
   |   |   per shard: tick every 1s:                                                            |
   |   |     1. SELECT due runs                                                                 |
   |   |     2. resolve deps                                                                    |
   |   |     3. INSERT task_instance rows (state=scheduled)                                     |
   |   |     4. enqueue to Redis/Kafka                                                          |
   |   +-------+------------+---------------------------------------------------------+--------+
   |           |            |                                                          |
   |           | 5-30 K/sec | INSERT/UPDATE                                             |
   |           v            v                                                          |
   |   +-------+------------+-------------+      +----------------------+               |
   |   |         METADATA DB              |      |    QUEUE LAYER      |               |
   |   |  (Spanner / PG+replicas)         |      |  Redis Streams +    |               |
   |   |  - dag_spec                      |      |  Kafka (mirror)     |               |
   |   |  - dag_run                       |      |  - 64 partitions    |               |
   |   |  - task_instance  (partitioned)  |      |  - per-queue + per- |               |
   |   |  - ti_event       (daily parts)  |      |    priority streams |               |
   |   |  - trigger_row                   |      |  - XADD on dispatch |               |
   |   |  HA: sync replica + 2 async      |      |  - XREADGROUP by    |               |
   |   |  Backup: WAL ship + daily snap   |      |    worker           |               |
   |   +----------------------------------+      +----+----------------+               |
   |                                                  ^                                 |
   +--------------------------------------------------|---------------------------------+
                                                      |   XREADGROUP (claim)
                                                      |   10 ms p99
                                                      |
        +---------------------------------------------+---------------------+
        |                          WORKER POOL (N = 11-12K hosts)            |
        |                                                                    |
        |   Worker process                                                   |
        |     - pulls from Redis stream (XREADGROUP count=8)                 |
        |     - ClaimTask RPC to scheduler -> lease_token (fencing)          |
        |     - forks subprocess OR submits k8s Job (runtime-dependent)      |
        |     - streams stdout/stderr to FluentBit -> S3                     |
        |     - heartbeats 10s -> Redis SETEX worker:<id>                    |
        |     - on complete: ReportTaskResult (lease_token included)         |
        |     - lease renewal every 30s while running                        |
        |                                                                    |
        |   Capacity tag per worker: {mem, cpu, gpu, queue_subscriptions}    |
        +----------+---------------------------------------------+-----------+
                   |                                             |
                   | stdout/stderr (10 KB avg)                   | stdout
                   v                                             v
           +-------+---------+                            +------+--------+
           |  FluentBit →    |                            |  K8s runtime  |
           |  Log Collector  |                            |  (pod manages |
           |  (N=20)         |                            |   task code)  |
           +-------+---------+                            +---------------+
                   |
                   v
           +-------+--------+       +----------------+       +--------------+
           |  Object Store  |       |     Kafka      |  ->   |  BigQuery /  |
           |  S3/GCS (logs) |       |  ti_event bus  |       |  Snowflake   |
           |  1 TB/day      |       |  audit stream  |       |  (analytics) |
           +----------------+       +----------------+       +--------------+

6.2 Sub-diagram: scheduling loop (per-shard tick)

tick t = every 1s per shard (64 shards; one process per shard claimed via etcd CAS)

  [1] SELECT ready dag_runs (state=scheduled, scheduled_ts <= NOW())
        ORDER BY priority, scheduled_ts
        LIMIT 500
        FOR UPDATE SKIP LOCKED;

  [2] For each run:
        resolve DAG (cached parse in memory, reload on version change)
        compute new task_instances whose deps are satisfied
        (upstream states from task_instance rows for this run_id)

  [3] BATCH INSERT new task_instance rows with:
        state = 'scheduled'
        scheduled_ts = NOW() (or + backoff for retries)
        queue_name = task.queue (default: "default")
        priority = max(task.prio, dag.prio, tenant.prio)

  [4] For each inserted ti:
        Redis: XADD queue:<queue_name>:<priority> ti_id <JSON payload>
                  MAXLEN ~ 1M (cap to avoid runaway)
        Kafka: produce to topic "dispatch" partition=hash(ti_id)
                  (mirror for durability + analytics)

  [5] UPDATE dag_run.state = 'running' where all children scheduled

  [6] Publish to ti_event log: {ti_id, event_type=queued, ts}

  Failure of step 3 after steps 1-2: transaction rollback, SKIP LOCKED releases;
  next tick re-selects.
  Failure of step 4 after step 3: task row exists but not enqueued.
    → periodic reconciler (every 60s) scans state=scheduled AND NOT in queue
    → re-enqueues. This is the anti-"lost dispatch" guardrail.
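
The step-4 failure guardrail can be reduced to a pure sweep function (names hypothetical; the real version queries `state=scheduled` rows and the queue's pending-entries list):

```python
def reconcile(db_scheduled_ids, queue_ids, enqueue):
    """Anti-lost-dispatch sweep, run every 60s: any task_instance that is
    state=scheduled in the DB but absent from the queue gets re-enqueued.
    Safe to re-run: downstream claiming is keyed by ti_id, so a duplicate
    enqueue is deduplicated at ClaimTask time."""
    missing = set(db_scheduled_ids) - set(queue_ids)
    for ti_id in sorted(missing):
        enqueue(ti_id)
    return missing
```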

6.3 Sub-diagram: task lease lifecycle (THE fencing story)

  [Worker pulls from queue]
        XREADGROUP GROUP workers <worker_id> COUNT 8 STREAMS queue:default >
        -> gets ti_id

  [Worker claims]                             [Scheduler]
        ClaimTask(ti_id, worker_id) ---------->  CAS:
                                                  UPDATE task_instance
                                                  SET state='running',
                                                      worker_id=?,
                                                      lease_token = nextval(lease_seq),
                                                      lease_expires = NOW() + 60s
                                                  WHERE ti_id=? AND state='scheduled';
                                                 returns {lease_token: 42}  or FAILED_PRECONDITION

  [Worker executes user code]
        every 30s:
          ExtendLease(ti_id, lease_token=42) -> lease_expires += 60s

  Scenario A (happy path):
        done -> ReportTaskResult(ti_id, token=42, exit=0)
                Scheduler CAS:
                   UPDATE task_instance
                   SET state='success', ended_ts=NOW()
                   WHERE ti_id=? AND lease_token=42;
                   (if 0 rows updated: stale token, worker's write is DISCARDED)

  Scenario B (worker partitioned):
        worker's lease_expires = T+60
        worker continues running user code (cannot be killed from outside)
        at T+70: scheduler's reaper: lease_expires < NOW()
           UPDATE task_instance
           SET state='up_for_retry', worker_id=NULL, lease_token=NULL
           WHERE ti_id=? AND lease_expires < NOW() AND state='running';
           re-enqueue with attempt = attempt + 1

        meanwhile at T+75 partitioned worker finishes:
          ReportTaskResult(ti_id, token=42, exit=0)
          Scheduler: WHERE lease_token=42 → 0 rows (now token=43 or NULL)
          rejected (ABORTED) → worker logs and drops
          ← THIS IS WHERE AT-LEAST-ONCE BITES IDEMPOTENCY:
             the side-effects of attempt 1 AND attempt 2 may both have happened.
             user code must tolerate (see §7.2)

  Scenario C (lease extension fails):
        worker's ExtendLease returns ABORTED (lease already reassigned)
        worker must KILL its subprocess immediately (SIGKILL the user code
        via runtime API, or orphan the k8s Job — which is less clean)
        and report nothing.
        → minimizes duplicate-side-effect window but doesn't eliminate it.
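
The claim/extend/report protocol above can be sketched as a toy in-memory model (`LeaseTable` and its method names are invented for illustration; the real system does these CAS operations as conditional SQL UPDATEs):

```python
import itertools

class LeaseTable:
    """Toy in-memory model of the task_instance lease CAS protocol."""
    def __init__(self):
        self._seq = itertools.count(42)   # stands in for nextval(lease_seq)
        self.rows = {}                    # ti_id -> {state, worker_id, lease_token}

    def add(self, ti_id):
        self.rows[ti_id] = {"state": "scheduled", "worker_id": None, "lease_token": None}

    def claim(self, ti_id, worker_id):
        row = self.rows[ti_id]
        if row["state"] != "scheduled":   # CAS precondition: WHERE state='scheduled'
            return None                   # FAILED_PRECONDITION
        row.update(state="running", worker_id=worker_id, lease_token=next(self._seq))
        return row["lease_token"]

    def report_success(self, ti_id, token):
        row = self.rows[ti_id]
        if row["lease_token"] != token:   # stale token: worker's write is DISCARDED
            return False
        row.update(state="success", lease_token=None)
        return True

    def reap(self, ti_id):
        """Reaper path: lease expired -> reset for retry (attempt += 1)."""
        row = self.rows[ti_id]
        if row["state"] == "running":
            row.update(state="scheduled", worker_id=None, lease_token=None)

t = LeaseTable(); t.add("ti1")
tok1 = t.claim("ti1", "worker-A")         # attempt 1 claims, token 42
t.reap("ti1")                             # lease expires; reaper resets the row
tok2 = t.claim("ti1", "worker-B")         # attempt 2 claims, token 43
assert not t.report_success("ti1", tok1)  # Scenario B: partitioned worker rejected
assert t.report_success("ti1", tok2)      # worker-B's result accepted
```

The sketch shows why the token must be compared, not just the worker_id: the same worker could reclaim its own task after a reap, and only the monotonic token distinguishes the two attempts.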

6.4 Sub-diagram: failure recovery (scheduler crash)

  etcd lease semantics:
    scheduler leader holds /sched/leader key with TTL=10s, renew every 3s.

  T=0:   leader S1 holds lease, dispatching.
  T=5:   S1's JVM enters a stop-the-world GC pause of 8s (pathological).
         S1 cannot renew.
  T=10:  etcd lease expires. S2, S3, S4, S5 all watching /sched/leader.
  T=10.1: etcd fires "key deleted" to all watchers.
  T=10.1-10.3: S2-S5 race CAS to create /sched/leader with their own ID.
         Only one wins (atomic CAS). Say S2.
  T=10.3: S2 starts dispatching. Reads last_dispatched_offset from DB
         (or from etcd /sched/last_offset); resumes SKIP LOCKED polling.

  CRITICAL: fencing token on dispatch writes.
    Every scheduler stamps task_instance.scheduler_epoch = etcd_lease_revision.
    S1, when GC finishes at T=13, thinks it's still leader.
    It tries to UPDATE task_instance ... SET scheduler_epoch=S1.epoch=100.
    Condition: WHERE scheduler_epoch <= 100 (a leader may repeat its own
    epoch, never trail a newer one). But S2 already wrote epoch=101.
    So S1's writes are all rejected by the CAS.
    S1 then reads /sched/leader, sees S2, goes to standby.

  Blast radius of the 10s outage:
    - 1.21K/sec × 10s = 12K tasks that should have been dispatched.
    - They sit in task_instance.state='scheduled' for 10s extra.
    - p99 scheduling latency spikes to ~12s during the incident window.
    - SLO says p99=5s so this is a page (incident severity minor).
    - No correctness violation.

  If instead S1 double-dispatched during GC freeze (no fencing): same ti_id
  enqueued twice → worker claims twice → two workers run user code →
  DOUBLE side effects. This is the classic Airflow 1.10.x multi-scheduler
  double-run failure mode.
  Fencing token makes this impossible.
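
The epoch fencing described above reduces to one conditional write. A minimal sketch (in-memory stand-in for the DB; `FencedStore` is an invented name):

```python
class FencedStore:
    """Toy model: every scheduler write carries its etcd lease revision (epoch).
    The store accepts a write only if the writer's epoch is >= the highest
    epoch that has ever written — a stale leader is rejected by definition."""
    def __init__(self):
        self.epoch = 0
        self.data = {}

    def write(self, writer_epoch, key, value):
        if writer_epoch < self.epoch:
            return False                  # stale leader (S1 waking from GC): fenced out
        self.epoch = writer_epoch
        self.data[key] = value
        return True

store = FencedStore()
assert store.write(100, "ti1", "dispatched-by-S1")   # S1 leader, epoch 100
assert store.write(101, "ti1", "dispatched-by-S2")   # S2 takes over, epoch 101
assert not store.write(100, "ti1", "late-S1-write")  # S1 post-GC: rejected
assert store.data["ti1"] == "dispatched-by-S2"
```

In the real system the comparison happens inside the UPDATE's WHERE clause, so acceptance and the data write are one atomic operation.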

6.5 Arrow annotations (protocol / QPS / payload)

| Edge | Protocol | Avg QPS | Peak QPS | Payload |
|---|---|---|---|---|
| User → API | HTTPS/gRPC | 200/s | 2K/s | 1-10 KB |
| API → DB replicas | SQL | 1.5K/s reads | 10K/s | 1-50 KB |
| Scheduler → DB leader | SQL | 6K/s writes | 30K/s | 500 B/row |
| Scheduler → Redis queue | RESP | 1.2K XADD/s | 6K/s | 2 KB/msg |
| Scheduler → Kafka audit | TCP | 6K/s | 30K/s | 1 KB/event |
| Worker → Redis XREADGROUP | RESP | 1.2K claim/s | 6K/s | 2 KB/msg |
| Worker → Scheduler ClaimTask | gRPC | 1.2K/s | 6K/s | 100 B req, 200 B resp |
| Worker → Scheduler ExtendLease | gRPC | 24K/s (one extend per 30s per in-flight task) | 120K/s | 50 B req |
| Worker → S3 log upload | HTTPS | ~5K/s | 30K/s | 10 KB (avg), buffered |
| Scheduler ↔ etcd | gRPC | ~100/s | 500/s | 100 B |
| Triggerer → DB | SQL | 200/s | 2K/s | 500 B |

ExtendLease is 120K/sec peak — the hottest RPC. Options:

  • Keep it simple: Redis EXPIRE worker:<id>:lease:<ti_id> instead of DB.
  • Piggyback on heartbeat: one "alive" message carries all leases.
  • Variable lease durations: long-running tasks get longer leases (5min) → 10× fewer extends.

This is a real L7 detail — most candidates miss that worker-scheduler chatter dominates control-plane QPS.
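
The "piggyback on heartbeat" option can be sketched as follows (the `Worker` class and the `send` transport stub are assumptions for illustration; the point is one RPC per worker per interval instead of one ExtendLease per task):

```python
import time

class Worker:
    """Sketch: piggyback all lease extensions on one periodic heartbeat."""
    def __init__(self, worker_id, send):
        self.worker_id = worker_id
        self.running = set()   # ti_ids currently executing on this worker
        self.send = send       # transport stub: send(dict) -> set of rejected ti_ids

    def heartbeat(self):
        # One message carries every held lease. With ~70K in-flight tasks on
        # ~2K workers, that's ~70 heartbeats/s instead of thousands of
        # per-task ExtendLease calls.
        rejected = self.send({"worker": self.worker_id,
                              "leases": sorted(self.running),
                              "ts": time.time()})
        for ti_id in rejected:   # lease reassigned elsewhere: drop/kill locally
            self.running.discard(ti_id)
        return rejected

# Server stub that rejects ti9 (its lease was reassigned):
w = Worker("w1", send=lambda msg: {"ti9"} & set(msg["leases"]))
w.running = {"ti7", "ti8", "ti9"}
assert w.heartbeat() == {"ti9"}
assert w.running == {"ti7", "ti8"}
```

The trade-off: a lost heartbeat now risks expiring many leases at once, so the server should treat one missed heartbeat leniently and only reap after the full lease TTL.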


7 Deep Dives #

7.1 Scheduler HA: Leader Election without Split-Brain

Why critical: Airflow 1.x shipped without leader election. Operators ran multiple schedulers "for HA" and they competed for the same task rows, double-dispatching. This caused CVE-era production incidents at Lyft, Airbnb, and Bloomberg. Airflow 2.0 (2020) shipped an HA scheduler using row-level locks (SELECT ... FOR UPDATE SKIP LOCKED) on Postgres; this works but has quirks (see below). Temporal/Cadence use a sharded model with Raft for shard ownership. Getting this right is the difference between "works in staging" and "doesn't page at 3 AM."

Three alternatives considered:

(A) Row-locking without external leader election (Airflow 2.x default)

  • Multiple scheduler processes run. Each polls the DB with SELECT ... FOR UPDATE SKIP LOCKED. Postgres gives each process disjoint rows.
  • Pro: no separate coordination service; simpler ops.
  • Con: every scheduler hits the DB every tick → N× read load. With 5 schedulers and 1s tick, you have 5 QPS just for polling plus N× index scans.
  • Con: relies on the DB's lock manager scaling. Postgres SKIP LOCKED works well up to roughly 5-10K ops/sec on a single node; beyond that, row-lock churn becomes the bottleneck (each row lock writes the tuple's xmax, which is WAL-logged). Past that point, split by queue_shard_id physically.
  • Con: no clear "primary" for non-DB operations (cache warming, DAG file parsing, metrics aggregation). Airflow works around by having each scheduler parse a subset of DAG files (scheduler_heartbeat_sec, max_threads). Works but is awkward.
  • Failure mode: if one scheduler hangs in a long DB transaction, SKIP LOCKED still works (locks are released on backend death/timeout). But lock held by a zombie connection → up to deadlock_timeout + statement_timeout delay before reclamation.

(B) Single leader via etcd/ZooKeeper + sharded workers (Google Borg, Temporal)

  • One leader elected via etcd lease (10s TTL). Leader owns scheduling loop. Others are hot standby.
  • For scale: leader doesn't do all work; it assigns shards (0..N-1) to standby nodes via etcd keys /sched/shards/0 = node2. Each node services its shards. Leader re-assigns shards on node loss.
  • Pro: clean leader story; non-DB work (metrics, parsing) has a natural owner.
  • Pro: fencing tokens come free (etcd revision number is monotonic).
  • Con: etcd becomes a critical dependency. etcd outage → everyone thinks they're leader → use fencing token to reject stale writes → degradation is "no new dispatches" not "double dispatches."
  • Con: more moving parts to reason about.

(C) Event-sourced workflow engine (Temporal/Cadence model) — deferred to v3

  • Workflow state is stored as an event log. Any worker can pick up any workflow by replaying from the log. No "scheduler" in the classic sense.
  • Pro: no leader needed; no split-brain possible.
  • Pro: deterministic replay → effective exactly-once at the workflow level.
  • Con: requires user code to be deterministic (no time.time(), no unsafe random(), etc.). Workflow SDK enforces this via lints + runtime checks.
  • Con: reshaping the programming model. Not compatible with Airflow's "arbitrary Python task" ethos.
  • Verdict: put in v3 roadmap; don't try to ship v1 this way.

Chosen: Hybrid of (A) and (B) for v2:

  • etcd-based leader election for the control plane duties (DAG file parsing allocation, shard re-balancing, cluster-wide state like quotas, maintenance mode).
  • 64-shard scheduling loop: each shard claimed via etcd CAS on /sched/shards/<n>. Each shard polls DB with SELECT ... WHERE hash(dag_id) % 64 = <shard> ... SKIP LOCKED.
  • Fencing token: every scheduler stamps scheduler_epoch on writes; stale leader's writes rejected.
  • DB ops/sec stays manageable because each shard scans a partition, not the whole table.

Quantified: at 6K writes/sec peak, 64 shards = 94 writes/sec/shard on average. Each shard's polling query hits (queue_name, scheduled_ts) partial index on a partition of ~100K rows → sub-ms plan. This is Airflow 2.7+ architecture essentially.
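
The shard assignment and per-shard polling query can be sketched as follows (the SQL is illustrative, not a tested schema; note the hash must be stable across processes, so Python's per-process-salted `hash()` won't do):

```python
import zlib

NUM_SHARDS = 64

def shard_of(dag_id: str) -> int:
    # CRC32 is deterministic across processes and restarts, unlike the
    # built-in hash(), which is salted per interpreter process.
    return zlib.crc32(dag_id.encode()) % NUM_SHARDS

# Each shard owner polls only its partition (illustrative SQL):
POLL_SQL = """
SELECT ti_id FROM task_instance
WHERE shard = %(shard)s AND state = 'scheduled' AND scheduled_ts <= NOW()
ORDER BY scheduled_ts
LIMIT 100
FOR UPDATE SKIP LOCKED
"""

assert shard_of("etl_daily") == shard_of("etl_daily")   # deterministic
assert all(0 <= shard_of(f"dag_{i}") < NUM_SHARDS for i in range(1000))
```

Precomputing and storing the shard column (rather than hashing in the WHERE clause) lets the partial index on (shard, state, scheduled_ts) do the work.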

Failure modes & mitigations:

| Failure | Detection | Mitigation | Recovery |
|---|---|---|---|
| Scheduler leader GC-freeze | etcd lease expires | Standby takes over via CAS; old leader's writes rejected by fencing | 10-15s of missed dispatches |
| Partial network partition: leader can reach DB but not etcd | Leader sees etcd errors; stops dispatching preemptively | After 2 consecutive etcd timeouts, voluntarily step down | Standby takes over |
| etcd cluster minority partition | Members report ErrNoLeader | Scheduler can't renew lease → steps down → dispatching halts until etcd heals | Dispatching stops (CP system); tasks pile up in DB; no corruption |
| Two schedulers both think they're leader | Fencing token mismatch on DB write | CAS rejects one → steps down | Degradation = missed writes, reconciler picks up |
| Shard never claimed (all claimants died) | Leader's shard-health check every 10s | Leader re-assigns to another standby | 10-30s of that shard's DAGs being behind |

The earned-secret detail most candidates miss: The interaction between scheduler lease TTL, worker lease TTL, and DB failover timing. If you set scheduler lease=10s and DB failover=30s, then on DB primary failure: scheduler leader can't write → doesn't renew etcd lease → dropped as leader → a new leader comes up but also can't write → churn. The fix: scheduler should hold leadership across transient DB errors; only drop leadership if etcd itself is unreachable. Otherwise you get "re-election storms" during DB failovers that compound the outage.

7.2 Effective Exactly-Once via Fencing + Idempotency

Why critical: At 100M executions/day, if 0.1% duplicate-execute, that's 100K double-runs/day. If even 1% of tasks have non-idempotent side effects (charging a credit card, sending an email), that's 1K customer-affecting duplicate events per day. Exactly-once is the holy grail; at-least-once + idempotency is the practical substitute. This is where Airflow's "just retry and hope it's idempotent" approach breaks at scale.

Three approaches:

(A) Pure at-least-once + trust-the-user

  • Platform: task can run 0 or N times; user guarantees safe retries.
  • Pro: simple; no platform-imposed constraints on user code.
  • Con: users don't get it right. Every large Airflow user I've talked to at Meta and elsewhere has "duplicate email" or "double-charged" incidents in their history.
  • Con: debugging is an exercise in blame-shifting.

(B) At-least-once + platform-fenced writes (fencing tokens exposed to user code)

  • Platform passes (run_id, ti_id, attempt) to user task as a fencing token. User code is expected to use it on any external write: INSERT ... ON CONFLICT DO NOTHING with that token as the natural key, or CAS with token as a version, or idempotent API calls keyed by the token.
  • Pro: the platform provides the mechanism; user code has a clear contract.
  • Pro: works across heterogeneous user code (SQL, HTTP, Kafka).
  • Con: user code must be audited. No compiler enforcement.
  • Con: doesn't help for side-effects the user forgot to key (e.g. "send email" where email API doesn't expose idempotency key — workaround: wrap with an idempotency table in the user's DB).

(C) Deterministic replay + event sourcing (Temporal model)

  • Workflow state is an event log. Each user workflow decision → event. Replaying events reconstructs state deterministically. Workflow worker compares replay events to recorded events → cannot "forget" or duplicate.
  • Platform distinguishes: workflow functions (deterministic, replayable) and activities (non-deterministic, at-least-once with user idempotency).
  • Exactly-once at the workflow orchestration level is real. Activities still need to be idempotent — but the blast radius of a duplicate activity is much smaller because the workflow records "I already called activity X and got result Y" and won't re-call it on replay.
  • Pro: best-in-class correctness. Used by Uber, Coinbase, Airbnb (Temporal), Stripe (internal), Snap.
  • Con: programming model constraint. Requires SDK + language support (Go, Java, Python, TypeScript, Ruby today).
  • Con: not a fit for Airflow-like "arbitrary Python task" ecosystem without major migration.

Chosen for v2: approach (B) with a strong platform stance. The AttemptContext API gives every task a fencing token; our SDK provides helpers for the 5 most common sinks (Postgres, MySQL, Kafka, S3, HTTP via idempotency-key header). Deterministic replay is v3.

Implementation detail:

# User code contract
@task(idempotency_required=True)
def charge_customer(customer_id: str, amount: int, ctx: AttemptContext):
    # ctx.fencing_key = f"{ctx.run_id}:{ctx.ti_id}:{ctx.attempt}"
    #   The platform's fencing key INCLUDES attempt: it distinguishes
    #   worker-A-on-attempt-1 from worker-B-on-attempt-2 (ownership).
    #   The external idempotency key below deliberately OMITS attempt —
    #   see the note after this example.

    # For a charge: we want exactly one charge per (run_id, ti_id).
    # The attempt doesn't matter for the external effect — a retry means
    # "the previous attempt failed, retry the SAME logical charge."
    idem_key = f"charge:{ctx.run_id}:{ctx.ti_id}"  # NO attempt!

    response = stripe.Charge.create(
        amount=amount,
        currency="usd",
        customer=customer_id,
        idempotency_key=idem_key,
    )
    return response.id

Subtle: the fencing token for external side effects omits the attempt (we want ONE side effect across all attempts). The fencing token for worker ownership (DB lease_token) includes the attempt because we want to tell worker-A-on-attempt-1 apart from worker-B-on-attempt-2. Two different fencing tokens, two different purposes, commonly conflated.

Quantitative: Stripe's idempotency layer gives <1/10^9 collision on a 24h window; equivalent for most modern APIs. Our platform SDK stores fencing keys in the metadata DB for 7 days as a belt-and-suspenders against user SDK bugs.

Failure modes:

| Failure | Mitigation |
|---|---|
| User forgot to use fencing key | Platform lint catches common patterns (stripe, SQL INSERT without ON CONFLICT, etc.); CI integration. For non-SDK sinks: review mandatory. |
| External system doesn't support idempotency keys | Wrap with "outbox table" pattern in user's DB: write intent + fencing key → then external call → then mark outbox row done. On retry, check outbox first. |
| Fencing token collision (theoretical) | UUIDv7 gives ~10^-12 collision probability over 100 years; ti_id from a single generator across the cluster. Not a real risk. |
| Zombie worker completes after reassignment | lease_token CAS rejects the success report. Worker logs and aborts. Side-effect may have happened → idempotency on the external side handles it. |
| Multiple retries exhaust idempotency window | Set window > max_retry_total_duration (typically 7 days for our config). |

Earned secret: you cannot fence the user code's subprocess. If worker's lease expires, the runtime (k8s) will SIGTERM the pod eventually, but during the gap, user code can keep running and issue external writes. Temporal/Cadence solve this by making the activity call home before every external write — the server gates each call. Airflow doesn't do this. At 100M/day scale, we need something in between: our SDK's I/O primitives for external systems do a heartbeat + lease_check before the actual call. If lease is stale, the helper raises an exception. User code can still bypass the SDK and call raw requests.post, but the SDK's common helpers are the 80% path.
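
The SDK helper described above (check the lease immediately before any external write) can be sketched like this; `guarded_call`, `lease_check`, and `StaleLeaseError` are invented names for illustration:

```python
class StaleLeaseError(Exception):
    pass

def guarded_call(lease_check, side_effect, *args, **kwargs):
    """Sketch of the SDK's I/O primitive: verify the lease is still held
    immediately before an external write. lease_check() asks the scheduler
    whether this (ti_id, lease_token) is still current. This NARROWS the
    duplicate-side-effect window to one network round-trip; it cannot
    close it entirely (the lease can be reaped mid-call)."""
    if not lease_check():
        raise StaleLeaseError("lease reassigned; refusing external write")
    return side_effect(*args, **kwargs)

calls = []
ok = guarded_call(lambda: True, lambda x: calls.append(x) or "done", "charge")
assert ok == "done"
try:
    guarded_call(lambda: False, lambda x: calls.append(x), "charge-again")
except StaleLeaseError:
    pass
assert calls == ["charge"]   # the fenced-out call never executed
```

Usage-wise, the SDK's Postgres/Kafka/S3/HTTP helpers would wrap their write paths in this primitive, so the 80% of user code that stays on SDK helpers gets the narrowed window for free.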

7.3 Long-Tail and Skew: Keeping the Queue Flowing

Why critical: Airflow users constantly complain about "my queue is full of a 6-hour task and nothing else can run." At 100M/day with p99=1hr task duration, roughly 1% of tasks consume >1 hour of worker time. Math: 0.01 × 100M × 3600s = 3.6B "slot-seconds" per day from the tail, on a pool with 350K slots × 86,400 sec/day each = 30.2G total slot-seconds/day. 3.6B / 30.2G ≈ 12% of pool capacity permanently occupied by long-tails. If they all land on one priority queue, they starve everything else.
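
The tail-capacity arithmetic, as a quick check (numbers from the text):

```python
# ~1% of 100M daily tasks run >= 1 hour each:
tasks_per_day   = 100e6
tail_fraction   = 0.01
tail_seconds    = 3600
pool_slots      = 350_000

tail_slot_seconds = tail_fraction * tasks_per_day * tail_seconds  # ~3.6e9
pool_slot_seconds = pool_slots * 86_400                           # ~3.02e10

# The long tail permanently occupies ~12% of the pool:
assert round(tail_slot_seconds / pool_slot_seconds, 2) == 0.12
```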

Three mitigations, stacked:

(A) Separate priority queues + weighted fair queueing

  • Queues: default, priority-high, priority-low, batch (long-running), plus per-tenant queues for noisy neighbors.
  • Workers subscribe to queues with weights: 40% priority-high, 40% default, 20% batch. Workers pull from each weighted-round-robin.
  • Pro: simple, deployable.
  • Con: static weights don't adapt.

(B) Dedicated worker pools for long-running tasks

  • Task-level hint: pool='long'. Scheduler routes to workers tagged pool=long.
  • Pool has its own sizing. When long-pool is full, tasks wait there, not in the main queue.
  • Pro: hard isolation. Main queue never blocked by long tails.
  • Con: requires user to annotate tasks accurately.
  • Con: under-utilization if long pool has spare capacity.

(C) Preemption (last resort)

  • Scheduler can evict a low-priority task in favor of a high-priority one.
  • Requires user code to handle SIGTERM + checkpoint → messy. Few users implement.
  • Only sensible for tasks the platform can safely re-run from scratch.

Chosen: (A) + (B) layered:

  • Three default queues: default, high, long.
  • Workers report capacity_by_queue.
  • Per-tenant: a tenant:<id> queue with a reservation (≤N% of total pool) — this is the noisy-neighbor defense.
  • Quota manager: cluster-level token bucket per tenant; requests exceeding quota queued in tenant queue with lower priority.
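
The worker-side "pull from each queue weighted-round-robin" step can be sketched as weighted random selection over the queues that currently have work (`pick_queue` is an invented helper; real code would also honor per-queue capacity):

```python
import random

def pick_queue(weights, nonempty, rng=random):
    """Weighted selection among queues that currently have pending tasks.
    weights: {queue_name: weight}; nonempty: set of queues with work."""
    candidates = [(q, w) for q, w in weights.items() if q in nonempty]
    if not candidates:
        return None
    total = sum(w for _, w in candidates)
    r = rng.uniform(0, total)
    for q, w in candidates:
        r -= w
        if r <= 0:
            return q
    return candidates[-1][0]   # guard against float rounding

weights = {"high": 0.4, "default": 0.4, "batch": 0.2}
assert pick_queue(weights, set()) is None             # nothing to do
assert pick_queue(weights, {"batch"}) == "batch"      # only queue with work
assert pick_queue(weights, set(weights)) in weights   # weighted pick
```

Restricting selection to non-empty queues is what keeps the long-pool worker from idling when there's no long work (the "pull from secondary" mitigation below).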

Skew within a DAG (fan-out with one laggard):

  • DAG extract → 100 parallel transform_i → single load. If transform_42 is 10× slower, the DAG can't load until it completes.
  • Solutions:
    1. Speculative retry — if transform_42 runs >2× p95 of its siblings, start a duplicate and take first result. Requires idempotency.
    2. Relaxed trigger rules — load can run on all_done (not all_success), accepting partial data.
    3. Data partitioning — if transform_42 is slow because its input shard is huge, pre-partition into transform_42_a, _b, _c. Scheduler supports dynamic task mapping (Airflow task.expand()).
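
Speculative retry needs a concrete trigger condition. A minimal sketch, assuming the "2× sibling p95" rule from point 1 (`should_speculate` is an invented helper):

```python
def should_speculate(elapsed_s, sibling_durations_s, factor=2.0):
    """Launch a duplicate attempt when a fan-out task has run longer than
    factor x the p95 of its completed siblings. Requires idempotent tasks:
    both attempts may complete, and first-result-wins."""
    if len(sibling_durations_s) < 20:      # not enough signal to judge
        return False
    s = sorted(sibling_durations_s)
    p95 = s[int(0.95 * (len(s) - 1))]      # nearest-rank percentile
    return elapsed_s > factor * p95

siblings = [30.0] * 90 + [60.0] * 10       # p95 = 60s
assert should_speculate(300.0, siblings)        # 300s > 2 x 60s -> speculate
assert not should_speculate(100.0, siblings)    # within tolerance
assert not should_speculate(999.0, [30.0] * 5)  # too few siblings
```

The minimum-sibling guard matters: with a small fan-out, one early finisher would make every other task look like a straggler.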

Real-system comparisons:

| System | Approach | Limitation at 100M scale |
|---|---|---|
| Airflow 2.x CeleryExecutor | Queue per priority; pool feature | Pool is a global mutex, contended at 10K+ concurrent |
| Airflow KubernetesExecutor | Each task = pod; no queue starvation (scheduler pressure moves to k8s) | Pod start latency (~30s cold) blows p99 scheduling latency |
| Temporal | Sharded task queues; poll-based workers | Shard rebalance on worker add/remove causes latency blip (~500ms-5s) |
| Argo Workflows | K8s-native; per-step pods | Same as Airflow K8sExecutor; pod start dominates |
| AWS Step Functions | Managed; stateless tasks | Good for short tasks; long-tail still occupies RUNNING state budget |
| Google Borg/Omega | Batch vs service priority tiers | Similar conceptually; our "queue" is their "priority band" |

Failure modes & mitigations:

| Failure | Mitigation |
|---|---|
| All workers subscribed to same queue; one tenant blasts 10K tasks | Per-tenant quota enforced at scheduler dispatch (refuses to enqueue beyond quota) |
| Long-pool starves when there's no long work | Workers can subscribe to multiple queues with priority; if primary queue empty, pull from secondary |
| Small task stuck behind large batch | Separate queues by SIZE (default = <5min tasks, long = ≥5min) with estimator from historical avg |
| Priority inversion: low-prio task holds a shared pool resource | Pool-level priority inheritance: promote pool-holder's priority when high-prio task is blocked |

Earned secret: The naive approach is "one big queue, sort by priority." It fails because pulling a batch of 100 tiny tasks past a huge task sitting at the head of the queue requires skip-and-continue, which Redis Streams / Kafka don't natively do. Redis XREADGROUP returns FIFO within a consumer group. To get priority you need separate streams per priority and worker-side merging with weighted selection. Kafka similarly needs topic-per-priority. This leaks implementation detail into the task routing layer and is why almost every real production scheduler ends up with N queues per priority tier × M queues per tenant/pool = lots of queues. At 100M/day you're running 256+ Redis streams across sharded clusters.


8 Failure Modes & Resilience (Runbook Table) #

| Component | Failure | Detection | Blast radius | Mitigation | Recovery (RTO) |
|---|---|---|---|---|---|
| Scheduler leader | JVM crash / OOM | etcd lease not renewed within 10s | Dispatching halts ~10-15s; ~12-18K tasks' scheduling latency spikes | Standby takes over via etcd CAS; fencing token ensures no double-dispatch | <15s |
| Scheduler shard owner | Process hangs mid-tick | Leader's shard-health timer (30s) | Only that shard's DAGs affected (~2% of fleet) | Leader re-assigns shard via etcd; SKIP LOCKED frees abandoned rows | 30-60s |
| Worker | Host crash mid-task | Lease expires at 60s | Task attempt 1 abandoned; re-enqueued as attempt 2 | Scheduler reaper sweeps expired leases; CAS resets state, re-enqueues | 60-90s per task |
| Worker | Slow heartbeat (GC pause) | Lease at 60% of TTL | Task killed; re-enqueued | Worker self-steps-down gracefully; SDK kills subprocess as lease expiry approaches | Prevention, not recovery |
| Metadata DB primary | Hardware failure | DB proxy / Patroni detects | All writes blocked 15-30s | Sync replica promoted; scheduler retries dispatch; read replicas handle UI | 30s |
| Metadata DB (total) | Region outage | Cross-region health monitoring | Control plane down in primary region | Warm standby in secondary region; DNS cutover; RPO=1min of lost dispatches (idempotency saves us) | 15 min |
| Queue (Redis) | Primary node down | Redis Sentinel / Cluster | Claims for that partition stall | Replica promoted; XADD retried; idempotency on ti_event prevents re-enqueue loops | <30s |
| Queue | Total Redis cluster loss | Monitoring | Live tasks lose their "claim ticket" | Scheduler rebuilds queue from task_instance WHERE state='scheduled' AND NOT in_redis | 5-10 min |
| etcd cluster | Loss of quorum (minority partition) | Leader sees ErrNoLeader | Scheduler can't renew; steps down; NO new dispatches | Wait for etcd recovery; running tasks complete; new tasks delayed | Duration of partition |
| Clock skew | Worker clock +30s ahead | Heartbeat ts skew monitor | Leases evaluated against worker clock fire early → premature task kills | All time comparisons use scheduler/DB clock (NOW() in SQL); NTP hard requirement on workers (max 100ms skew); lease math server-side only | Prevention |
| Clock skew | Scheduler clock jumps backward (NTP slew) | Alert on NOW() non-monotonic | Cron triggers could re-fire | Use MAX(last_fired_ts, NOW()) + store last_fired in dag_run with UNIQUE constraint (idempotency) | Correctness maintained |
| DST transition | 2 AM local happens twice (fall back) or is skipped (spring forward) | Tests + monitoring | Cron 0 2 * * * fires twice or zero times | Scheduler evaluates cron in UTC internally, converts to tz for display; policy: nonexistent-time = skip, ambiguous-time = run once (first); document on every DAG | Documented behavior |
| Poison task | Same task fails all retries | attempt >= max_retries | DAG fails; SLA alert fires | Move ti to DLQ table; surface in UI; block DAG if user opts in to "halt on poison" | Manual intervention |
| DLQ | Overflow: grows > 1M rows | DLQ size metric | UI slow; operator fatigue | Auto-expire after 30 days; alert ops; per-DAG DLQ quotas | Auto + manual triage |
| Triggerer | Crash | Triggerer heartbeat to DB | All deferred tasks stuck in limbo until reaper | Another triggerer claims orphaned triggers via TTL; rehydrates state from trigger row | 30-60s |
| Log collector (FluentBit) | Downstream S3 errors | Collector backlog grows | Logs buffered on local disk (bounded); user sees stale logs in UI | Backpressure to worker bounded by disk; 24h buffer | Minutes to S3 recovery |
| Cron alignment (top-of-hour) | 100K DAGs all cron 0 * * * * | Burst detector in scheduler | Queue depth spikes; scheduling latency p99 breach | Automatic scheduling jitter added to cron (0-60s); token bucket on tenant dispatch rate | Self-healing, with SLO dip |
| Backfill storm | Operator backfills 30 days of DAGs manually | Monitoring | Primary queue overwhelmed | Backfill goes to a separate backfill queue with dedicated pool (5% of fleet); rate-limit | Auto |
| Noisy tenant | Tenant A runs 50M tasks in one day | Per-tenant counters | Could starve other tenants | Per-tenant quota: rate-limit TriggerRun + dispatch at scheduler; oversubscription = 429 | Policy |

Monitoring SLIs (pager-relevant):

  • scheduler_dispatch_latency_p99: target <5s; page at >10s for 5 min
  • scheduler_leader_elections_per_hour: target <1; page at >3
  • worker_lease_expirations_per_min: baseline established per fleet; page on 5× baseline (indicates partition or overload)
  • metadata_db_replication_lag: target <100ms; page at >1s
  • queue_depth_p99: target <10K; page at >100K (cascade incoming)
  • dlq_size_growth_rate: page on >100 tasks/hour moving to DLQ
  • trigger_row_stuck_count: tasks deferred >2× timeout — page

9 Evolution Path #

v1 — Single-node Airflow clone (0→1M tasks/day)

  • Single scheduler process, single Postgres, no HA.
  • LocalExecutor (subprocess) or CeleryExecutor with one Redis broker.
  • No triggerer; deferrable = sensor-poll = worker slot occupied.
  • Audience: first 2-3 teams inside the company.
  • What breaks first: sensors eat worker slots; Postgres CPU at ~5K/sec writes; one scheduler → SPOF.

v2 — HA scheduler + queue + triggerer (1M→100M tasks/day)

  • 5-node scheduler with etcd leader election + 64-shard sharding.
  • Redis Streams queue (sharded), Kafka mirror for audit.
  • Triggerer pool (12 nodes, asyncio).
  • Postgres → Spanner migration OR Postgres + Redis-backed event store to offload write volume.
  • Worker pool with heartbeat + lease + fencing token.
  • DLQ, per-tenant quota, priority queues.
  • Web UI served from read replicas + Redis cache.
  • Observability: Prometheus + OpenTelemetry traces on scheduler ticks and dispatch paths.
  • This is "what I'd build for interview" and covers 100M/day.

v3 — Event-sourced multi-region (100M→10B tasks/day OR critical-correctness workloads)

  • Migrate to Temporal-style event-sourced workflow engine for the critical-correctness tier (e.g. payment workflows, privacy DSARs). Non-critical batch stays on v2.
  • Shared-nothing shards on Cassandra or Spanner; each shard owns a subset of workflows; workflow is a replayable sequence of events.
  • Multi-region active-active: workflow state replicated globally; leader per shard region-pinned with failover.
  • Deterministic replay: workflow code (not activity code) must be deterministic; SDK enforces via sandboxing.
  • Effective exactly-once at workflow level; activities still at-least-once with idempotency.
  • Multi-tenant isolation: per-tenant namespaces with soft and hard quotas; dedicated cells for largest tenants.
  • Regulatory: immutable append-only event log with hash chaining (BlockchainDB-lite for tamper evidence); per-record TTL for GDPR but with audit-safe "deletion tombstones."

Migration strategies:

  • v1→v2: dual-write DAG state from v1 and v2 schedulers against the same DB; gradually cut traffic per-DAG via routing table; rollback if error rate up.
  • v2→v3: side-by-side clusters; new DAGs opt into v3; old DAGs stay on v2; never migrate live DAGs (too risky).

10 Out-of-1-Hour Notes (depth-on-demand) #

Deterministic replay vs event-sourced vs checkpoint-and-resume:

  • Checkpoint-and-resume (Flink, Spark): every N seconds snapshot state. On failure, reload snapshot, replay input since. Requires state to be serializable. Good for data streams; clumsy for workflows.
  • Event sourcing + replay (Temporal): state is derived from events; replay reconstructs. Works for long-running workflows. Requires user code determinism.
  • Neither (Airflow): each task is stateless from the platform's view; user must design retries to be idempotent. Simplest; weakest guarantees. Breaks at high scale + high correctness.
  • Rule of thumb: choose event sourcing if any task has >10 steps with interleaved external effects; choose checkpoint for batch/streaming; choose "nothing" for ops that are naturally idempotent.

Cron semantics around DST:

  • Stripe, AWS, and GCP Cloud Scheduler all have had incidents from ambiguous DST interpretation.
  • Two competing philosophies:
    • Wall-clock fidelity: 0 2 * * * fires when local clock shows 2:00. In fall-back, fires twice (2:00 DST, then 2:00 standard). In spring-forward, skipped.
    • UTC stability: cron expression evaluated in UTC; ignores DST. Predictable but surprising for users who want "every night at 2 local."
  • Our policy: internally store DAGs with explicit (cron, tz). Evaluator uses zoneinfo + dateutil.rrule. Ambiguous time: fire ONCE (the first occurrence). Nonexistent time: SKIP. Policy documented. UI displays "next 3 runs" so users catch misconfiguration before it bites.
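
The skip/run-once policy above can be implemented directly with PEP 495 fold semantics, which stdlib zoneinfo supports (a minimal sketch; `classify_local_time` is an invented helper, and it assumes system tzdata is available):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

UTC = ZoneInfo("UTC")

def classify_local_time(naive_dt: datetime, tz: ZoneInfo) -> str:
    """Classify a wall-clock time as unambiguous, ambiguous (fall-back
    overlap), or nonexistent (spring-forward gap)."""
    t0 = naive_dt.replace(tzinfo=tz, fold=0)
    t1 = naive_dt.replace(tzinfo=tz, fold=1)
    if t0.utcoffset() == t1.utcoffset():
        return "unambiguous"
    # Offsets differ in both a gap and an overlap. Distinguish them:
    # a gap time round-trips through UTC to a DIFFERENT wall time.
    round_trip = t0.astimezone(UTC).astimezone(tz).replace(tzinfo=None)
    return "ambiguous" if round_trip == naive_dt else "nonexistent"

NY = ZoneInfo("America/New_York")
assert classify_local_time(datetime(2021, 3, 14, 2, 30), NY) == "nonexistent"  # policy: SKIP
assert classify_local_time(datetime(2021, 11, 7, 1, 30), NY) == "ambiguous"    # policy: run once (fold=0)
assert classify_local_time(datetime(2021, 6, 1, 12, 0), NY) == "unambiguous"
```

The evaluator classifies each candidate fire time before enqueuing a dag_run; the UNIQUE constraint on (dag_id, logical_ts) is the backstop if classification and enqueue race.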

Sensor storms: polling vs triggerer-push:

  • Traditional: a sensor is a task that loops while not condition: sleep(poke_interval). Each sensor occupies a worker slot. 10K sensors × 1 slot each = 10K slot-hours/day of waste.
  • Triggerer: an async process that registers with the condition source (S3 bucket event, Kafka topic position, REST API). Lightweight: one triggerer can handle 10K+ concurrent waits via asyncio + aiohttp. Uses O(1) threads, O(N) memory where N = trigger count.
  • When condition fires: triggerer writes to DB, scheduler picks up, schedules downstream task.
  • Backpressure: if triggerer pool falls behind, triggers queue in DB; new triggers go to least-loaded triggerer.
  • Failure: triggerer crash → orphan triggers reclaimed by another via TTL → continuity at cost of 30-60s delay.
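
The triggerer's concurrency model can be sketched with asyncio (a toy: real triggers await event sources rather than polling a timer, and `on_fire` would write a DB row for the scheduler to pick up):

```python
import asyncio

async def triggerer(conditions, on_fire):
    """One asyncio task per deferred wait; thousands of concurrent waits
    cost a single OS thread. conditions: trigger_id -> async poll() -> bool."""
    async def wait_one(tid, poll):
        while not await poll():
            await asyncio.sleep(0.001)   # real code awaits an event source
        on_fire(tid)                      # real code: write to DB
    await asyncio.gather(*(wait_one(t, p) for t, p in conditions.items()))

fired = []
pokes = {"count": 0}

async def poll_slow():
    pokes["count"] += 1
    return pokes["count"] >= 3            # fires on the third poke

async def poll_fast():
    return True                           # condition already satisfied

asyncio.run(triggerer({"t1": poll_slow, "t2": poll_fast}, fired.append))
assert sorted(fired) == ["t1", "t2"]
```

Because each wait is a coroutine, memory is the only per-trigger cost, which is what makes the O(N) memory / O(1) threads claim hold.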

Back-pressure propagation from workers → scheduler:

  • If workers can't keep up, queue depth grows. Scheduler should stop dispatching new work until queue drains rather than pile up Redis memory / Kafka lag.
  • Signal: per-queue depth → scheduler reads every tick → if >threshold, skip dispatching to that queue this tick.
  • Also: per-tenant dispatch token bucket refilled based on queue drain rate. Fast-draining tenant gets more tokens.
  • Alternative: rely on Kafka producer blocking. Works but couples semantic state to a transport layer.
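
The per-tick decision above reduces to a small function (a sketch under the stated design; `dispatch_budget` and its thresholds are illustrative):

```python
def dispatch_budget(queue_depth, drain_rate_per_s, depth_threshold=10_000,
                    tick_s=1.0):
    """Tick-level backpressure: skip a queue whose depth exceeds the
    threshold; otherwise cap dispatch at roughly what workers drained
    last tick, so depth trends flat instead of growing unboundedly."""
    if queue_depth > depth_threshold:
        return 0                          # skip this queue this tick
    return int(drain_rate_per_s * tick_s)

assert dispatch_budget(50_000, 1_200) == 0      # over threshold: halt dispatch
assert dispatch_budget(2_000, 1_200) == 1200    # dispatch ~= drain rate
```

Coupling the budget to the observed drain rate (rather than a fixed cap) is what makes the per-tenant token bucket refill adaptive.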

Multi-tenant resource quotas:

  • Quotas at 4 levels: (1) rate of DAG registration, (2) rate of DAG trigger, (3) concurrent running task_instances, (4) log storage MB.
  • Each level: soft limit (throttle), hard limit (reject).
  • Fair queueing: tenants share pool but with weights; a starving tenant's pending count grows → its dispatch priority increases (dynamic weight).
  • This is the "dominant resource fairness" idea (DRF, Google Borg). Overkill for small deployments, essential at 100M/day.

Regulatory audit (immutable run log):

  • ti_event table is append-only. Daily partitions are made read-only after 24h (Postgres table-level REVOKE UPDATE, DELETE).
  • Mirror to object store as Parquet files with SHA-256 hash chain (each file's hash includes prev file's hash) — tamper evidence without full blockchain.
  • GDPR tension: user can request deletion of PII in run params. Resolution: store PII in a separate params_pii table with FK; drop the PII table on deletion request; audit log retains the event (task X ran) but not the data (for user Y). Must be designed, not retrofitted.
  • SOX compliance: keep all state transitions for 7 years.
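
The hash chain over daily Parquet files is a few lines (a sketch; `chain_hashes` is an invented helper operating on raw bytes):

```python
import hashlib

def chain_hashes(file_blobs):
    """Tamper-evidence chain: each file's digest covers its content AND
    the previous digest, so altering any historical file changes every
    digest after it."""
    prev = b"\x00" * 32               # genesis: 32 zero bytes
    out = []
    for blob in file_blobs:
        prev = hashlib.sha256(prev + blob).digest()
        out.append(prev.hex())
    return out

a = chain_hashes([b"day1", b"day2", b"day3"])
b = chain_hashes([b"day1", b"DAY2-tampered", b"day3"])
assert a[0] == b[0]                    # prefix before the edit unchanged
assert a[1] != b[1] and a[2] != b[2]   # everything after the edit diverges
```

Verification only needs the latest digest to be stored somewhere trustworthy (e.g. a WORM bucket or printed in the audit report): re-deriving the chain and comparing the tip detects any historical edit.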

Cost per execution:

  • From §3: $0.0017 per task. Dominated by compute (95%). Control plane cost per task: ~$0.0001.
  • Comparison: AWS Step Functions Standard charges $0.025 per 1,000 state transitions, so a typical workflow with 20 transitions costs ~$0.0005 in orchestration fees, with compute billed separately (Lambda). Our control-plane cost for 20 tasks is ~$0.002 — the same order of magnitude. The self-hosting win is on compute (our $0.034 all-in for 20 tasks is 95% compute we control and can reserve/spot-optimize) and on limits/features, not on raw orchestration price.
  • Lambda equivalent: $0.20 per million invocations + ~$0.0000167 per GB-second. A 60s 1GB Lambda task = $0.001 (compute) + $0.0000002 (invocation) ≈ $0.001. Similar to our per-task compute, but Lambda hides queue/scheduler cost and caps execution at 15 minutes.

Discontinuous optimizations as scale climbs:

  • 10K/day → 1M/day: add retries and UI polish.
  • 1M/day → 10M/day: move logs to object store, add queue metrics, begin to tune DB.
  • 10M/day → 100M/day: shard scheduler, migrate queue to Redis Streams, triggerer pool mandatory.
  • 100M/day → 1B/day: metadata DB must shard or migrate to Spanner. Queue must move to Kafka (10x write throughput over Redis). Workers must co-locate with runtime (data locality).
  • 1B/day → 10B/day: multi-region, event-sourced. Everything above doubled.

Observability maturity curve:

  • L0: logs in files. (v1)
  • L1: Prometheus metrics, Grafana. (v1 end)
  • L2: distributed tracing across scheduler→queue→worker→runtime (OpenTelemetry). (v2)
  • L3: per-DAG SLO dashboards with auto-attribution ("this DAG is slow because queue X is full"). (v2 end)
  • L4: cross-system causal analysis — integrate with infra telemetry ("DAG spike coincides with DB replica lag spike"). (v3)

Chaos / disaster drills:

  • Monthly: kill scheduler leader, verify failover <15s.
  • Monthly: kill a random queue shard, verify task continuity.
  • Quarterly: DB failover rehearsal (non-prod first, then planned maintenance window in prod).
  • Annually: simulated region outage.
  • Per-change: every deploy goes through staged rollout (1%, 10%, 50%, 100%) with auto-rollback on SLO breach.
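The per-change staged rollout in the last bullet reduces to a gate loop. A minimal sketch, where `deploy_pct`, `rollback`, and `slo_healthy` are hypothetical hooks into your deploy tooling and SLO burn-rate check:

```python
import time

STAGES = [1, 10, 50, 100]  # percent of fleet at each stage

def staged_rollout(deploy_pct, rollback, slo_healthy,
                   checks_per_stage=3, poll=30):
    """Widen the blast radius one stage at a time; any SLO breach
    during a stage's soak triggers an immediate full rollback."""
    for pct in STAGES:
        deploy_pct(pct)                 # push the new version to pct% of fleet
        for _ in range(checks_per_stage):
            if not slo_healthy():       # e.g. error-rate or latency burn check
                rollback()
                return False            # abort: breach at this stage
            time.sleep(poll)            # soak before the next health check
    return True                         # all stages soaked clean
```

Real rollout controllers add per-stage soak times, manual approval gates, and canary-vs-baseline comparison, but the control flow is this loop.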

Things I'd refuse to build in v1 if I had authority:

  • "Exactly-once" marketing claim. Promise at-least-once + idempotency; don't lie.
  • Synchronous cross-DAG dependencies as a first-class feature (it's a coupling anti-pattern; use events instead).
  • In-scheduler business logic (like "send Slack when task fails"). Put it in a notifier service that subscribes to the event bus.

Closing: Why this design, in one paragraph #

100M task-instances/day is beyond single-node Airflow and inside the comfort zone of a sharded scheduler with an external queue. The critical invariants are: (1) the source of truth is a transactional DB with a monotonic fencing token on every write, so split-brain can't corrupt state; (2) dispatch via row-level locks with SKIP LOCKED up to ~5K/sec per shard, horizontally scaled via hash(dag_id) % 64 partitioning; (3) workers are cattle with lease-based ownership, leaning on idempotency (both platform-enforced and user-contracted) so that at-least-once feels like exactly-once; (4) operational escape hatches (DLQ, per-tenant quotas, priority queues, triggerer for deferrable waits) so that the long tail and noisy neighbors never starve the system. When scale outgrows this (>1B/day or harder correctness needs), evolve to Temporal-style event sourcing. When a Google L7 interviewer asks "how do you prevent double-dispatch on scheduler failover?", the one-line answer is: "a fencing token on every scheduler-origin write, validated by CAS in the DB; the old leader's writes are rejected outright, so double-dispatch becomes a definitional impossibility, not a probabilistic one." Everything else is plumbing.
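Mechanically, the fencing-token answer is one conditional write. A toy in-memory model (table and field names are illustrative; the comment shows the one-statement SQL shape this stands in for):

```python
class MetadataDB:
    """Toy model of the fencing check: every scheduler-origin write
    carries the writer's epoch; the DB rejects stale epochs atomically."""

    def __init__(self):
        self.epoch = 0       # highest fencing token ever accepted
        self.state = {}      # task_id -> state

    def write(self, token: int, task_id: str, new_state: str) -> bool:
        # In SQL this is a single compare-and-set statement, roughly:
        #   UPDATE task_instance SET state = %s, epoch = %s
        #   WHERE id = %s AND epoch <= %s
        # (the row count tells the caller whether the write was fenced out)
        if token < self.epoch:
            return False     # stale leader: rejected, no side effect
        self.epoch = token
        self.state[task_id] = new_state
        return True
```

The point the closing paragraph makes: the old leader does not need to *know* it lost leadership; its token is smaller than the new leader's, so its writes fail at the DB regardless of timing.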
