Q10 Orchestration & Coordination
Distributed Job Scheduler (Airflow-style)
Execute DAGs on schedule, recover from worker failures, and scale to a very large number of task runs per day.
1 Problem Restatement & Clarifying Questions #
Restatement (30 sec, said aloud): Build a multi-tenant distributed scheduler that accepts user-defined DAGs of tasks, triggers them on cron/event/sensor conditions, dispatches tasks to workers honoring dependencies, retries on failure, provides a UI + audit log, and scales to 100M task executions/day. Source of truth is a metadata DB; workers are cattle; scheduler is HA with leader election; execution is at-least-once with idempotency-by-convention. Think Apache Airflow, Temporal (Uber Cadence), Argo Workflows, AWS Step Functions.
Clarifying questions (I would actually ask, grouped by how much they change the design):
Load-shape questions (change capacity numbers):
- "100M/day" — is that task executions (task_instance level) or DAG runs (workflow level)? Airflow-style DAGs have 10-100 tasks each, so 100M task-instances/day implies maybe 5-10M DAG runs/day. I'll assume task_instance-level (the harder number). Avg = 1.16K/sec, peak 5× = 5.8K/sec.
- Task duration distribution: p50 30s, p95 5min, p99 1hr, max 24hr? Long tail is what blows up worker pool sizing and poisons queues. I'll assume that distribution.
- Hourly/daily skew? Airflow users cron at top of hour → 12× spike at :00, 2-3× at :15/:30/:45. Bake this into queue + scheduler sizing.
- Peak:avg ratio: 5× nominal, 12× instantaneous at cron-alignment boundaries.
Correctness-contract questions (change deep-dive):
5. Execution semantics: at-least-once + user-idempotent is what Airflow ships. Does the user want effective exactly-once (Temporal-style deterministic replay + event sourcing)? That's a v3 feature and a different system.
6. Idempotency contract: do we require tasks to be idempotent, or do we provide fencing tokens (run_id + task_id + attempt_id) so the platform guarantees no duplicate side-effects? I'll offer both: dumb workers + convention, smart workers + deterministic replay (v3).
7. Cross-DAG dependencies? Sensors or explicit ExternalTaskMarker? Affects data model.
8. Backfill behavior: when a DAG is paused for 7 days then resumed, do we run 7 days of missed runs serially (catchup=True) or skip to now (catchup=False)? Both needed; backfill queue must not starve live queue.
Operational questions (change SLO / ops story):
9. SLA: scheduling latency = (scheduled_time → task_dispatched_to_worker). Target p99 < 5s. Also end-to-end DAG SLA alerting? Yes.
10. Multi-tenant isolation: noisy-neighbor quotas, per-tenant queues? Yes, critical at 100M scale.
11. Task code: runs in containers (k8s), in-process Python (CeleryExecutor-style), or both? Both; but we do not design the container runtime — assume a K8s Job API exists.
12. Regions: single-region active, multi-region DR, or active-active multi-region? I'll design single-region active + warm-standby DR; mention active-active in v3.
13. Retention: run logs & metadata — 90 days hot, 1 year cold? Log volume is the #1 cost driver.
14. Compliance: GDPR right-to-erasure on task logs? Conflict with an immutable audit trail? Address in §10.
Stated assumptions (I'd write these on the whiteboard so the interviewer can push back):
- 100M task_instances/day, avg 1.16K/sec, peak 5.8K/sec sustained, 14K/sec momentary at top-of-hour
- Avg task 60s on 1 CPU core
- At-least-once + idempotent user tasks; v3 offers deterministic replay
- Single writer metadata DB (Spanner or vertically-scaled Postgres with read replicas)
- K8s exists as a runtime substrate; we don't design the pod scheduler
- Web UI is read-mostly; 99% of reads answered from replicas or cache
2 Functional Requirements #
In scope (numbered so I can point at them during the interview):
1. DAG authoring & registration — users submit a DAG spec (Python/YAML/JSON) declaring tasks, deps, schedule, retries, timeouts, owner, tags. The system versions the DAG (a minimal authoring sketch follows this list).
2. Scheduling triggers — cron (with timezone + DST), interval, event (Kafka/webhook/sensor), manual trigger via API/UI, sensor polling (deferrable).
3. Dependency resolution — topological order, upstream-must-succeed, branch operators, trigger_rule (one_success / all_done / all_failed), fan-out/fan-in (dynamic task mapping: `task.expand(input=[...])`).
4. Dispatch & execution — queue-based, worker pulls, lease-based task ownership; honors priority, pool, concurrency caps (per-DAG, per-task, per-pool, global).
5. Retries — per-task retry count, exponential backoff with jitter, retry on specific exceptions only, optional task-level circuit breaker.
6. Timeouts — per-task hard timeout (lease expires → reassign); per-DAG SLA (alert if run > X).
7. SLA alerting — a missed SLA fires webhook / PagerDuty / email. Also: `ExecutionTimeout`, `SensorTimeout`.
8. UI — list DAGs, inspect runs, view logs, trigger/pause/unpause, clear task state, view gantt, code view, audit log. Read-heavy; ~10:1 read:write.
9. Historical run log — task-level logs streamed to object store; state events immutable in the metadata DB; queryable for N days hot, M years cold.
10. Deferrable tasks (triggerer) — long waits (e.g. "poll S3 for a file for 12h") are offloaded from worker slots to a lightweight triggerer process that wakes the task when the async event fires.
11. Backfill — re-run historical date ranges with throttling.
12. Idempotency — the same (dag_id, logical_date) returns the same run_id; `trigger_run` is idempotent by caller-supplied key.
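A minimal authoring sketch of items 1-3 under stated assumptions — the `scheduler_sdk` module, decorator names, and their kwargs are hypothetical placeholders for whatever surface the platform actually ships:

```python
# Hypothetical DAG-authoring SDK; shape mirrors the in-scope list above.
from scheduler_sdk import dag, task  # hypothetical import

@task(retries=3, retry_backoff="exponential", timeout_s=600)
def extract() -> list[str]:
    return ["s3://bucket/part-0", "s3://bucket/part-1"]

@task(queue="default")
def transform(path: str) -> str:
    return path + ".parquet"

@task(trigger_rule="all_done")  # fan-in that tolerates partial upstream failure
def load(paths: list[str]) -> None:
    print(f"loading {len(paths)} partitions")

@dag(dag_id="etl_daily", schedule="0 2 * * *", timezone="America/New_York",
     catchup=False, sla_seconds=3600, owner="data-eng", tags=["etl"])
def etl_daily():
    paths = extract()
    files = transform.expand(path=paths)  # dynamic fan-out (task mapping)
    load(files)
```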
Out of scope (stated to constrain the design):
- Container/VM scheduling (assume K8s Job API or similar runtime exists)
- Data catalog / data lineage (mention hooks for OpenLineage, don't design)
- In-task resource isolation (CPU/mem limits enforced by runtime, not scheduler)
- Business-logic workflow semantics (saga compensation, etc. — covered in v3 as Temporal-style)
- Actual DAG code sandboxing/security (auth'd via IAM; code scanning is a separate system)
- Secrets management (assume Vault/Parameter Store integration)
3 NFRs + Capacity Estimate (BOE, show the math) #
Non-functional targets:
| NFR | Target | Rationale |
|---|---|---|
| Availability (control plane) | 99.95% (4.4h/yr) | Scheduler down = nothing runs; missed cron = SLA breach. Google+Meta internal schedulers sit at 99.95-99.99. |
| Scheduling latency (scheduled_ts → dispatched) | p50 1s, p99 5s, p999 30s | Airflow 2.x ships ~2s p50. Tight enough that cron-misalignment isn't perceptible; loose enough that you don't need hard real-time. |
| Worker-start-to-task-exec latency | p99 < 10s for in-process exec; < 60s cold k8s pod | Split accountability: scheduler owns dispatch; runtime owns pod start. |
| Execution semantics | At-least-once + idempotency-by-convention (v1-v2); effective exactly-once via event sourcing (v3) | Distributed-systems reality: exactly-once requires deterministic replay of user code, which users won't write unless we force it. |
| Durability (metadata) | RPO ≤ 1 min, RTO ≤ 15 min | Synchronous replica + snapshot. Losing 1 min of state = re-run at most 1 min of dispatches (idempotent so safe). |
| Log durability | 11 9's via object store (S3/GCS) | Standard. |
Capacity estimate (all numbers derived):
INPUT
N_tasks = 100,000,000 / day
seconds_day = 86,400
peak_ratio = 5× sustained, 12× instantaneous (top-of-hour)
avg_task_dur = 60s (assumed; p95 = 5 min, p99 = 1 hr)
events_per_ti = 5 (queued, running, success/fail, heartbeats compacted to 1, log-flush)
retry_rate = 3% (industry: 1-5% of tasks retry)
attempts_avg = 1 + 0.03 * 1.5 ≈ 1.045 (most retries succeed on 2nd attempt)
DISPATCH QPS
avg_dispatch = 100M / 86400 × 1.045 ≈ 1.21K/sec
peak_dispatch = 1.21K × 5 = 6.05K/sec sustained
burst = 1.21K × 12 = 14.5K/sec at :00
WORKER POOL SIZING (Little's Law: L = λW)
L = concurrent_tasks = λ × W
λ = 1.21K/sec dispatch
W = 60s mean duration
L_avg = 1.21K × 60 = 72.6K concurrent tasks (avg)
L_peak = 6.05K × 60 = 363K concurrent tasks (peak, sustained)
→ need ~350-400K worker slots (cores) at peak
→ at 32 cores/host → ~11K-12K worker hosts at peak
With the p99 = 1hr tail: ~1% of tasks are long-runners; slots tied up
= 0.01 × 100M × 3600 / 86400 = 41,700 perpetually occupied (≈12% of the 363K peak pool)
→ add ~12% headroom
METADATA DB WRITE LOAD
events/sec avg = 1.21K × 5 = 6.05K writes/sec
events/sec peak = 6.05K × 5 = 30K writes/sec
Each event row ≈ 500-1KB (state transition + metadata)
Daily write volume = 100M × 5 × 1KB = 500 GB/day raw
+ indexes 2-3× → ~1.5 TB/day rolling WAL
90-day hot retention → 135 TB hot tier
Cold archive → Parquet on S3, ~10× compression → 15 TB cold
QUEUE DEPTH
Steady state at peak: L_peak = 363K tasks "in flight"
Queue "ready" depth (dispatched-but-not-yet-claimed): tiny, sub-second
Queue "scheduled but not due" depth: up to 24h of future tasks
worst case if all DAGs schedule daily at midnight:
100M tasks sitting in a "scheduled at T" zset/heap at T-ε
→ partition by shard_key (hash(dag_id) % N), N=64-256
LEADER ELECTION / HEARTBEAT
Scheduler lease: 10s (tight enough for <15s failover, loose enough to absorb GC pauses)
Worker heartbeat: 10s (miss 3 → reap at 30s)
With 12K workers, heartbeat rate = 1.2K/sec writes against a worker_liveness
table (or better: a Redis SETEX with TTL, avoid DB hot row)
LOG VOLUME
Avg task log = 10 KB (stdout + stderr captured)
100M × 10KB = 1 TB/day
90-day hot = 90 TB on S3 STANDARD_IA → ~$2K/month
Trace/event logs (state transitions, scheduling decisions) = ~500 GB/day
Shipped to BigQuery / Snowflake for analytics; 90d partition pruning
COST (rough order)
12K hosts × $400/mo (m5.4xlarge class) = $4.8M/mo compute (dominant)
Metadata DB (Spanner 3-node HA or RDS r6g.16xlarge × 3) = $30-80K/mo
Queue (Redis Cluster 16 shards × HA pair + Kafka 32 brokers) = $40-60K/mo
Logs + traces storage + BigQuery = $20K/mo
Control plane (scheduler + triggerer + API servers, say 30 hosts) = $15K/mo
TOTAL ≈ $5M/mo → $0.0017 per task execution
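The arithmetic above compresses into a few lines of Python; a quick self-check with the same assumed inputs:

```python
# Back-of-envelope self-check; inputs mirror the assumptions above.
N_TASKS_DAY = 100_000_000
SECONDS_DAY = 86_400
RETRY_RATE, EXTRA_ATTEMPTS = 0.03, 1.5      # 3% retry, ~1.5 extra attempts each
ATTEMPTS = 1 + RETRY_RATE * EXTRA_ATTEMPTS  # ≈ 1.045

avg_dispatch = N_TASKS_DAY / SECONDS_DAY * ATTEMPTS  # ≈ 1.21K/s
peak_dispatch = avg_dispatch * 5                     # ≈ 6.05K/s sustained
concurrent_peak = peak_dispatch * 60                 # Little's law, W = 60s → ≈ 363K
hosts = concurrent_peak / 32                         # 32 cores/host → ≈ 11.3K
db_writes_peak = peak_dispatch * 5                   # 5 events/task → ≈ 30K/s

print(f"dispatch {avg_dispatch:,.0f}/s avg, {peak_dispatch:,.0f}/s peak")
print(f"{concurrent_peak:,.0f} slots peak -> {hosts:,.0f} hosts @ 32 cores")
print(f"metadata writes {db_writes_peak:,.0f}/s peak")
```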
Key coherence checks (the interviewer will cross-check these):
- 6K/sec dispatch × 5 events = 30K writes/sec on metadata → a single Postgres WAL tops out at ~10-30K/sec; need Spanner, or Postgres with queue events offloaded to Redis/Kafka and only lifecycle events in the DB (the direction Airflow 2.x took with the `LocalExecutor` deprecation and the `CeleryKubernetesExecutor` push).
- 350K worker slots: that's Meta-Borg scale. Airflow docs say the community pushes single deployments to ~50K concurrent tasks; beyond that you shard by cluster/tenant or move to Temporal (which scales further via sharded task queues on Cassandra).
- Leader-election 10s lease: with a 10s TTL and 3s heartbeat, a single scheduler crash = at most 13-15s of lost dispatch → ~16-18K tasks backlogged at the average rate → catchable in <10s of sprint dispatch by the new leader (it needs to burst-dispatch at ~3-5K/sec for 10s, which it can because DB SKIP LOCKED scales).
4 High-Level API #
Surface: gRPC (server-to-server, e.g. sensor webhooks from other systems) + REST/JSON (UI, CLI). Auth: OIDC + IAM (per-DAG ACLs). All mutating calls idempotent via client-supplied Idempotency-Key.
service Scheduler {
// DAG authoring
rpc RegisterDag(RegisterDagRequest) returns (RegisterDagResponse);
// payload: dag_spec (YAML/proto), version auto-bumped on content-hash change
// idempotent on (dag_id, content_hash)
rpc UpdateDag(UpdateDagRequest) returns (UpdateDagResponse);
// pause/unpause, owner change, SLA change
rpc DeleteDag(DeleteDagRequest) returns (google.protobuf.Empty);
// soft delete; history retained for audit
// Runs
rpc TriggerRun(TriggerRunRequest) returns (TriggerRunResponse);
// request: { dag_id, logical_date, params, run_id (optional, caller-supplied) }
// idempotent on (dag_id, run_id); if run_id absent, server generates UUIDv7
// returns run_id + state=queued
rpc GetRunStatus(GetRunStatusRequest) returns (RunStatus);
// cacheable 1s server-side; read from replica
rpc ListRuns(ListRunsRequest) returns (ListRunsResponse);
// filter by dag_id, state, time window; paginated
rpc CancelRun(CancelRunRequest) returns (CancelRunResponse);
// sets run state=cancelling; scheduler fences running tasks via lease revocation
rpc RetryTask(RetryTaskRequest) returns (RetryTaskResponse);
// manual retry of failed task_instance; bumps attempt_id
// idempotent on (ti_id, target_attempt_id)
// Task-level
rpc ListTasks(ListTasksRequest) returns (ListTasksResponse);
rpc GetTaskLogs(GetTaskLogsRequest) returns (stream LogChunk);
// streamed from object store via signed URL indirection for large logs
// Sensor / event hooks
rpc SignalEvent(SignalEventRequest) returns (SignalEventResponse);
// webhook endpoint external systems call ("file ready at s3://...")
// triggerer matches waiting deferrables and advances state
// Worker<->scheduler (internal plane, not user-facing)
rpc WorkerHeartbeat(stream HeartbeatRequest) returns (stream HeartbeatResponse);
rpc ClaimTask(ClaimTaskRequest) returns (ClaimTaskResponse);
// lease-based; returns lease_token (fencing token, monotonic)
rpc ExtendLease(ExtendLeaseRequest) returns (ExtendLeaseResponse);
rpc ReportTaskResult(ReportTaskResultRequest) returns (ReportTaskResultResponse);
// idempotent on (ti_id, attempt_id, lease_token); stale lease => rejected
}
Error codes (gRPC canonical + domain extensions):
- `ALREADY_EXISTS`: duplicate DAG version or run_id — return the existing resource
- `FAILED_PRECONDITION`: DAG paused, upstream dep unmet, lease expired
- `RESOURCE_EXHAUSTED`: per-tenant quota hit — client should back off (signaled via `retry-after`)
- `ABORTED`: lease fencing token stale (another worker took over) — worker must abort
- `DEADLINE_EXCEEDED`: task hit its hard timeout — scheduler reassigns
Idempotency implementation:
- `Idempotency-Key` (HTTP header) or `client_request_id` (gRPC metadata)
- Server stores `(idempotency_key, request_hash, response, expires_at)` in a 24h-TTL cache (Redis)
- Replay within the TTL returns the cached response; replay with the same key but a different request body → `INVALID_ARGUMENT`
- For `TriggerRun`, the durable idempotency is the `(dag_id, run_id)` PK in `dag_run` — the idem cache is just a speed-up for the API layer
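A minimal sketch of that API-layer cache, assuming redis-py; the `SET NX` reservation closes the race where two first calls arrive concurrently, and the durable guard remains the `(dag_id, run_id)` PK:

```python
import hashlib
import json

import redis

r = redis.Redis()
TTL_S = 24 * 3600

def idempotent_call(key: str, request: dict, handler):
    req_hash = hashlib.sha256(
        json.dumps(request, sort_keys=True).encode()
    ).hexdigest()
    # Reserve the key first so two concurrent first calls can't both execute.
    if not r.set(f"idem:{key}", json.dumps({"request_hash": req_hash}),
                 nx=True, ex=TTL_S):
        entry = json.loads(r.get(f"idem:{key}"))
        if entry["request_hash"] != req_hash:
            raise ValueError("INVALID_ARGUMENT: same key, different request body")
        if "response" not in entry:
            raise RuntimeError("request with this key is still in flight")
        return entry["response"]              # replay -> cached response
    response = handler(request)               # first call -> execute
    r.set(f"idem:{key}",
          json.dumps({"request_hash": req_hash, "response": response}), ex=TTL_S)
    return response
```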
5 Data Schema #
Storage layers & rationale:
| Data | Store | Why | Rejected |
|---|---|---|---|
| DAG spec, versioning | Postgres (or Spanner) | Small (10K DAGs × 100KB avg = 1 GB), transactional | S3-only: no txn with state; ZooKeeper: wrong shape |
| DAG runs, task_instance state | Postgres → Spanner at scale | Source of truth; needs ACID + SKIP LOCKED | DynamoDB: no SKIP LOCKED pattern; Cassandra: no transactions |
| Task queue (dispatched, ready-to-claim) | Redis Streams or Kafka (or DB-backed SKIP LOCKED under 5K/sec) | Low-latency claim, consumer groups | SQS: visibility-timeout redelivery semantics too coarse for a 5s-p99 SLO |
| Scheduled-but-not-due tasks ("when to dispatch") | Redis ZSET (score = unix_ts) sharded, OR partitioned table with `ready_at` index | Need O(log N) "pop all due tasks" | Kafka: not a heap |
| Worker liveness | Redis TTL keys | 1.2K heartbeat/sec is hot row in any RDBMS | Postgres: WAL churn |
| Task logs (stdout/stderr) | Object store (S3/GCS) | 1 TB/day, write-heavy, immutable | DB: wrong cost structure |
| Audit / event stream | Kafka → BigQuery for analytics; mirror to S3 for cold audit | Append-only, high volume | DB: wrong scale |
| Leader election, config | etcd (or ZK) | Native leases, watches | Postgres advisory locks: works at small scale but no watch primitive |
Core tables (Postgres / Spanner flavor):
-- DAG authoring
CREATE TABLE dag_spec (
dag_id TEXT NOT NULL,
version INT NOT NULL,
content_hash TEXT NOT NULL, -- sha256 of serialized spec
spec_blob BYTEA NOT NULL, -- serialized YAML/proto
owner TEXT NOT NULL,
tenant_id TEXT NOT NULL,
schedule_cron TEXT, -- null for triggered-only DAGs
timezone TEXT NOT NULL, -- IANA tz
paused BOOL DEFAULT FALSE,
sla_seconds INT, -- null = no SLA
catchup BOOL DEFAULT FALSE,
max_active INT DEFAULT 16, -- concurrency cap
created_ts TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (dag_id, version)
);
CREATE INDEX ON dag_spec (tenant_id, dag_id) WHERE NOT paused;
-- Run-level
CREATE TABLE dag_run (
run_id UUID NOT NULL, -- UUIDv7 = time-ordered
dag_id TEXT NOT NULL,
dag_version INT NOT NULL,
logical_date TIMESTAMPTZ NOT NULL, -- scheduled time (cron)
state SMALLINT NOT NULL, -- queued/running/success/failed/cancelled
trigger_type SMALLINT NOT NULL, -- cron/manual/backfill/event
started_ts TIMESTAMPTZ,
ended_ts TIMESTAMPTZ,
tenant_id TEXT NOT NULL,
PRIMARY KEY (run_id),
UNIQUE (dag_id, logical_date, trigger_type) -- idempotency
);
CREATE INDEX ON dag_run (dag_id, logical_date DESC);
CREATE INDEX ON dag_run (state, logical_date) WHERE state IN (0,1); -- hot queries
-- Task-instance (the big table)
CREATE TABLE task_instance (
ti_id UUID NOT NULL, -- UUIDv7
run_id UUID NOT NULL REFERENCES dag_run(run_id),
task_id TEXT NOT NULL, -- logical id in DAG
map_index INT NOT NULL DEFAULT -1, -- for dynamic fanout; -1 = unmapped
attempt INT NOT NULL DEFAULT 1,
state SMALLINT NOT NULL, -- queued/scheduled/running/success/failed/up_for_retry/deferred/skipped
priority INT DEFAULT 0,
queue_name TEXT NOT NULL,
worker_id TEXT, -- null until claimed
lease_token BIGINT, -- monotonic fencing token
lease_expires TIMESTAMPTZ,
scheduled_ts TIMESTAMPTZ NOT NULL, -- when to dispatch (after backoff etc)
started_ts TIMESTAMPTZ,
ended_ts TIMESTAMPTZ,
exit_code INT,
next_retry_ts TIMESTAMPTZ,
PRIMARY KEY (ti_id),
UNIQUE (run_id, task_id, map_index, attempt)
);
-- Critical hot index: the dispatch query
CREATE INDEX ti_dispatch ON task_instance (queue_name, scheduled_ts)
WHERE state = 1 /* scheduled-ready */;
-- Partition by hash(run_id) for write spread (Postgres declarative partitioning or Spanner interleaving)
CREATE INDEX ti_lease ON task_instance (lease_expires)
WHERE state = 2 /* running */ AND lease_expires IS NOT NULL;
-- Queue partitions (if using DB-backed queue)
-- Actually a view over task_instance + queue metadata; physical queue often in Redis/Kafka.
-- Worker pool state (in Redis, mirrored periodically to DB for history)
-- Redis: SET worker:<id> {host, capacity, tags, last_hb} EX 30
-- DB (historical):
CREATE TABLE worker_event (
worker_id TEXT NOT NULL,
event_ts TIMESTAMPTZ NOT NULL,
event_type SMALLINT, -- registered/unregistered/drained/died
capacity INT,
PRIMARY KEY (worker_id, event_ts)
);
-- Triggerer state for deferred/async waits
CREATE TABLE trigger_row (
trigger_id UUID NOT NULL,
ti_id UUID NOT NULL REFERENCES task_instance(ti_id),
triggerer_id TEXT, -- which triggerer process owns it
classpath TEXT NOT NULL, -- user-provided async fn
kwargs_json JSONB NOT NULL,
created_ts TIMESTAMPTZ DEFAULT NOW(),
next_poll_ts TIMESTAMPTZ,
PRIMARY KEY (trigger_id)
);
CREATE INDEX ON trigger_row (triggerer_id, next_poll_ts) WHERE triggerer_id IS NOT NULL;
-- Immutable event log (append-only, for audit + UI timeline)
CREATE TABLE ti_event (
ti_id UUID NOT NULL,
event_ts TIMESTAMPTZ NOT NULL,
event_type SMALLINT NOT NULL, -- queued/dispatched/started/heartbeat/succeeded/failed/retried/cancelled
worker_id TEXT,
attempt INT,
message TEXT,
PRIMARY KEY (ti_id, event_ts, event_type)
) PARTITION BY RANGE (event_ts); -- daily partitions, drop after 90d
Why this schema is defensible:
- UUIDv7 for run_id and ti_id: time-ordered so BTREE inserts are append-mostly, not random. Snowflake-style IDs also fine; UUIDv7 avoids centralized ID service.
- `UNIQUE (run_id, task_id, map_index, attempt)`: the core idempotency key. A worker reporting success for attempt=1 cannot conflict with attempt=2's retry.
- `lease_token` monotonic: fencing. If worker A is partitioned and its lease expires, worker B takes over with a higher token. A's late "I succeeded" write is rejected because `token_old < token_new`. This is Martin Kleppmann's fencing pattern — see §7.1.
- Partial index on the dispatch query: `WHERE state=1` keeps the BTREE tiny (only "ready" rows), so the SKIP LOCKED scan stays O(k) where k = batch size, not O(total task_instances).
- Rejected DynamoDB: Airflow-style dispatch needs `SELECT ... WHERE state=scheduled AND scheduled_ts<=NOW() ORDER BY priority LIMIT 100 FOR UPDATE SKIP LOCKED`. Dynamo has no SKIP LOCKED analog; you can build one with conditional writes + retry storms, but it's worse than a well-tuned Postgres up to ~5-10K/sec.
- Rejected single Postgres beyond v2: at 30K writes/sec peak, single-writer Postgres WAL bottlenecks around 10-30K/sec depending on row size. Spanner or Aurora for v2→v3, or shard the metadata by `tenant_id` (Airbnb did this with multiple Airflow deployments + a federation layer).
6 System Diagram (CENTERPIECE) #
6.1 Overall architecture
+-----------------------------------+
| USERS |
| Web UI | CLI | API clients |
| Sensors/Upstream Systems |
+------+-----------+-------+-------+
| | |
HTTPS | gRPC/TLS | webhook (event)
v v v
+------------------------------------------+-----------+-------+----------------------------+
| EDGE / API LAYER (stateless, N=8) |
| Envoy / GFE -> API Server (auth, rate limit, idem cache in Redis) |
| OpenAPI/gRPC; reads from metadata replicas; writes via leader-aware routing |
+-----+---------------+--------------------------+---------------------+--------------------+
| write path | read path | webhook | WS for UI push
v v v v
+----+-----+ +-----+------+ +----+-----+ +------+------+
| DAG Reg | | Read Cache | | Triggerer| | UI SSR |
| (parser, | | (Redis + | | Pool | | (Next.js) |
| validator| | Postgres | | N = 12 | +-------------+
| compiler)| | replicas) | | async I/O|
+----+-----+ +------------+ +----+-----+
| |
v v
+-----+-------------------------------------------------------------------------------------+
| CONTROL PLANE (leader-elected via etcd) |
| |
| +-----------------+ etcd cluster (5 nodes, Raft) <------------+ |
| | SCHEDULER POOL | - leader lease (10s TTL, 3s renew) | |
| | (N=5, 1 lead, | - /sched/leader, /sched/shards/* | |
| | 4 standby) | - compare-and-swap for shard claims | |
| | | | |
| | DagFileProcessor+-> SHARDED SCHEDULER LOOPS (64 shards) <------+ |
| | (parse every 30s) shard_id = hash(dag_id) % 64 |
| | |
| | per shard: tick every 1s: |
| | 1. SELECT due runs |
| | 2. resolve deps |
| | 3. INSERT task_instance rows (state=scheduled) |
| | 4. enqueue to Redis/Kafka |
| +-------+------------+---------------------------------------------------------+--------+
| | | |
| | 5-30 K/sec | INSERT/UPDATE |
| v v |
| +-------+------------+-------------+ +----------------------+ |
| | METADATA DB | | QUEUE LAYER | |
| | (Spanner / PG+replicas) | | Redis Streams + | |
| | - dag_spec | | Kafka (mirror) | |
| | - dag_run | | - 64 partitions | |
| | - task_instance (partitioned) | | - per-queue + per- | |
| | - ti_event (daily parts) | | priority streams | |
| | - trigger_row | | - XADD on dispatch | |
| | HA: sync replica + 2 async | | - XREADGROUP by | |
| | Backup: WAL ship + daily snap | | worker | |
| +----------------------------------+ +----+----------------+ |
| ^ |
+--------------------------------------------------|---------------------------------+
| XREADGROUP (claim)
| 10 ms p99
|
+---------------------------------------------+---------------------+
| WORKER POOL (N = 11-12K hosts) |
| |
| Worker process |
| - pulls from Redis stream (XREADGROUP count=8) |
| - ClaimTask RPC to scheduler -> lease_token (fencing) |
| - forks subprocess OR submits k8s Job (runtime-dependent) |
| - streams stdout/stderr to FluentBit -> S3 |
| - heartbeats 10s -> Redis SETEX worker:<id> |
| - on complete: ReportTaskResult (lease_token included) |
| - lease renewal every 30s while running |
| |
| Capacity tag per worker: {mem, cpu, gpu, queue_subscriptions} |
+----------+---------------------------------------------+-----------+
| |
| stdout/stderr (10 KB avg) | stdout
v v
+-------+---------+ +------+--------+
| FluentBit → | | K8s runtime |
| Log Collector | | (pod manages |
| (N=20) | | task code) |
+-------+---------+ +---------------+
|
v
+-------+--------+ +----------------+ +--------------+
| Object Store | | Kafka | -> | BigQuery / |
| S3/GCS (logs) | | ti_event bus | | Snowflake |
| 1 TB/day | | audit stream | | (analytics) |
+----------------+ +----------------+ +--------------+
6.2 Sub-diagram: scheduling loop (per-shard tick)
tick t = every 1s per shard (64 shards; one process per shard claimed via etcd CAS)
[1] SELECT ready dag_runs (state=scheduled, scheduled_ts <= NOW())
ORDER BY priority, scheduled_ts
LIMIT 500
FOR UPDATE SKIP LOCKED;
[2] For each run:
resolve DAG (cached parse in memory, reload on version change)
compute new task_instances whose deps are satisfied
(upstream states from task_instance rows for this run_id)
[3] BATCH INSERT new task_instance rows with:
state = 'scheduled'
scheduled_ts = NOW() (or + backoff for retries)
queue_name = task.queue (default: "default")
priority = max(task.prio, dag.prio, tenant.prio)
[4] For each inserted ti:
Redis: XADD queue:<queue_name>:<priority> ti_id <JSON payload>
MAXLEN ~ 1M (cap to avoid runaway)
Kafka: produce to topic "dispatch" partition=hash(ti_id)
(mirror for durability + analytics)
[5] UPDATE dag_run.state = 'running' where all children scheduled
[6] Publish to ti_event log: {ti_id, event_type=queued, ts}
Failure of step 3 after steps 1-2: transaction rollback, SKIP LOCKED releases;
next tick re-selects.
Failure of step 4 after step 3: task row exists but not enqueued.
→ periodic reconciler (every 60s) scans state=scheduled AND NOT in queue
→ re-enqueues. This is the anti-"lost dispatch" guardrail.
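A sketch of that reconciler, assuming psycopg and redis-py; the `enq:` dedupe marker and the 60s staleness window are illustrative:

```python
import psycopg
import redis

def reconcile(conn: psycopg.Connection, r: redis.Redis) -> int:
    """Re-enqueue rows stuck in state=scheduled (step 3 succeeded, step 4 lost)."""
    rows = conn.execute(
        """
        SELECT ti_id, queue_name FROM task_instance
         WHERE state = 1  -- scheduled-ready
           AND scheduled_ts < NOW() - interval '60 seconds'
         LIMIT 1000
        """
    ).fetchall()
    requeued = 0
    for ti_id, queue_name in rows:
        # NX marker makes re-enqueue idempotent across overlapping reconciler runs
        if r.set(f"enq:{ti_id}", 1, nx=True, ex=3600):
            r.xadd(f"queue:{queue_name}", {"ti_id": str(ti_id)},
                   maxlen=1_000_000, approximate=True)
            requeued += 1
    return requeued
```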
6.3 Sub-diagram: task lease lifecycle (THE fencing story)
[Worker pulls from queue]
XREADGROUP GROUP workers <worker_id> COUNT 8 STREAMS queue:default >
-> gets ti_id
[Worker claims] [Scheduler]
ClaimTask(ti_id, worker_id) ----------> CAS:
UPDATE task_instance
SET state='running',
worker_id=?,
lease_token = nextval(lease_seq),
lease_expires = NOW() + 60s
WHERE ti_id=? AND state='scheduled';
returns {lease_token: 42} or FAILED_PRECONDITION
[Worker executes user code]
every 30s:
ExtendLease(ti_id, lease_token=42) -> lease_expires += 60s
Scenario A (happy path):
done -> ReportTaskResult(ti_id, token=42, exit=0)
Scheduler CAS:
UPDATE task_instance
SET state='success', ended_ts=NOW()
WHERE ti_id=? AND lease_token=42;
(if 0 rows updated: stale token, worker's write is DISCARDED)
Scenario B (worker partitioned):
worker's lease_expires = T+60
worker continues running user code (cannot be killed from outside)
at T+70: scheduler's reaper: lease_expires < NOW()
UPDATE task_instance
SET state='up_for_retry', worker_id=NULL, lease_token=NULL
WHERE ti_id=? AND lease_expires < NOW() AND state='running';
re-enqueue with attempt = attempt + 1
meanwhile at T+75 partitioned worker finishes:
ReportTaskResult(ti_id, token=42, exit=0)
Scheduler: WHERE lease_token=42 → 0 rows (now token=43 or NULL)
rejected (ABORTED) → worker logs and drops
← THIS IS WHERE AT-LEAST-ONCE BITES IDEMPOTENCY:
the side-effects of attempt 1 AND attempt 2 may both have happened.
user code must tolerate (see §7.2)
Scenario C (lease extension fails):
worker's ExtendLease returns ABORTED (lease already reassigned)
worker must KILL its subprocess immediately (SIGKILL the user code
via runtime API, or orphan the k8s Job — which is less clean)
and report nothing.
→ minimizes duplicate-side-effect window but doesn't eliminate it.
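The claim/report CAS pair from the scenarios above, written out as a sketch against the §5 schema (assumes psycopg; numeric state codes follow the §5 index comments, with 3 = success and 4 = failed assumed; `lease_seq` is the sequence from the diagram):

```python
import psycopg

def claim_task(conn: psycopg.Connection, ti_id: str, worker_id: str) -> int | None:
    """CAS claim: returns a fencing token, or None if already claimed."""
    row = conn.execute(
        """
        UPDATE task_instance
           SET state = 2,                           -- running
               worker_id = %s,
               lease_token = nextval('lease_seq'),  -- monotonic fencing token
               lease_expires = NOW() + interval '60 seconds'
         WHERE ti_id = %s AND state = 1             -- scheduled-ready
         RETURNING lease_token
        """,
        (worker_id, ti_id),
    ).fetchone()
    conn.commit()
    return row[0] if row else None

def report_result(conn: psycopg.Connection, ti_id: str, token: int,
                  exit_code: int) -> bool:
    """CAS report: a stale worker's token matches 0 rows -> write discarded."""
    cur = conn.execute(
        """
        UPDATE task_instance
           SET state = CASE WHEN %s = 0 THEN 3 ELSE 4 END,  -- success / failed
               ended_ts = NOW(),
               exit_code = %s
         WHERE ti_id = %s AND lease_token = %s
        """,
        (exit_code, exit_code, ti_id, token),
    )
    conn.commit()
    return cur.rowcount == 1   # False -> respond ABORTED; worker logs and drops
```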
6.4 Sub-diagram: failure recovery (scheduler crash)
etcd lease semantics:
scheduler leader holds /sched/leader key with TTL=10s, renew every 3s.
T=0: leader S1 holds lease, dispatching.
T=5: S1 JVM stops-the-world GC for 8s (pathological).
S1 cannot renew.
T=10: etcd lease expires. S2, S3, S4, S5 all watching /sched/leader.
T=10.1: etcd fires "key deleted" to all watchers.
T=10.1-10.3: S2-S5 race CAS to create /sched/leader with their own ID.
Only one wins (atomic CAS). Say S2.
T=10.3: S2 starts dispatching. Reads last_dispatched_offset from DB
(or from etcd /sched/last_offset); resumes SKIP LOCKED polling.
CRITICAL: fencing token on dispatch writes.
Every scheduler stamps task_instance.scheduler_epoch = etcd_lease_revision.
S1, when GC finishes at T=13, thinks it's still leader.
It tries to UPDATE task_instance ... SET scheduler_epoch=S1.epoch=100.
Condition: WHERE scheduler_epoch < 100. But S2 already wrote epoch=101.
So S1's writes are all rejected by the CAS.
S1 then reads /sched/leader, sees S2, goes to standby.
Blast radius of the 10s outage:
- 1.21K/sec × 10s = 12K tasks that should have been dispatched.
- They sit in task_instance.state='scheduled' for 10s extra.
- p99 scheduling latency spikes to ~12s during the incident window.
- SLO says p99=5s so this is a page (incident severity minor).
- No correctness violation.
If instead S1 double-dispatched during the GC freeze (no fencing): the same ti_id
is enqueued twice → claimed twice → two workers run the user code →
DOUBLE side effects. This is the classic multi-scheduler double-run bug class
from the Airflow 1.10.x era.
Fencing token makes this impossible.
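A sketch of this election-plus-fencing loop with the python-etcd3 client; `dispatch_tick` is a stand-in for the per-shard loop of §6.2, and error handling is elided:

```python
import time

import etcd3

LEADER_KEY = '/sched/leader'

def run_candidate(node_id: str, dispatch_tick):
    client = etcd3.client()
    while True:
        lease = client.lease(ttl=10)
        won, _ = client.transaction(
            compare=[client.transactions.version(LEADER_KEY) == 0],  # key absent
            success=[client.transactions.put(LEADER_KEY, node_id, lease)],
            failure=[],
        )
        if won:
            # mod_revision of our write is monotonic -> doubles as the fencing epoch
            _, meta = client.get(LEADER_KEY)
            lead(lease, meta.mod_revision, dispatch_tick)
        else:
            events, cancel = client.watch(LEADER_KEY)
            for ev in events:                       # block until leader key deleted
                if isinstance(ev, etcd3.events.DeleteEvent):
                    cancel()
                    break

def lead(lease, epoch: int, dispatch_tick):
    while True:
        try:
            lease.refresh()                         # renew every 3s (TTL = 10s)
        except Exception:
            return                                  # etcd unreachable -> stop dispatching
        dispatch_tick(scheduler_epoch=epoch)        # stamp epoch on every DB write
        time.sleep(3)
```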
6.5 Arrow annotations (protocol / QPS / payload)
| Edge | Protocol | Avg QPS | Peak QPS | Payload |
|---|---|---|---|---|
| User → API | HTTPS/gRPC | 200/s | 2K/s | 1-10 KB |
| API → DB replicas | SQL | 1.5K/s reads | 10K/s | 1-50 KB |
| Scheduler → DB leader | SQL | 6K/s writes | 30K/s | 500 B/row |
| Scheduler → Redis queue | RESP | 1.2K XADD/s | 6K/s | 2 KB/msg |
| Scheduler → Kafka audit | TCP | 6K/s | 30K/s | 1 KB/event |
| Worker → Redis XREADGROUP | RESP | 1.2K claim/s | 6K/s | 2 KB/msg |
| Worker → Scheduler ClaimTask | gRPC | 1.2K/s | 6K/s | 100 B req, 200 B resp |
| Worker → Scheduler ExtendLease | gRPC | ~2.4K/s (72.6K concurrent / 30s) | ~12K/s | 50 B req |
| Worker → S3 log upload | HTTPS | ~5K/s | 30K/s | 10 KB (avg), buffered |
| Scheduler ↔ etcd | gRPC | ~100/s | 500/s | 100 B |
| Triggerer → DB | SQL | 200/s | 2K/s | 500 B |
ExtendLease at ~12K/sec peak is the hottest RPC. Options:
- Keep it simple: Redis `EXPIRE worker:<id>:lease:<ti_id>` instead of the DB.
- Piggyback on the heartbeat: one "alive" message carries all of a worker's leases.
- Variable lease durations: long-running tasks get longer leases (5 min) → 10× fewer extends.
This is a real L7 detail — most candidates miss that worker-scheduler chatter dominates control-plane QPS.
7 Deep Dives #
7.1 Scheduler HA: Leader Election without Split-Brain
Why critical: Airflow 1.x shipped without leader election. Operators ran multiple schedulers "for HA" and they competed for the same task rows, double-dispatching. This caused well-documented production incidents at Lyft, Airbnb, and Bloomberg. Airflow 2.0 (2020) shipped an HA scheduler using row-level locks (`SELECT ... FOR UPDATE SKIP LOCKED`) on Postgres; this works but has quirks (see below). Temporal/Cadence use a sharded model with Raft for shard ownership. Getting this right is the difference between "works in staging" and "doesn't page at 3 AM."
Three alternatives considered:
(A) Row-locking without external leader election (Airflow 2.x default)
- Multiple scheduler processes run. Each polls the DB with `SELECT ... FOR UPDATE SKIP LOCKED`; Postgres gives each process disjoint rows.
- Pro: no separate coordination service; simpler ops.
- Con: every scheduler hits the DB every tick → N× read load. With 5 schedulers and a 1s tick, that's 5 QPS just for polling, plus N× index scans.
- Con: relies on the DB's lock manager scaling. Postgres SKIP LOCKED works well up to about 5-10K ops/sec on a single node before WAL write amplification becomes a bottleneck. Beyond that, split by `queue_shard_id` physically.
- Con: no clear "primary" for non-DB operations (cache warming, DAG file parsing, metrics aggregation). Airflow works around this by having each scheduler parse a subset of DAG files (`scheduler_heartbeat_sec`, `max_threads`). Works, but awkward.
- Failure mode: if one scheduler hangs in a long DB transaction, SKIP LOCKED still works (locks are released on backend death/timeout). But a lock held by a zombie connection → up to `deadlock_timeout` + `statement_timeout` of delay before reclamation.
(B) Single leader via etcd/ZooKeeper + sharded workers (Google Borg, Temporal)
- One leader elected via etcd lease (10s TTL). Leader owns scheduling loop. Others are hot standby.
- For scale: the leader doesn't do all the work; it assigns shards (0..N-1) to standby nodes via etcd keys (`/sched/shards/0 = node2`). Each node services its shards. The leader re-assigns shards on node loss.
- Pro: clean leader story; non-DB work (metrics, parsing) has a natural owner.
- Pro: fencing tokens come free (etcd revision number is monotonic).
- Con: etcd becomes a critical dependency. etcd outage → everyone thinks they're leader → use fencing token to reject stale writes → degradation is "no new dispatches" not "double dispatches."
- Con: more moving parts to reason about.
(C) Event-sourced workflow engine (Temporal/Cadence model) — deferred to v3
- Workflow state is stored as an event log. Any worker can pick up any workflow by replaying from the log. No "scheduler" in the classic sense.
- Pro: no leader needed; no split-brain possible.
- Pro: deterministic replay → effective exactly-once at the workflow level.
- Con: requires user code to be deterministic (no `time.time()`, no unseeded `random()`, etc.). The workflow SDK enforces this via lints + runtime checks.
- Con: reshapes the programming model. Not compatible with Airflow's "arbitrary Python task" ethos.
- Verdict: put in v3 roadmap; don't try to ship v1 this way.
Chosen: Hybrid of (A) and (B) for v2:
- etcd-based leader election for the control plane duties (DAG file parsing allocation, shard re-balancing, cluster-wide state like quotas, maintenance mode).
- 64-shard scheduling loop: each shard claimed via etcd CAS on `/sched/shards/<n>`. Each shard polls the DB with `SELECT ... WHERE hash(dag_id) % 64 = <shard> ... SKIP LOCKED`.
- Fencing token: every scheduler stamps `scheduler_epoch` on writes; a stale leader's writes are rejected.
- DB ops/sec stays manageable because each shard scans a partition, not the whole table. Quantified: at 6K writes/sec peak, 64 shards = ~94 writes/sec/shard on average. Each shard's polling query hits the (queue_name, scheduled_ts) partial index on a partition of ~100K rows → sub-ms plan. This is essentially the Airflow 2.7+ architecture.
Failure modes & mitigations:
| Failure | Detection | Mitigation | Recovery |
|---|---|---|---|
| Scheduler leader GC-freeze | etcd lease expires | Standby takes over via CAS; old leader's writes rejected by fencing | 10-15s of missed dispatches |
| Partial network partition: leader can reach DB but not etcd | Leader sees etcd errors; stops dispatching preemptively | After 2 consecutive etcd timeouts, voluntarily step down | Standby takes over |
| etcd cluster minority partition | Members report `ErrNoLeader` | Scheduler can't renew lease → steps down → dispatching halts until etcd heals | Dispatching stops (CP system); tasks pile up in DB; no corruption |
| Two schedulers both think they're leader | Fencing token mismatch on DB write | CAS rejects one → steps down | Degradation = missed writes, reconciler picks up |
| Shard never claimed (all claimants died) | Leader's shard-health check every 10s | Leader re-assigns to another standby | 10-30s of that shard's DAGs being behind |
The earned-secret detail most candidates miss: The interaction between scheduler lease TTL, worker lease TTL, and DB failover timing. If you set scheduler lease=10s and DB failover=30s, then on DB primary failure: scheduler leader can't write → doesn't renew etcd lease → dropped as leader → a new leader comes up but also can't write → churn. The fix: scheduler should hold leadership across transient DB errors; only drop leadership if etcd itself is unreachable. Otherwise you get "re-election storms" during DB failovers that compound the outage.
7.2 Effective Exactly-Once via Fencing + Idempotency
Why critical: At 100M executions/day, if 0.1% duplicate-execute, that's 100K double-runs/day. If even 1% of tasks have non-idempotent side effects (charging a credit card, sending an email), that's 1K customer-affecting duplicate events per day. Exactly-once is the holy grail; at-least-once + idempotency is the practical substitute. This is where Airflow's "just retry and hope it's idempotent" approach breaks at scale.
Three approaches:
(A) Pure at-least-once + trust-the-user
- Platform: task can run 0 or N times; user guarantees safe retries.
- Pro: simple; no platform-imposed constraints on user code.
- Con: users don't get it right. Every large Airflow user I've talked to at Meta and elsewhere has "duplicate email" or "double-charged" incidents in their history.
- Con: debugging is an exercise in blame-shifting.
(B) At-least-once + platform-fenced writes (fencing tokens exposed to user code)
- Platform passes `(run_id, ti_id, attempt)` to the user task as a fencing token. User code is expected to use it on any external write: `INSERT ... ON CONFLICT DO NOTHING` with the token as the natural key, CAS with the token as a version, or idempotent API calls keyed by the token.
- Pro: the platform provides the mechanism; user code has a clear contract.
- Pro: works across heterogeneous user code (SQL, HTTP, Kafka).
- Con: user code must be audited. No compiler enforcement.
- Con: doesn't help for side-effects the user forgot to key (e.g. "send email" where email API doesn't expose idempotency key — workaround: wrap with an idempotency table in the user's DB).
(C) Deterministic replay + event sourcing (Temporal model)
- Workflow state is an event log. Each user workflow decision → event. Replaying events reconstructs state deterministically. Workflow worker compares replay events to recorded events → cannot "forget" or duplicate.
- Platform distinguishes: workflow functions (deterministic, replayable) and activities (non-deterministic, at-least-once with user idempotency).
- Exactly-once at the workflow orchestration level is real. Activities still need to be idempotent — but the blast radius of a duplicate activity is much smaller because the workflow records "I already called activity X and got result Y" and won't re-call it on replay.
- Pro: best-in-class correctness. Used by Uber, Coinbase, Airbnb (Temporal), Stripe (internal), Snap.
- Con: programming model constraint. Requires SDK + language support (Go, Java, Python, TypeScript, Ruby today).
- Con: not a fit for Airflow-like "arbitrary Python task" ecosystem without major migration.
Chosen for v2: approach (B) with a strong platform stance. The AttemptContext API gives every task a fencing token; our SDK provides helpers for the 5 most common sinks (Postgres, MySQL, Kafka, S3, HTTP via idempotency-key header). Deterministic replay is v3.
Implementation detail:
# User code contract
import stripe                                   # real Stripe SDK
from scheduler_sdk import task, AttemptContext  # hypothetical platform SDK

@task(idempotency_required=True)
def charge_customer(customer_id: str, amount: int, ctx: AttemptContext):
    # The platform's ownership fencing key is f"{ctx.run_id}:{ctx.ti_id}:{ctx.attempt}".
    # For the EXTERNAL side effect we deliberately drop the attempt: a retry
    # means "the previous attempt failed; retry the SAME logical charge," so
    # we want exactly one charge per (run_id, ti_id) across all attempts.
    idem_key = f"charge:{ctx.run_id}:{ctx.ti_id}"  # NO attempt!
    response = stripe.Charge.create(
        amount=amount,
        currency="usd",
        customer=customer_id,
        idempotency_key=idem_key,
    )
    return response.id
Subtle: the fencing token for external side effects omits the attempt (we want ONE side effect across all attempts). The fencing token for worker ownership (DB lease_token) includes the attempt because we want to tell worker-A-on-attempt-1 apart from worker-B-on-attempt-2. Two different fencing tokens, two different purposes, commonly conflated.
Quantitative: Stripe's idempotency layer gives <1/10^9 collision on a 24h window; equivalent for most modern APIs. Our platform SDK stores fencing keys in the metadata DB for 7 days as a belt-and-suspenders against user SDK bugs.
Failure modes:
| Failure | Mitigation |
|---|---|
| User forgot to use fencing key | Platform lint catches common patterns (stripe, sql INSERT without ON CONFLICT, etc.); CI integration. For non-SDK sinks: review mandatory. |
| External system doesn't support idempotency keys | Wrap with "outbox table" pattern in user's DB: write intent + fencing key → then external call → then mark outbox row done. On retry, check outbox first. |
| Fencing token collision (theoretical) | UUIDv7 gives 10^-12 collision over 100 years; ti_id from a single generator across the cluster. Not a real risk. |
| Zombie worker completes after reassignment | lease_token CAS rejects the success report. Worker logs and aborts. Side-effect may have happened → idempotency on the external side handles it. |
| Multiple retries exhaust idempotency window | Set window > max_retry_total_duration (typically 7 days for our config). |
Earned secret: you cannot fence the user code's subprocess. If worker's lease expires, the runtime (k8s) will SIGTERM the pod eventually, but during the gap, user code can keep running and issue external writes. Temporal/Cadence solve this by making the activity call home before every external write — the server gates each call. Airflow doesn't do this. At 100M/day scale, we need something in between: our SDK's I/O primitives for external systems do a heartbeat + lease_check before the actual call. If lease is stale, the helper raises an exception. User code can still bypass the SDK and call raw requests.post, but the SDK's common helpers are the 80% path.
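A sketch of the outbox workaround from the table above (hypothetical `outbox` table with `fencing_key` as its PK; assumes psycopg and requests). It narrows the duplicate-send window to a crash between steps 2 and 3; it cannot close it:

```python
import psycopg
import requests

def send_email_once(conn: psycopg.Connection, fencing_key: str, payload: dict):
    # 1. Reserve the intent; a concurrent/duplicate attempt hits the PK.
    try:
        conn.execute(
            "INSERT INTO outbox (fencing_key, state) VALUES (%s, 'pending')",
            (fencing_key,),
        )
        conn.commit()
    except psycopg.errors.UniqueViolation:
        conn.rollback()
        row = conn.execute(
            "SELECT state FROM outbox WHERE fencing_key = %s", (fencing_key,)
        ).fetchone()
        if row and row[0] == "done":
            return  # an earlier attempt already sent it
        # else: earlier attempt died mid-flight -> fall through, retry the send
    # 2. The non-idempotent external call (URL illustrative).
    requests.post("https://email.example.internal/send", json=payload, timeout=10)
    # 3. Mark done.
    conn.execute(
        "UPDATE outbox SET state = 'done' WHERE fencing_key = %s", (fencing_key,)
    )
    conn.commit()
```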
7.3 Long-Tail and Skew: Keeping the Queue Flowing
Why critical: Airflow users constantly complain about "my queue is full of a 6-hour task and nothing else can run." At 100M/day with p99 = 1hr task duration, roughly 1% of tasks consume >1 hour of worker time. Math: 0.01 × 100M × 3600s = 3.6B slot-seconds per day from the tail, on a pool of 350K slots × 86,400 sec/day each = 30.2B total slot-seconds/day. 3.6B / 30.2B = 12% of pool capacity permanently occupied by long tails. If they all land on one priority queue, they starve everything else.
Three mitigations, stacked:
(A) Separate priority queues + weighted fair queueing
- Queues: `default`, `priority-high`, `priority-low`, `batch` (long-running), plus per-tenant queues for noisy neighbors.
- Workers subscribe to queues with weights — 40% `priority-high`, 40% `default`, 20% `batch` — and pull from each weighted-round-robin.
- Con: static weights don't adapt.
(B) Dedicated worker pools for long-running tasks
- Task-level hint: `pool='long'`. The scheduler routes these to workers tagged `pool=long`.
- The pool has its own sizing. When the long pool is full, tasks wait there, not in the main queue.
- Pro: hard isolation. Main queue never blocked by long tails.
- Con: requires user to annotate tasks accurately.
- Con: under-utilization if long pool has spare capacity.
(C) Preemption (last resort)
- Scheduler can evict a low-priority task in favor of a high-priority one.
- Requires user code to handle `SIGTERM` + checkpoint → messy. Few users implement it.
- Only sensible for tasks the platform can safely re-run from scratch.
Chosen: (A) + (B) layered:
- Three default queues: `default`, `high`, `long`.
- Workers report `capacity_by_queue`.
- Per-tenant: a `tenant:<id>` queue with a reservation (≤N% of total pool) — the noisy-neighbor defense.
- Quota manager: cluster-level token bucket per tenant; requests exceeding quota are queued in the tenant queue at lower priority.
Skew within a DAG (fan-out with one laggard):
- DAG: `extract` → 100 parallel `transform_i` → single `load`. If `transform_42` is 10× slower, the DAG can't `load` until it completes.
- Solutions:
  - Speculative retry — if `transform_42` runs >2× the p95 of its siblings, start a duplicate and take the first result. Requires idempotency.
  - Relaxed trigger rules — `load` can run on `all_done` (not `all_success`), accepting partial data.
  - Data partitioning — if `transform_42` is slow because its input shard is huge, pre-partition into `transform_42_a`, `_b`, `_c`. The scheduler supports dynamic task mapping (Airflow `task.expand()`).
Real-system comparisons:
| System | Approach | Limitation at 100M scale |
|---|---|---|
| Airflow 2.x CeleryExecutor | Queue per priority; pool feature | Pool is a global mutex, contended at 10K+ concurrent |
| Airflow KubernetesExecutor | Each task = pod; no queue starvation (scheduler pressure moves to k8s) | Pod start latency (~30s cold) blows p99 scheduling latency |
| Temporal | Sharded task queues; poll-based workers | Shard rebalance on worker add/remove causes latency blip (~500ms-5s) |
| Argo Workflows | K8s-native; per-step pods | Same as Airflow K8sExecutor; pod start dominates |
| AWS Step Functions | Managed; stateless tasks | Good for short tasks; long-tail still occupies RUNNING state budget |
| Google Borg/Omega | Batch vs service priority tiers | Similar conceptually; our "queue" is their "priority band" |
Failure modes & mitigations:
| Failure | Mitigation |
|---|---|
| All workers subscribed to same queue; one tenant blasts 10K tasks | Per-tenant quota enforced at scheduler dispatch (refuses to enqueue beyond quota) |
| Long-pool starves when there's no long work | Workers can subscribe to multiple queues with priority; if primary queue empty, pull from secondary |
| Small task stuck behind large batch | Separate queues by SIZE (default = <5min tasks, long = ≥5min) with estimator from historical avg |
| Priority inversion: low-prio task holds a shared pool resource | Pool-level priority inheritance: promote pool-holder's priority when high-prio task is blocked |
Earned secret: The naive approach is "one big queue, sort by priority." It fails because pulling a batch of 100 tasks all of which are tiny from a queue where a huge task is item #1 requires you to skip-and-continue, which Redis Streams / Kafka don't natively do. Redis XREADGROUP returns FIFO within a consumer group. To get priority you need separate streams per priority and worker-side merging with weighted selection. Kafka similarly needs topic-per-priority. This leaks implementation detail into the task routing layer and is why almost every real production scheduler ends up with N queues per priority tier × M queues per tenant/pool = lots of queues. At 100M/day you're running 256+ Redis streams across sharded clusters.
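A worker-side sketch of that weighted merge, assuming redis-py; queue names and weights are illustrative, and the ordering trick is weighted random sampling (sort by `random() ** (1/weight)` descending):

```python
import random

import redis

r = redis.Redis()
WEIGHTS = {"queue:high": 0.4, "queue:default": 0.4, "queue:batch": 0.2}

def pull_batch(worker_id: str, count: int = 8):
    # Weighted-random queue order; higher weight -> more likely to be tried first.
    order = sorted(WEIGHTS, key=lambda q: random.random() ** (1 / WEIGHTS[q]),
                   reverse=True)
    # Fall through to the next stream when the preferred one is empty, so an
    # idle primary queue never idles the worker.
    for queue in order:
        msgs = r.xreadgroup("workers", worker_id, {queue: ">"},
                            count=count, block=50)
        if msgs:
            return queue, msgs[0][1]   # [(message_id, fields), ...]
    return None, []
```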
8 Failure Modes & Resilience (Runbook Table) #
| Component | Failure | Detection | Blast radius | Mitigation | Recovery (RTO) |
|---|---|---|---|---|---|
| Scheduler leader | JVM crash / OOM | etcd lease not renewed within 10s | Dispatching halts for ~10-15s; ~12-18K tasks' scheduling latency spikes | Standby takes over via etcd CAS; fencing token ensures no double-dispatch | <15s |
| Scheduler shard owner | Process hangs mid-tick | Leader's shard-health timer (30s) | Only that shard's DAGs affected (~2% of fleet) | Leader re-assigns shard via etcd; SKIP LOCKED frees abandoned rows | 30-60s |
| Worker | Host crash mid-task | Lease expires at 60s | Task attempt 1 abandoned; re-enqueued as attempt 2 | Scheduler reaper sweeps expired leases; CAS resets state, re-enqueues | 60-90s per task |
| Worker | Slow heartbeat (GC pause) | Lease at 60% → worker self-steps-down gracefully | Task killed; re-enqueued | Worker SDK kills subprocess as lease expiry approaches | Prevention, not recovery |
| Metadata DB primary | Hardware failure | DB proxy / Patroni detects | All writes blocked 15-30s | Sync replica promoted; scheduler retries dispatch; read replicas handle UI | 30s |
| Metadata DB (total) | Region outage | Cross-region failover | Control plane down in primary region | Warm standby in secondary region; DNS cutover; RPO = 1 min of lost dispatches (idempotency saves us) | 15 min |
| Queue (Redis) | Primary node down | Redis Sentinel / Cluster | Claims for that partition stall | Replica promoted; XADD retried; idempotency on `ti_event` prevents re-enqueue loops | <30s |
| Queue | Total Redis cluster loss | Monitoring | Live tasks lose their "claim ticket" | Scheduler rebuilds queue from `task_instance WHERE state='scheduled' AND NOT in_redis` | 5-10 min |
| etcd cluster | Loss of quorum (minority partition) | Leader sees `ErrNoLeader` | Scheduler can't renew; steps down; NO new dispatches | Wait for etcd recovery; running tasks complete; new tasks delayed | Duration of partition |
| Clock skew | Worker clock +30s ahead | Heartbeat ts skew monitor | Leases evaluated against worker clock fire early → premature task kills | All time comparisons use scheduler/DB clock (`NOW()` in SQL); NTP hard requirement on workers (max 100ms skew); lease math server-side only | Prevention |
| Clock skew | Scheduler clock jumps backward (NTP slew) | Alert on non-monotonic `NOW()` | Cron triggers could re-fire | Use `MAX(last_fired_ts, NOW())` + store `last_fired` in `dag_run` with UNIQUE constraint (idempotency) | Correctness maintained |
| DST transition | 2 AM local happens twice (fall back) or is skipped (spring forward) | Tests + monitoring | Cron `0 2 * * *` fires twice or zero times | Scheduler evaluates cron in UTC internally, converts to tz for display; policy: "nonexistent time = skip, ambiguous time = run once (first)"; documented on every DAG | Documented behavior |
| Poison task | Same task fails all retries | `attempt >= max_retries` | DAG fails; SLA alert fires | Move ti to DLQ table; surface in UI; block DAG if user opts in to "halt on poison" | Manual intervention |
| DLQ overflow | DLQ grows >1M rows | DLQ size metric | UI slow; operator fatigue | Auto-expire after 30 days; alert ops; per-DAG DLQ quotas | Auto + manual triage |
| Triggerer crash | All deferred tasks stuck | Triggerer heartbeat to DB | Deferred tasks in limbo until reaper | Another triggerer claims orphaned triggers via TTL; rehydrates state from `trigger_row` | 30-60s |
| Log collector (FluentBit) | Downstream S3 errors | Collector backlog grows | Logs buffered on local disk (bounded); user sees stale logs in UI | Backpressure to worker bounded by disk; 24h buffer | Minutes to S3 recovery |
| Massive cron alignment (top of hour) | 100K DAGs all cron `0 * * * *` | Burst detector in scheduler | Queue depth spikes; scheduling-latency p99 breach | Automatic jitter added to cron (0-60s); token bucket on tenant dispatch rate | Self-healing, with SLO dip |
| Backfill storm | Operator backfills 30 days of DAGs manually | Monitoring | Primary queue overwhelmed | Backfill goes to a separate backfill queue with a dedicated pool (5% of fleet); rate-limited | Auto |
| Noisy tenant | Tenant A runs 50M tasks in one day | Per-tenant counters | Could starve other tenants | Per-tenant quota: rate-limit `TriggerRun` + dispatch at scheduler; oversubscription = 429 | Policy |
Monitoring SLIs (pager-relevant):
- `scheduler_dispatch_latency_p99`: target <5s; page at >10s for 5 min
- `scheduler_leader_elections_per_hour`: target <1; page at >3
- `worker_lease_expirations_per_min`: baseline established per fleet; page on 5× baseline (indicates partition or overload)
- `metadata_db_replication_lag`: target <100ms; page at >1s
- `queue_depth_p99`: target <10K; page at >100K (cascade incoming)
- `dlq_size_growth_rate`: page on >100 tasks/hour moving to DLQ
- `trigger_row_stuck_count`: tasks deferred >2× timeout — page
9 Evolution Path #
v1 — Single-node Airflow clone (0→1M tasks/day)
- Single scheduler process, single Postgres, no HA.
- `LocalExecutor` (subprocess) or `CeleryExecutor` with one Redis broker.
- No triggerer; deferrable = sensor-poll = a worker slot occupied.
- Audience: first 2-3 teams inside the company.
- What breaks first: sensors eat worker slots; Postgres CPU at ~5K/sec writes; one scheduler → SPOF.
v2 — HA scheduler + queue + triggerer (1M→100M tasks/day)
- 5-node scheduler with etcd leader election + 64-shard sharding.
- Redis Streams queue (sharded), Kafka mirror for audit.
- Triggerer pool (12 nodes, asyncio).
- Postgres → Spanner migration OR Postgres + Redis-backed event store to offload write volume.
- Worker pool with heartbeat + lease + fencing token.
- DLQ, per-tenant quota, priority queues.
- Web UI served from read replicas + Redis cache.
- Observability: Prometheus + OpenTelemetry traces on scheduler ticks and dispatch paths.
- This is "what I'd build for interview" and covers 100M/day.
v3 — Event-sourced multi-region (100M→10B tasks/day OR critical-correctness workloads)
- Migrate to Temporal-style event-sourced workflow engine for the critical-correctness tier (e.g. payment workflows, privacy DSARs). Non-critical batch stays on v2.
- Shared-nothing shards on Cassandra or Spanner; each shard owns a subset of workflows; workflow is a replayable sequence of events.
- Multi-region active-active: workflow state replicated globally; leader per shard region-pinned with failover.
- Deterministic replay: workflow code (not activity code) must be deterministic; SDK enforces via sandboxing.
- Effective exactly-once at workflow level; activities still at-least-once with idempotency.
- Multi-tenant isolation: per-tenant namespaces with soft and hard quotas; dedicated cells for largest tenants.
- Regulatory: immutable append-only event log with hash chaining (BlockchainDB-lite for tamper evidence); per-record TTL for GDPR but with audit-safe "deletion tombstones."
Migration strategies:
- v1→v2: dual-write DAG state from v1 and v2 schedulers against the same DB; gradually cut traffic per-DAG via routing table; rollback if error rate up.
- v2→v3: side-by-side clusters; new DAGs opt into v3; old DAGs stay on v2; never migrate live DAGs (too risky).
10 Out-of-1-Hour Notes (depth-on-demand) #
Deterministic replay vs event-sourced vs checkpoint-and-resume:
- Checkpoint-and-resume (Flink, Spark): every N seconds snapshot state. On failure, reload snapshot, replay input since. Requires state to be serializable. Good for data streams; clumsy for workflows.
- Event sourcing + replay (Temporal): state is derived from events; replay reconstructs. Works for long-running workflows. Requires user code determinism.
- Neither (Airflow): each task is stateless from the platform's view; user must design retries to be idempotent. Simplest; weakest guarantees. Breaks at high scale + high correctness.
- Rule of thumb: choose event sourcing if any task has >10 steps with interleaved external effects; choose checkpoint for batch/streaming; choose "nothing" for ops that are naturally idempotent.
Cron semantics around DST:
- Stripe, AWS, and GCP Cloud Scheduler all have had incidents from ambiguous DST interpretation.
- Two competing philosophies:
  - Wall-clock fidelity: `0 2 * * *` fires when the local clock shows 2:00. In fall-back it fires twice (2:00 DST, then 2:00 standard); in spring-forward it is skipped.
  - UTC stability: the cron expression is evaluated in UTC and ignores DST. Predictable, but surprising for users who want "every night at 2 local."
- Our policy: internally store DAGs with explicit `(cron, tz)`. The evaluator uses `zoneinfo` + `dateutil.rrule`. Ambiguous time: fire ONCE (the first occurrence). Nonexistent time: SKIP. Policy documented. The UI displays "next 3 runs" so users catch misconfiguration before it bites.
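A sketch of the "ambiguous → run once, nonexistent → skip" policy with stdlib `zoneinfo`; `croniter` is an assumed dependency for cron iteration:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

from croniter import croniter  # assumed dependency

def next_fire_utc(cron: str, tz: str, after_utc: datetime) -> datetime | None:
    zone = ZoneInfo(tz)
    naive = croniter(cron, after_utc.astimezone(zone).replace(tzinfo=None)
                     ).get_next(datetime)
    first = naive.replace(tzinfo=zone, fold=0)   # fold=0 = first occurrence
    # Nonexistent local time (spring forward): the UTC round-trip lands on a
    # different wall clock -> SKIP this occurrence.
    roundtrip = first.astimezone(timezone.utc).astimezone(zone)
    if roundtrip.replace(tzinfo=None) != naive:
        return None
    # Ambiguous local time (fall back): fold=0 already picks the FIRST.
    return first.astimezone(timezone.utc)
```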
Sensor storms: polling vs triggerer-push:
- Traditional: a sensor is a task that loops `while not condition: sleep(poke_interval)`. Each sensor occupies a worker slot, so 10K waiting sensors = 10K worker slots doing nothing but sleeping.
- Triggerer: an async process that registers with the condition source (S3 bucket event, Kafka topic position, REST API). Lightweight: one triggerer can handle 10K+ concurrent waits via asyncio + aiohttp. Uses O(1) threads, O(N) memory where N = trigger count.
- When condition fires: triggerer writes to DB, scheduler picks up, schedules downstream task.
- Backpressure: if triggerer pool falls behind, triggers queue in DB; new triggers go to least-loaded triggerer.
- Failure: triggerer crash → orphan triggers reclaimed by another via TTL → continuity at cost of 30-60s delay.
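A triggerer sketch under those assumptions — one asyncio process multiplexing many waits (aiohttp for the polling variant; `on_fire` stands in for the DB write that wakes the deferred task):

```python
import asyncio

import aiohttp

async def poll_until_ready(trigger_id: str, url: str, interval_s: float, on_fire):
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(url) as resp:
                if resp.status == 200:          # condition met
                    await on_fire(trigger_id)
                    return
            await asyncio.sleep(interval_s)

async def triggerer_main(triggers, on_fire):
    # O(1) threads, O(N) coroutines -- why 10K+ waits fit in one process
    await asyncio.gather(*(
        poll_until_ready(t["id"], t["url"], t["interval_s"], on_fire)
        for t in triggers
    ))
```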
Back-pressure propagation from workers → scheduler:
- If workers can't keep up, queue depth grows. Scheduler should stop dispatching new work until queue drains rather than pile up Redis memory / Kafka lag.
- Signal: per-queue depth → scheduler reads every tick → if >threshold, skip dispatching to that queue this tick.
- Also: per-tenant dispatch token bucket refilled based on queue drain rate. Fast-draining tenant gets more tokens.
- Alternative: rely on Kafka producer blocking. Works but couples semantic state to a transport layer.
Multi-tenant resource quotas:
- Quotas at 4 levels: (1) rate of DAG registration, (2) rate of DAG trigger, (3) concurrent running task_instances, (4) log storage MB.
- Each level: soft limit (throttle), hard limit (reject).
- Fair queueing: tenants share pool but with weights; a starving tenant's pending count grows → its dispatch priority increases (dynamic weight).
- This is the "dominant resource fairness" idea (DRF, Google Borg). Overkill for small deployments, essential at 100M/day.
Regulatory audit (immutable run log):
- `ti_event` table is append-only. Daily partitions are made read-only after 24h (Postgres table-level `REVOKE UPDATE, DELETE`).
- Mirror to object store as Parquet files with a SHA-256 hash chain (each file's hash includes the previous file's hash) — tamper evidence without a full blockchain.
- GDPR tension: a user can request deletion of PII in run params. Resolution: store PII in a separate `params_pii` table with an FK; drop the PII rows on a deletion request; the audit log retains the event (task X ran) but not the data (for user Y). Must be designed in, not retrofitted.
- SOX compliance: keep all state transitions for 7 years.
Cost per execution:
- From §3: `$0.0017` per task, dominated by compute (95%). Control-plane cost per task: ~$0.0001.
- Comparison: AWS Step Functions Standard is $25 per million state transitions ($0.000025 each) — but that covers orchestration only; compute is billed separately (Lambda/ECS). The like-for-like comparison is our control-plane share (~$0.0001/task, ~$0.002 for a 20-task run); self-hosting wins on compute pricing at sustained scale, not on orchestration overhead.
- Lambda equivalent: $0.20 per million invocations + ~$0.0000167 per GB-second. A 60s 1GB Lambda task ≈ $0.001 compute + negligible invocation cost — similar to ours, but Lambda hides the queue/scheduler cost.
Discontinuous optimizations as scale climbs:
- 10K/day → 1M/day: add retries and UI polish.
- 1M/day → 10M/day: move logs to object store, add queue metrics, begin to tune DB.
- 10M/day → 100M/day: shard scheduler, migrate queue to Redis Streams, triggerer pool mandatory.
- 100M/day → 1B/day: metadata DB must shard or migrate to Spanner. Queue must move to Kafka (durable, horizontally partitioned; Redis becomes memory-bound at that depth). Workers must co-locate with the runtime (data locality).
- 1B/day → 10B/day: multi-region, event-sourced. Everything above doubled.
Observability maturity curve:
- L0: logs in files. (v1)
- L1: Prometheus metrics, Grafana. (v1 end)
- L2: distributed tracing across scheduler→queue→worker→runtime (OpenTelemetry). (v2)
- L3: per-DAG SLO dashboards with auto-attribution ("this DAG is slow because queue X is full"). (v2 end)
- L4: cross-system causal analysis — integrate with infra telemetry ("DAG spike coincides with DB replica lag spike"). (v3)
Chaos / disaster drills:
- Monthly: kill scheduler leader, verify failover <15s.
- Monthly: kill a random queue shard, verify task continuity.
- Quarterly: DB failover rehearsal (non-prod first, then planned maintenance window in prod).
- Annually: simulated region outage.
- Per-change: every deploy goes through staged rollout (1%, 10%, 50%, 100%) with auto-rollback on SLO breach.
Things I'd refuse to build in v1 if I had authority:
- "Exactly-once" marketing claim. Promise at-least-once + idempotency; don't lie.
- Synchronous cross-DAG dependencies as a first-class feature (it's a coupling anti-pattern; use events instead).
- In-scheduler business logic (like "send Slack when task fails"). Put it in a notifier service that subscribes to the event bus.
Closing: Why this design, in one paragraph #
100M task-instances/day is beyond single-node Airflow and inside the comfort zone of a sharded scheduler with an external queue. The critical invariants are: (1) source of truth is a transactional DB with a monotonic fencing token on every write, so split-brain can't corrupt; (2) dispatch via row-level locks with SKIP LOCKED up to ~5K/sec per shard, horizontally scaled via hash(dag_id) % 64 partitioning; (3) workers are cattle with lease-based ownership, leaning on idempotency (both platform-enforced and user-contracted) for at-least-once to feel like exactly-once; (4) operational escape hatches (DLQ, per-tenant quotas, priority queues, triggerer for deferrable waits) so that the long tail and noisy neighbors never starve the system. When scale outgrows this (>1B/day or harder correctness needs), evolve to Temporal-style event sourcing. When a Google L7 interviewer asks "how do you prevent double-dispatch on scheduler failover?" the one-line answer is: "fencing token on every scheduler-origin write, validated by CAS in the DB; old leader's writes are rejected by definition, so double-dispatch becomes a definitional impossibility, not a probabilistic one." Everything else is plumbing.