Key Question
How does Apache Flink use Chandy-Lamport snapshots for exactly-once stream processing?
Deep Dive
Apache Flink is a distributed stream processor. It runs a DAG of operators (source → transformations → sink). For fault tolerance, Flink needs to snapshot the state of every operator. It uses a variant of Chandy-Lamport called asynchronous barrier snapshotting (ABS).
Barriers = Markers: Flink’s “markers” are called checkpoint barriers. They are lightweight messages injected into the data stream by the source operator. Barriers flow downstream along the data edges — exactly like Chandy-Lamport markers flow along communication channels.
Source --[data, data, BARRIER_n, data]--> Map --[data, BARRIER_n, data]--> Sink
                                           |
                                   Operator snapshots
                                   its state when it
                                   receives the barrier
Aligned checkpoints (default):
- Source injects barrier n into each input stream.
- When a downstream operator receives barrier n on one input, it waits for barrier n on all its other inputs, buffering any data that arrives on inputs that have already delivered the barrier. This is the “alignment” step.
- Once all barriers arrive, the operator snapshots its state and emits barrier n downstream.
- Alignment guarantees exactly-once: any data before the barrier is included in the state; any data after belongs to the next epoch.
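The alignment steps can be sketched as a small simulation. This is a toy model, not Flink's API: the `Barrier` and `AlignedSumOperator` names, the integer input ids, and the running-sum state are all assumptions made for illustration, and only one checkpoint is assumed to be in flight at a time.

```python
from collections import deque


class Barrier:
    """Hypothetical stand-in for a checkpoint barrier."""
    def __init__(self, checkpoint_id):
        self.checkpoint_id = checkpoint_id


class AlignedSumOperator:
    """Toy multi-input operator whose state is a running sum."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.total = 0
        self.blocked = set()     # inputs that already delivered the barrier
        self.buffered = deque()  # records held back during alignment
        self.snapshots = {}      # checkpoint id -> state at the barrier
        self.emitted = []        # everything sent downstream, in order

    def on_event(self, input_id, event):
        if isinstance(event, Barrier):
            self.blocked.add(input_id)
            if len(self.blocked) == self.num_inputs:
                self._complete(event.checkpoint_id)
        elif input_id in self.blocked:
            # Post-barrier data belongs to the next epoch: hold it back.
            self.buffered.append(event)
        else:
            self._process(event)

    def _process(self, value):
        self.total += value
        self.emitted.append(self.total)

    def _complete(self, checkpoint_id):
        # All barriers arrived: snapshot, forward the barrier, then drain.
        self.snapshots[checkpoint_id] = self.total
        self.emitted.append(Barrier(checkpoint_id))
        self.blocked.clear()
        while self.buffered:
            self._process(self.buffered.popleft())


op = AlignedSumOperator(num_inputs=2)
op.on_event(0, 1)           # pre-barrier record on input 0
op.on_event(0, Barrier(1))  # input 0 is now blocked
op.on_event(0, 10)          # buffered, not yet part of the state
op.on_event(1, 2)           # input 1 has not seen the barrier: processed
op.on_event(1, Barrier(1))  # alignment complete
print(op.snapshots, op.total)
```

In this run the snapshot for checkpoint 1 holds only the pre-barrier records (1 and 2), while the buffered record 10 is applied afterwards and so belongs to the next epoch.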
Unaligned checkpoints (low-latency mode): The operator does NOT wait for alignment. It snapshots its state and any in-flight buffered data immediately. This avoids backpressure spikes but produces larger snapshots. Chandy-Lamport’s marker-plus-in-transit-recording is the same idea.
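To make the contrast with alignment concrete, here is a minimal sketch of the unaligned idea under simplifying assumptions. It is not Flink's implementation (which persists actual input and output buffers); `Barrier`, `UnalignedSumOperator`, and the running-sum state are hypothetical, and only a single checkpoint is modeled.

```python
class Barrier:
    """Hypothetical stand-in for a checkpoint barrier."""
    def __init__(self, checkpoint_id):
        self.checkpoint_id = checkpoint_id


class UnalignedSumOperator:
    """Toy operator: state is a running sum over all inputs."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.total = 0
        self.emitted = []
        self.pending = set()  # inputs whose barrier has not arrived yet
        self.snapshot = None  # {"state": ..., "in_flight": [...]}

    def on_event(self, input_id, event):
        if isinstance(event, Barrier):
            if self.snapshot is None:
                # First barrier: snapshot at once and forward the barrier
                # without waiting for the other inputs.
                self.snapshot = {"state": self.total, "in_flight": []}
                self.pending = set(range(self.num_inputs))
                self.emitted.append(event)
            self.pending.discard(input_id)
            return
        if input_id in self.pending:
            # Pre-barrier record on a lagging input: it is processed now,
            # but also stored in the snapshot as in-flight data.
            self.snapshot["in_flight"].append((input_id, event))
        self.total += event
        self.emitted.append(self.total)


op = UnalignedSumOperator(num_inputs=2)
op.on_event(0, 1)
op.on_event(0, Barrier(1))  # snapshot taken immediately: state == 1
op.on_event(0, 10)          # post-barrier on input 0: never blocked
op.on_event(1, 2)           # pre-barrier on input 1: logged as in-flight
op.on_event(1, Barrier(1))

# Restoring = recorded state plus replay of the recorded in-flight records.
restored = op.snapshot["state"] + sum(v for _, v in op.snapshot["in_flight"])
print(op.snapshot, restored)
```

No record is ever held back here, which is why latency stays low, but the snapshot now carries the in-flight record `(1, 2)` in addition to the operator state. Restoring state plus in-flight data gives 3, the same logical cut that alignment would produce.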
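Recovery: If a worker fails, Flink restarts all operators from the last completed checkpoint and replays the source data from the corresponding offset. Because the checkpoint is a consistent cut and the source rewinds to the matching offset, every record is reflected in operator state exactly once, even though records after the checkpoint are physically processed a second time. That is what exactly-once semantics means here: no data is lost or duplicated in the state.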
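The recovery rule can be illustrated with a small sketch. It assumes a replayable source modeled as a Python list, a single operator whose state is a running sum, and a hypothetical `run` function; the key point is that the source offset and the operator state are stored together in one checkpoint.

```python
def run(log, checkpoint_every, crash_at=None):
    """Sum `log`, checkpointing (offset, state) together.

    If `crash_at` is given, simulate one failure just before the record
    at that offset is processed, then recover from the last checkpoint.
    """
    checkpoint = (0, 0)  # (next source offset, operator state)
    offset, total = checkpoint
    crashed = False
    while offset < len(log):
        if crash_at is not None and offset == crash_at and not crashed:
            crashed = True
            # Recovery: restore state AND rewind the source together.
            offset, total = checkpoint
            continue
        total += log[offset]
        offset += 1
        if offset % checkpoint_every == 0:
            checkpoint = (offset, total)
    return total


log = list(range(1, 11))  # ten records, values 1..10
no_failure = run(log, checkpoint_every=3)
with_failure = run(log, checkpoint_every=3, crash_at=5)
print(no_failure, with_failure)
```

With the failure at offset 5, records at offsets 3 and 4 are processed twice, but their first effect is discarded when the state rolls back to the checkpoint taken at offset 3, so both runs end with the same sum. Restoring the state without rewinding the source would lose records; rewinding the source without restoring the state would count them twice.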
Check Your Understanding
- What is the Flink equivalent of a Chandy-Lamport marker?
- What is the difference between aligned and unaligned checkpoints?
- How does Flink achieve exactly-once semantics using checkpoints?
The “So What?”
Flink is one of the most prominent real-world examples of Chandy-Lamport ideas at scale, processing millions of events per second across hundreds of nodes. The barrier snapshotting algorithm is the reason Flink can offer exactly-once guarantees. When someone says “exactly-once stream processing” about Flink, they usually mean barrier-based checkpoints derived from Chandy-Lamport.
Exercises: Chandy-Lamport Snapshots
- Marker Reception: In the Chandy-Lamport algorithm, process P receives a marker message on channel C. P has NOT yet recorded its local state. What does P do? Be specific.
- In-Transit Messages: Process P has already recorded its local state. It now receives a marker on channel C. Why must P record all application messages received on C between P’s state recording and this marker? What would go wrong if it didn’t?
- Consistent Snapshot vs. Checkpoint: A single-machine system takes a “checkpoint” by pausing all threads, writing their stacks and heap to disk, and resuming. A distributed system uses Chandy-Lamport. What is the fundamental difference in approach, and why is Chandy-Lamport’s approach necessary in the distributed setting?
Solutions: Chandy-Lamport Snapshots
Exercise 1
P does the following:
- Records its current local state (all variables, data structures, etc.).
- Marks channel C as “recorded” with no in-transit messages (the channel the marker arrived on is considered finished).
- Sends a MARKER message on every outgoing channel.
- From now on, P records all application messages received on every other incoming channel (every one except C) until a marker arrives on that channel.
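The marker-handling rule can be sketched as a small class. This is an illustration under assumptions, not a reference implementation: the `Process` class, the channel names, the integer "balance" state, and the `send` callback are all hypothetical, and channels are assumed to be reliable and FIFO.

```python
MARKER = "MARKER"


class Process:
    def __init__(self, state, incoming, outgoing, send):
        self.state = state              # e.g. an account balance
        self.incoming = list(incoming)  # names of incoming channels
        self.outgoing = list(outgoing)  # names of outgoing channels
        self.send = send                # hypothetical: send(channel, msg)
        self.snapshotted = False
        self.recorded_state = None
        self.channel_state = {}         # channel -> in-transit messages
        self.recording = set()          # channels still being recorded

    def _record(self, marker_channel):
        self.snapshotted = True
        self.recorded_state = self.state
        self.channel_state = {ch: [] for ch in self.incoming}
        # The channel the marker arrived on is finished (and empty).
        self.recording = {c for c in self.incoming if c != marker_channel}
        for ch in self.outgoing:
            self.send(ch, MARKER)

    def on_message(self, channel, message):
        if message == MARKER:
            if not self.snapshotted:
                self._record(marker_channel=channel)
            else:
                self.recording.discard(channel)  # channel is now closed
            return
        if channel in self.recording:
            self.channel_state[channel].append(message)  # in transit
        self.state += message  # normal processing continues either way

    def done(self):
        return self.snapshotted and not self.recording


sent = []
p = Process(100, incoming=["C", "D"], outgoing=["out"],
            send=lambda ch, msg: sent.append((ch, msg)))
p.on_message("D", 5)       # before the snapshot: just processed
p.on_message("C", MARKER)  # first marker: record state, relay markers
p.on_message("D", 7)       # after the snapshot, before D's marker
p.on_message("D", MARKER)  # D is closed; the snapshot is complete
print(p.recorded_state, p.channel_state, sent, p.done())
```

Here P records a local state of 105, an empty state for C, and `[7]` for D, since 7 arrived on D after P's recording but before D's marker.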
Exercise 2
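These messages were sent before the sender took its snapshot: the sender emits its marker on C immediately after recording its state, and channels are FIFO, so anything that arrives ahead of the marker was sent earlier. But P received them after recording its own state. The sender’s snapshot therefore shows them as sent, while P’s snapshot does not show them as received. They were in transit at the moment of the cut, and the only place to capture them is the recorded state of channel C. Without recording them, they would be lost: a system restored from the snapshot would have a sender that will not resend them and a receiver that never got them. The snapshot would be an incomplete global state (for example, money in flight between two accounts would simply vanish).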
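A worked example with two accounts shows what is at stake. The scenario, the balances, and the `snapshot_total` function are made up for illustration: B starts the snapshot, and A transfers 20 to B before B's marker reaches A.

```python
from collections import deque


def snapshot_total(record_channel):
    """Return the total money visible in the snapshot."""
    a, b = 100, 50                     # live balances, 150 in total
    a_to_b, b_to_a = deque(), deque()  # FIFO channels

    # B starts the snapshot: records its balance, sends a marker to A.
    b_recorded = b
    b_to_a.append("MARKER")
    channel_recorded = []              # recorded state of channel A -> B

    # A transfers 20 to B before it has received the marker.
    a -= 20
    a_to_b.append(20)

    # A receives the marker: records its balance, sends its own marker.
    assert b_to_a.popleft() == "MARKER"
    a_recorded = a
    a_to_b.append("MARKER")

    # B has already recorded, so everything ahead of the marker on the
    # channel from A was in transit at the cut.
    while a_to_b:
        msg = a_to_b.popleft()
        if msg == "MARKER":
            break
        b += msg
        if record_channel:
            channel_recorded.append(msg)

    return a_recorded + b_recorded + sum(channel_recorded)


with_channel_state = snapshot_total(record_channel=True)
without_channel_state = snapshot_total(record_channel=False)
print(with_channel_state, without_channel_state)
```

With channel recording the snapshot accounts for all 150 units (80 at A, 50 at B, 20 in transit). Without it the snapshot shows only 130, because the transfer is missing from both recorded balances.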
Exercise 3
Single-machine checkpoint: Pauses all threads globally, then captures state. Nothing moves while you capture — this is a “stop-the-world” approach. It assumes shared memory and global visibility.
Distributed snapshot (Chandy-Lamport): Processes continue running while the snapshot is taken. Markers are used to delineate the cut without global coordination. Each process records its state at a different “logical” time (when it first receives a marker), not the same wall-clock time.
Why Chandy-Lamport is necessary: In a distributed system, there is no global clock, no shared memory, and you cannot pause all processes atomically (the pause message itself would take unpredictable time to arrive). A stop-the-world pause cannot take effect at the same instant everywhere on an asynchronous network, and messages already in flight would still escape it. Chandy-Lamport’s marker-based approach needs none of these assumptions; it relies only on reliable FIFO channels.