Yesterday, we learned how to make a system hit the brakes when it’s overloaded. But there’s an even more important question: How does the courier know how far they’ve gotten?
Today, let's talk about checkpoints.
The Courier’s Memory Problem: Which Package Am I On?
Imagine you’re a hardworking courier. You’ve delivered 500 packages in a single day. Suddenly—your phone runs out of battery and the system restarts.
The bad scenario:
The system says: “Let’s start over from package #1.”
You: “What? I already delivered up to #487!”
The ideal scenario:
The system says: “Welcome back. You stopped at package #487.”
You: “Perfect. I’ll continue from #488.”
That’s the core problem we’re solving today:
How do we accurately record progress and resume from the correct position?

Think of Checkpoint as Regular Progress Reports
In the world of stream processing, a checkpoint is like a courier regularly calling the dispatch center:
“I’m on Route A, just finished package #157. On Route B, I’m at package #89.”
The dispatch center writes this down on a whiteboard:
Delivery Progress Log
Route A: ✓ Package #157
Route B: ✓ Package #89
Last updated: 14:30
Now, even if the courier’s phone dies, they can restart from the correct spot.
A Simple Checkpoint Implementation
Let’s add this regular progress reporting mechanism to our Simple Streaming system.
Note: All code in this article is pseudo-code for educational and conceptual illustration. Focus on understanding the architecture and logic, not on running the exact code. To keep today’s focus clear, we’ll simplify some implementation details from previous days and concentrate on the checkpoint mechanism itself.
Designing the Progress Tracker
Our checkpoint system needs to track and persist processing progress. Here’s the high-level flow.
Checkpoint Core Workflow
Step 1: Message Processing
┌─────────────────────────────────────┐
│ New Message Arrives │
│ (topic: orders, partition: 0) │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ record_message() │
│ │
│ Record: orders[0] → offset 487 │
└─────────────────────────────────────┘
Step 2: Time Check
┌─────────────────────────────────────┐
│ should_commit()? │
│ │
│ current_time - start >= 5.0s │
└─────────────────────────────────────┘
│ YES │ NO
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Trigger │ │ Continue │
│ Checkpoint │ │ Processing │
└─────────────────┘ └─────────────────┘
Step 3: Two-Phase Commit
┌─────────────────────────────────────┐
│ commit() │
│ │
│ Phase 1: Flush all sinks │
│ Phase 2: Save to persistent storage │
└─────────────────────────────────────┘
Tracking Message Processing
Each time a message is processed, we record its topic, partition, and offset.
import json
import logging
import time

logger = logging.getLogger(__name__)


class SimpleCheckpoint:
    """
    A simple progress tracker.
    Like a delivery logbook that records how far each route has progressed.
    """

    def __init__(self, commit_interval: float = 5.0):
        self.commit_interval = commit_interval
        # Current progress (not yet persisted)
        self._offsets = {}  # {(topic, partition): offset}
        # Confirmed progress (persisted)
        self._committed_offsets = {}
        # Timing: start of the current checkpoint cycle
        self._created_at = time.time()
        logger.info(
            f"Checkpoint tracking started: saving progress every {commit_interval} seconds"
        )

    def record_message(self, topic: str, partition: int, offset: int):
        """Record delivery of one package."""
        tp = (topic, partition)
        # Only the latest offset per (topic, partition) is kept
        self._offsets[tp] = offset
        logger.debug(f"Progress recorded: {tp} → package #{offset}")

    def should_commit(self) -> bool:
        """Check whether it's time to report progress."""
        return (time.time() - self._created_at) >= self.commit_interval

    def reset(self):
        """Reset the checkpoint and start a new timing cycle."""
        self._created_at = time.time()
        self._offsets.clear()
        logger.debug("Checkpoint reset; starting a new cycle")
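To make the "logbook" behavior concrete, here is a minimal standalone sketch of the in-memory part of the tracker. The `OffsetLog` class and the route names are hypothetical and exist only for this illustration; the point is that each route keeps just its latest offset.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


class OffsetLog:
    """Hypothetical stand-in for the in-memory part of SimpleCheckpoint."""

    def __init__(self) -> None:
        self.offsets: Dict[TopicPartition, int] = {}

    def record(self, topic: str, partition: int, offset: int) -> None:
        # A later offset overwrites the earlier one for the same route.
        self.offsets[(topic, partition)] = offset


log = OffsetLog()
for offset in (155, 156, 157):
    log.record("route-a", 0, offset)
log.record("route-b", 0, 89)

print(log.offsets)  # {('route-a', 0): 157, ('route-b', 0): 89}
```

Keeping only the latest offset works because messages within a partition are processed in order, so "up to #157" implies everything before it.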
Implementing the Two-Phase Commit
When it’s time to save progress, we execute a two-phase commit.
Two-Phase Commit Flow
Phase 1: Flush All Sinks
┌─────────────────────────────────────┐
│ _flush_sinks() │
│ │
│ PostgreSQL Sink: flush buffer │
│ File Sink: flush buffer │
│ Other Sinks: flush buffer │
└─────────────────────────────────────┘
│
▼
Phase 2: Commit Progress
┌─────────────────────────────────────┐
│ _commit_progress() │
│ │
│ orders[0] → offset 487 │
│ Save to checkpoint.json │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Return Success │
│ logger.info("Progress saved") │
└─────────────────────────────────────┘
Phase Breakdown - Ensure Data Is Delivered
Before recording progress, we ensure all sinks have successfully written their data.
    def commit(self) -> bool:
        """
        Two-phase progress commit:
        1. Confirm all delivery points received the data
        2. Report progress to the dispatch center
        """
        logger.info("Starting checkpoint commit")
        try:
            logger.debug("Phase 1: flushing all sinks")
            # _flush_sinks() is not shown here; it is assumed to call
            # flush() on every registered sink and raise on failure
            self._flush_sinks()
            logger.debug("Phase 2: committing progress")
            self._commit_progress()
            logger.info("Checkpoint committed successfully")
            return True
        except Exception as e:
            logger.error(f"Checkpoint commit failed: {e}")
            return False

    def _commit_progress(self):
        """
        Report progress to the dispatch center (the core functionality).
        """
        logger.info("Reporting progress:")
        for (topic, partition), offset in self._offsets.items():
            self._committed_offsets[(topic, partition)] = offset
            logger.info(f"{topic}[{partition}] → package #{offset}")
        self._save_to_persistent_storage()

    def _save_to_persistent_storage(self):
        """
        Save the checkpoint to persistent storage.
        Ensure progress can be recovered after a system restart.
        """
        try:
            # Save to the filesystem
            checkpoint_data = {
                # JSON object keys must be strings, so each
                # (topic, partition) tuple is stored as "topic:partition"
                'committed_offsets': {
                    f"{topic}:{partition}": offset
                    for (topic, partition), offset in self._committed_offsets.items()
                },
                'timestamp': time.time()
            }
            with open('checkpoint.json', 'w') as f:
                json.dump(checkpoint_data, f)
            logger.debug("Checkpoint has been saved to persistent storage")
        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")
            raise
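One gap in the simple version above is that it writes checkpoint.json in place, so a crash in the middle of the write could leave a truncated file. A common way to reduce that risk is to write to a temporary file and then rename it over the old one. The sketch below is an illustration under assumptions, not the engine's actual code: `save_checkpoint`, `load_checkpoint`, and the `"topic:partition"` key encoding are all hypothetical choices made for this example.

```python
import json
import os
import tempfile
import time
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def save_checkpoint(offsets: Dict[TopicPartition, int], path: str) -> None:
    """Write offsets to `path` via a temp file and rename (hypothetical helper)."""
    payload = {
        # JSON object keys must be strings, so encode (topic, partition).
        "committed_offsets": {f"{t}:{p}": o for (t, p), o in offsets.items()},
        "timestamp": time.time(),
    }
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the same directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # readers see the old file or the new one
    except BaseException:
        os.unlink(tmp_path)
        raise


def load_checkpoint(path: str) -> Dict[TopicPartition, int]:
    """Read offsets back and decode the string keys into tuples."""
    with open(path) as f:
        raw = json.load(f)["committed_offsets"]
    restored: Dict[TopicPartition, int] = {}
    for key, offset in raw.items():
        topic, partition = key.rsplit(":", 1)  # topic names may contain ':'
        restored[(topic, int(partition))] = offset
    return restored
```

A round trip such as `save_checkpoint({("orders", 0): 487}, "checkpoint.json")` followed by `load_checkpoint("checkpoint.json")` should return the same dictionary. The rename is atomic on POSIX filesystems; behavior on other platforms may differ.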
Integrating Checkpoints into the Streaming Engine
In SimpleStreamingEngine, we need to create a checkpoint instance and integrate it into the main processing loop.
First, create the checkpoint when the application starts:
class SimpleStreamingEngine:
    def __init__(self, commit_interval: float = 5.0):
        # Create a checkpoint instance
        self._checkpoint = SimpleCheckpoint(commit_interval)
        logger.info(
            f"StreamingEngine initialized, checkpoint interval: {commit_interval}s"
        )
        # Other initialization...
        ...
Next, add the checkpoint trigger mechanism to the main processing loop:
    # Record progress while processing each message.
    # (A closure inside an engine method: self, df, and source
    # come from the enclosing scope.)
    def message_handler(message):
        # Process the message
        df.process_message(message)
        # Record progress
        topic = source.topic
        partition = message.partition
        offset = message.offset
        self._checkpoint.record_message(topic, partition, offset)
        # Check whether it is time to report progress (the key trigger point)
        if self._checkpoint.should_commit():
            self._try_commit_checkpoint()

    # Execute the two-phase checkpoint process
    def _try_commit_checkpoint(self):
        """
        Attempt to execute a checkpoint; this is the core of the trigger mechanism.
        """
        logger.debug("Checkpoint interval reached, starting commit")
        try:
            # Execute the full checkpoint workflow
            success = self._checkpoint.commit()
            if success:
                # Commit succeeded, reset checkpoint for the next round
                self._checkpoint.reset()
                logger.debug("Checkpoint commit succeeded and reset")
            else:
                # Commit failed (possibly due to overload), keep the current state
                # so the next call retries with the same offsets
                logger.debug("Checkpoint commit failed, keeping current state")
        except Exception as e:
            logger.error(f"Checkpoint commit exception: {e}")
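The reason for flushing before committing is easiest to see when a flush fails. The following standalone sketch uses a hypothetical `FlakySink` and a free-standing `commit` function (neither is part of the engine) to show that offsets are promoted only after every sink has flushed, and that a failed attempt can simply be retried.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


class FlakySink:
    """Hypothetical sink whose flush can be made to fail."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.buffer: List[str] = []
        self.written: List[str] = []

    def flush(self) -> None:
        if self.fail:
            raise IOError("sink unavailable")
        self.written.extend(self.buffer)
        self.buffer.clear()


def commit(
    sinks: List[FlakySink],
    pending: Dict[TopicPartition, int],
    committed: Dict[TopicPartition, int],
) -> bool:
    """Phase 1: flush every sink. Phase 2: promote pending offsets."""
    try:
        for sink in sinks:
            sink.flush()
    except Exception:
        return False  # offsets stay uncommitted, so a restart would replay the data
    committed.update(pending)
    return True


sink = FlakySink(fail=True)
sink.buffer.append("order-487")
pending = {("orders", 0): 487}
committed: Dict[TopicPartition, int] = {}

first = commit([sink], pending, committed)    # flush fails, nothing is committed
committed_after_failure = dict(committed)
sink.fail = False
second = commit([sink], pending, committed)   # retry succeeds

print(first, committed_after_failure)  # False {}
print(second, committed)               # True {('orders', 0): 487}
```

If the order were reversed, a crash between the two phases could record offset 487 as done while the data was still sitting in a buffer, and that data would be lost on restart.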
The system uses a time-based trigger mechanism: progress is automatically saved every commit_interval seconds (for example, every 5 seconds).
# Time-based trigger: check in the main loop
if self._checkpoint.should_commit():
    self._try_commit_checkpoint()
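The timing logic can be tried in isolation with a fake clock. This is a sketch under assumptions: `IntervalTrigger` is a hypothetical helper that mirrors `should_commit()` and `reset()`, and the injectable `clock` parameter exists only to make the behavior easy to observe without waiting five seconds.

```python
import time
from typing import Callable


class IntervalTrigger:
    """Hypothetical helper mirroring should_commit() and reset()."""

    def __init__(
        self, interval: float, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self.interval = interval
        self._clock = clock
        self._started_at = clock()

    def should_commit(self) -> bool:
        # True once at least `interval` seconds have passed in this cycle.
        return (self._clock() - self._started_at) >= self.interval

    def reset(self) -> None:
        # Start a new cycle from the current time.
        self._started_at = self._clock()


now = [0.0]  # fake clock value, advanced by hand
trigger = IntervalTrigger(5.0, clock=lambda: now[0])

now[0] = 4.9
early = trigger.should_commit()        # False: interval not reached yet
now[0] = 5.0
due = trigger.should_commit()          # True: 5 seconds have elapsed
trigger.reset()
after_reset = trigger.should_commit()  # False: a new cycle just started
```

Two details are worth noting. The sketch defaults to `time.monotonic`, which is not affected by wall-clock adjustments, whereas the article's code uses `time.time()` for simplicity. Also, because the check runs inside the message handler, a checkpoint is only attempted when a message arrives; an idle stream does not commit.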
Progress Recovery on System Restart
When the courier (the system) comes back online, it needs to continue from where it left off. Let’s walk through the recovery process:
System Restart Recovery Flow:
Step 1: Load Checkpoint
┌─────────────────────────────────────┐
│ _load_checkpoint_from_storage() │
│ │
│ Read checkpoint.json │
│ orders[0] -> offset 487 │
└─────────────────────────────────────┘
│
▼
Step 2: Set Consumer Position
┌─────────────────────────────────────┐
│ kafka_consumer.seek() │
│ │
│ Set orders[0] start at 488 │
│ (offset + 1 to avoid duplicates) │
└─────────────────────────────────────┘
│
▼
Step 3: Resume Processing
┌─────────────────────────────────────┐
│ Start Message Loop │
│ │
│ Continue from where left off │
│ No data loss (replays since last commit) │
│ Business as usual │
└─────────────────────────────────────┘
Recovery Flow Explanation
Step 1 – Load the last saved progress:
Read the previously saved checkpoint from persistent storage:
    def run(self):
        """Restore progress when the system starts"""
        # 1. Read the last saved progress from persistent storage
        saved_progress = self._load_checkpoint_from_storage()
        # 2. Tell the Kafka consumer where to start reading
        if saved_progress:
            for (topic, partition), offset in saved_progress.items():
                logger.info(
                    f"Restoring progress: {topic}[{partition}] resuming from offset {offset + 1}"
                )
                # Set the starting position of the Kafka consumer
                # (offset + 1: the saved offset was already processed)
                self._kafka_consumer.seek(topic, partition, offset + 1)
        logger.info("System startup complete, resuming processing from the last interruption")
        # Start the normal message processing loop...

    def _load_checkpoint_from_storage(self):
        """
        Load the checkpoint from persistent storage
        """
        try:
            # Read from the filesystem
            with open('checkpoint.json', 'r') as f:
                checkpoint_data = json.load(f)
            # JSON object keys are always strings, so offsets are assumed to be
            # stored under "topic:partition" keys; decode them back into tuples
            restored = {}
            for key, offset in checkpoint_data['committed_offsets'].items():
                topic, partition = key.rsplit(':', 1)
                restored[(topic, int(partition))] = offset
            return restored
        except FileNotFoundError:
            logger.info("Checkpoint file not found, starting from the beginning")
            return {}
        except Exception as e:
            logger.warning(f"Failed to load checkpoint: {e}")
            return {}  # Start from the beginning
Key Concepts of Progress Recovery
Load previous progress: Read the last saved offsets from the checkpoint.
Set the consumption position: Instruct the Kafka consumer to start reading from the correct offsets.
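Seamless continuation: After a restart, the system continues from the last committed offset, so no data is lost. Messages that were processed after the last checkpoint but before the crash are read again, so downstream writes should tolerate duplicates.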
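The recovery arithmetic can be checked with a small standalone sketch. The scenario is hypothetical (a checkpoint saved at offset 487 and a crash after offset 490), and `resume_positions` is a helper invented for this illustration.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def resume_positions(
    committed: Dict[TopicPartition, int]
) -> Dict[TopicPartition, int]:
    """Next offset to read per partition: last committed offset + 1."""
    return {tp: offset + 1 for tp, offset in committed.items()}


# Hypothetical scenario: checkpoint saved at 487, crash after processing 490.
committed = {("orders", 0): 487}
processed_before_crash: List[int] = list(range(480, 491))  # offsets 480..490

start = resume_positions(committed)[("orders", 0)]
replayed = [o for o in processed_before_crash if o >= start]

print(start)     # 488
print(replayed)  # [488, 489, 490]
```

Offsets 488 to 490 were already processed once before the crash and are processed again after the restart. With a 5-second interval, up to roughly five seconds of messages can be replayed this way, which is why a sink that can absorb repeats (for example, an upsert keyed by a message identifier) pairs well with this kind of checkpoint.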
Checkpoint in Production Systems
For simplicity, our example takes a snapshot of the current offsets and persists it periodically, which is enough to illustrate the basic concept of checkpoints.
However, in real-world stream processing systems—especially platforms like Kafka, Flink, and RisingWave—this kind of ‘one-off snapshot’ approach has a drawback:
Consistency challenges: the snapshot must be taken at a point in time that is perfectly aligned with the data processing logic; otherwise, it can lead to data loss or duplicate processing.
In more rigorous designs, checkpoint events flow through the pipeline just like regular data.
Checkpoint Barrier Propagation in Streaming Pipeline:
Timeline:
T1: [data] → [data] → [data] → [data]
T2: [data] → [data] → [data] → [data]
T3: [CHECKPOINT_BARRIER_#123] starts propagation
T4: [data] → [BARRIER] → [data] → [data]
T5: [data] → [data] → [BARRIER] → [data]
T6: [data] → [data] → [data] → [BARRIER]
What each operator does when it receives the checkpoint barrier:
┌─────────────────────────────────────┐
│ Source receives BARRIER │
│ 1. Stop processing new data │
│ 2. Save state: offset=487 │
│ 3. Forward BARRIER downstream │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Filter receives BARRIER │
│ 1. Process data before BARRIER │
│ 2. Save state: processed=1234 │
│ 3. Forward BARRIER downstream │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Aggregator receives BARRIER │
│ 1. Save window state and results │
│ 2. Forward BARRIER downstream │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Sink receives BARRIER │
│ 1. Flush output buffer │
│ 2. Save state: written=456 │
│ 3. Report completion to Coordinator│
└─────────────────────────────────────┘
This design ensures:
Ordering: each operator saves its state at the same logical point in time
Consistency: no data is left in a “partially processed” state
Completeness: the states of all operators together form a complete snapshot
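The barrier idea can be simulated in a few lines. The sketch below is a deliberately simplified model, not how any particular engine implements it: it assumes a single linear chain of operators processed synchronously, and `BARRIER`, `CountingOperator`, and `run_pipeline` are hypothetical names. Production systems also have to align barriers across multiple input channels, which is left out here.

```python
from typing import Dict, List, Union

BARRIER = "BARRIER"  # hypothetical marker that travels with the data
Event = Union[int, str]


class CountingOperator:
    """Hypothetical operator: counts records and snapshots on a barrier."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.count = 0
        self.snapshots: List[int] = []

    def handle(self, event: Event) -> Event:
        if event == BARRIER:
            # Save state as of "everything before the barrier".
            self.snapshots.append(self.count)
        else:
            self.count += 1
        return event  # forward data and barriers downstream unchanged


def run_pipeline(events: List[Event], operators: List[CountingOperator]) -> None:
    for event in events:
        for op in operators:
            event = op.handle(event)


ops = [CountingOperator(n) for n in ("filter", "aggregate", "sink")]
run_pipeline([1, 2, 3, BARRIER, 4, 5], ops)

snapshots: Dict[str, List[int]] = {op.name: op.snapshots for op in ops}
counts: Dict[str, int] = {op.name: op.count for op in ops}

print(snapshots)  # {'filter': [3], 'aggregate': [3], 'sink': [3]}
print(counts)     # {'filter': 5, 'aggregate': 5, 'sink': 5}
```

Every operator snapshots after exactly three records, even though each one kept processing records 4 and 5 afterwards. That shared "after record 3" cut is the same logical point in time described above.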
For today, however, we’ll start by using the snapshot approach to tell the story, so everyone can first grasp the core idea of progress tracking.
Architecture Recap
Over the past few days, our SimpleStreamingEngine has evolved from basic data stream processing into a reliable streaming system with progress management capabilities:
Day 8: SimpleStreamingEngine with Checkpoint (Streaming architecture with progress persistence)
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ KafkaSource │───►│DataFrame.filter()│───►│PostgreSQLSink │
│ │ │ │ │ │
│ • Topic consume │ │ • Lambda filter │ │ • Batch buffer │
│ • Consumer seek │ │ • Data transform │ │ │
│ • Offset resume │ │ │ │ • Timer trigger │
│ │ │ │ │ • Bulk insert │
│ │ │ │ │ • Progress track│
└─────────────────┘ └──────────────────┘ └─────────────────┘
▲ │
│ ┌─────────────────────────────────────┘
│ │
│ ▼
┌─────────────────────────────────────────────────────────────┐
│ SimpleStreamingEngine │
│ │
│ • Message handler with progress tracking │
│ • Checkpoint triggered every 5 seconds │
│ • Two-phase commit (flush + save) │
│ • Persistent storage (checkpoint.json) │
│ • Automatic recovery on system restart │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ SimpleCheckpoint │
│ │
│ • record_message(): Track processing progress │
│ • should_commit(): Time-based trigger (5s) │
│ • commit(): Two-phase progress persistence │
│ • reset(): Clear state and restart the cycle │
│ • Persistent state stored in checkpoint.json │
└─────────────────────────────────────────────────────────────┘
Evolution from basic processing to a reliable system
Day 5: Build basic stream processing capabilities
Day 6: Optimize throughput and improve efficiency
Day 7: Ensure system stability and prevent overload failures
Day 8: Record processing progress to enable fault recovery
Final Thoughts: Giving Your System a Memory
Checkpoint gives a streaming system memory:
Precise progress tracking
Fast recovery without starting over from the beginning
Like a good courier, a good streaming system always knows where it left off. Although reporting progress may pause processing for a few milliseconds each time, the trade-off is reliability and recoverability.
At this point, we’ve walked through several common and important system features in a streaming pipeline:
Rate limiting and overload protection (Backpressure)
Progress persistence and recovery (Checkpoint)
Real-world systems include additional features that are more specialized or scenario-dependent, which we won’t cover here. Next up, we’ll dive into the most important—and challenging—topics in streaming pipelines:
Joins
Stateful processing
This part will involve:
State storage and retrieval
State consistency and fault tolerance
Cross-event aggregation and correlation
Stay tuned!

