The Speed Layer is like a team of couriers, rushing fresh orders from Kafka to the database. But moving fast doesn’t mean everything is fine.
The Courier’s Worst Nightmare: The Pile-Up
Picture yourself as a logistics dispatcher. At first, everything runs smoothly:
Warehouse (Kafka) shipping rate: 100 packages/sec
Couriers (Stream Processing) pick-up rate: 100 packages/sec
Distribution center (PostgreSQL) processing rate: 100 packages/sec
Looks perfect, right? But suddenly… Black Friday!
Warehouse shipping rate: 300 packages/sec
Couriers pick-up rate: 300 packages/sec
Distribution center processing rate: 50 packages/sec (system is getting overwhelmed)
The couriers start queuing at the distribution center, packages pile up, memory spikes, and the system is on the verge of collapse…
A Classic Problem in Stream Processing: Backpressure
Backpressure is a classic problem every stream processing engineer encounters.
When PostgreSQL slows down for any of the following reasons, your stream processing system can struggle to keep up and may eventually crash:
Network latency
Database lock contention
Disk I/O bottlenecks
Exhausted connection pools
Common Reactions to Backpressure
When teams hit backpressure, their first thought is usually: Add more machines, more CPU, more RAM, more database nodes!
This can work short-term, but you’ll never outrun traffic spikes. During promotions, traffic can spike 100x instantly—throwing money at it isn’t realistic.
Solution: Give Your Couriers Smart Brakes
Today, we’ll implement a backpressure mechanism for our Simple Streaming framework. The core idea is simple:
When the downstream can’t keep up, let the upstream smartly hit the brakes.
Note: All code in this article is pseudo-code for educational and conceptual illustration.
Focus on understanding the architecture and logic, not on running the exact code.
Step 1: Design an Emergency Signal
class StreamingOverloadException(Exception):
    """
    Exception thrown when the system is overloaded
    """
    def __init__(self, message: str, pause_seconds: float = 3.0):
        super().__init__(message)
        self.pause_seconds = pause_seconds  # recommended pause time
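To see how this signal is meant to be used, here is a toy illustration (not part of the framework itself): the raiser attaches a suggested pause, and whoever catches the exception reads pause_seconds to decide how long to back off. The slow_sink_write function below is purely hypothetical:

import time

def slow_sink_write():
    # Hypothetical sink call that detects overload and raises the emergency signal
    raise StreamingOverloadException("PostgreSQL is overloaded", pause_seconds=3.0)

try:
    slow_sink_write()
except StreamingOverloadException as e:
    # The caller honors the recommended pause instead of hammering the sink
    print(f"Overload signal received: {e} (backing off for {e.pause_seconds:.1f}s)")
    time.sleep(e.pause_seconds)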
Step 2: Teach the PostgreSQL Sink to Signal Overload
Our PostgreSQL Sink needs to monitor its own capacity. Let’s break down the backpressure mechanism:
Backpressure Detection Flow
Normal Processing Flow:
Step 1: Data Arrives
┌───────────────┐
│  New Message  │ ──► write() method
└───────────────┘

Step 2: Buffer Management

┌────────────────┐   YES   ┌─────────────────────┐
│ Buffer >= 100? │ ──────► │ _flush_and_clear()  │
└────────────────┘         └─────────────────────┘
        │ NO                          │
        ▼                             ▼
┌────────────────┐         ┌─────────────────────┐
│ Add to Buffer  │         │     Start Timer     │
└────────────────┘         └─────────────────────┘

Step 3: Performance Monitoring

┌──────────────────────┐      ┌──────────────────────┐
│ Execute Batch Insert │ ──►  │  Calculate Duration  │
└──────────────────────┘      └──────────────────────┘
                                          │
                                          ▼
                              ┌──────────────────────┐
                              │  _check_overload()   │
                              └──────────────────────┘
Design Overview
Step 1 – Data Entry: Each message enters the system via the write() method.
def write(self, message: Dict[str, Any]):
    """Add the message to the buffer and flush in batches when batch_size is reached"""
    # Extract the actual data from the message (following the Day 6 design)
    data = message.get('value', {})
    if not data:
        return

    # Add to buffer
    self._buffer.append(data)

    # Flush in batch if buffer reaches batch_size, triggering Step 2
    if len(self._buffer) >= self.batch_size:
        self._flush_and_clear()
Step 2 – Smart Buffering: Once the buffer reaches 100 messages, _flush_and_clear() is triggered. This is the main backpressure detection point.
Step 3 – Key Innovation: Add a timer in _flush_and_clear() to measure processing speed:
def _flush_and_clear(self):
    """Perform a batch write and act as a 'health monitor' for the system."""
    if not self._buffer:
        return

    # Key step: start timing (corresponds to "Start Timer" in the diagram)
    flush_start_time = time.time()
    buffer_size = len(self._buffer)

    try:
        # Execute the batch insert into PostgreSQL
        # (corresponds to "Execute Batch Insert" in the diagram)
        # ... batch insert logic here

        # Calculate the processing duration
        # (corresponds to "Calculate Duration")
        flush_duration = time.time() - flush_start_time

        # Important: check whether the system is overloaded
        # (corresponds to "_check_overload()")
        self._check_overload(flush_duration, buffer_size)

        # Clear the buffer
        self._buffer.clear()
    except Exception as e:
        logger.error(f"Batch write failed: {e}")
        raise
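The batch insert itself is deliberately elided above. If you were backing this sink with psycopg2, that step could look roughly like the sketch below; the self._conn connection, the self.table_name attribute, and the column names are assumptions made for illustration, not part of the framework shown so far:

from psycopg2.extras import execute_values

def _execute_batch_insert(self, rows):
    # Hypothetical bulk insert: assumes each buffered row is a dict with
    # 'id', 'name', and 'value' keys, and that self._conn is an open
    # psycopg2 connection and self.table_name an existing table.
    sql = f"INSERT INTO {self.table_name} (id, name, value) VALUES %s"
    values = [(row['id'], row['name'], row['value']) for row in rows]
    with self._conn.cursor() as cur:
        execute_values(cur, sql, values)  # one round trip for the whole batch
    self._conn.commit()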
Triggering Backpressure
The key is knowing when the system is truly overloaded, not just experiencing occasional performance blips. We use a “three strikes” strategy:
Overload Detection:

          ┌───────────────────┐
          │  Duration > 2.0s? │
          └───────────────────┘
         YES │                │ NO
             ▼                ▼
     ┌──────────────┐  ┌──────────────┐
     │ slow_count++ │  │ slow_count-- │
     └──────────────┘  └──────────────┘
             │
             ▼
     ┌──────────────┐
     │  Count >= 3? │
     └──────────────┘
             │ YES
             ▼
┌───────────────────────────────────┐
│       BACKPRESSURE TRIGGER        │
│       pause_time = 5.0s           │
│ raise StreamingOverloadException  │
└───────────────────────────────────┘
Design Principles:
Threshold (2.0 seconds): Why 2 seconds? Under normal conditions, batch writing 100 records should complete in a few hundred milliseconds. A 2-second duration signals a clear performance anomaly. This threshold can be adjusted based on your hardware and database configuration.
Counter Mechanism (slow_count): A single slow batch may just be a temporary hiccup (network jitter, competing queries, etc.). We use a counter to distinguish between occasional slowdowns and sustained overload:
Slow batch: counter +1
Normal batch: counter -1 (but never below 0)
Three-Strikes Rule: Backpressure is triggered only after three consecutive slow batches. This prevents false alarms while still responding quickly to real overloads.
Fixed Pause Time: A 5-second pause gives most transient issues (short-term lock contention, memory GC, etc.) a chance to recover, without stalling the system for too long.
First, initialize the backpressure parameters:
def __init__(self, batch_size: int = 100):
    ...
    # Backpressure detection parameters (corresponding to thresholds in the diagram above)
    self._slow_flush_count = 0        # Counter: tracks consecutive slow flushes
    self._max_slow_flush = 3          # Trigger threshold: backpressure after 3 consecutive slow flushes
    self._slow_flush_threshold = 2.0  # Time threshold: flushes taking longer than 2 seconds are considered slow
Next, implement the detection logic:
def _check_overload(self, flush_duration: float, buffer_size: int):
    """Check for overload—like a delivery worker checking if they’re too exhausted"""
    # Step 1: Check if the flush duration exceeds the threshold
    if flush_duration > self._slow_flush_threshold:  # 2.0-second threshold
        # Step 2: Increment the slow flush counter
        self._slow_flush_count += 1
        logger.warning(
            f"Processing slowed down: {buffer_size} records took {flush_duration:.2f}s "
            f"(slow flush #{self._slow_flush_count})"
        )

        # Step 3: Check if the trigger condition is met
        if self._slow_flush_count >= self._max_slow_flush:  # 3 consecutive slow flushes
            # Step 4: Trigger backpressure
            error_msg = f"PostgreSQL overload: {self._slow_flush_count} consecutive slow flushes"
            pause_time = 5.0  # Fixed 5-second pause for simplicity
            self._slow_flush_count = 0  # Reset counter

            # Raise an overload exception to trigger backpressure
            raise StreamingOverloadException(error_msg, pause_seconds=pause_time)
    else:
        # Processing speed is back to normal, decrement the counter
        self._slow_flush_count = max(0, self._slow_flush_count - 1)
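A quick way to convince yourself of the three-strikes behavior is to drive _check_overload() directly with fake durations. The snippet below is a throwaway experiment and assumes the sink class is called PostgreSQLSink and can be constructed without a live database:

sink = PostgreSQLSink(batch_size=100)

# Two slow flushes: warnings only, backpressure is not triggered yet
sink._check_overload(flush_duration=2.5, buffer_size=100)
sink._check_overload(flush_duration=3.1, buffer_size=100)

# The third consecutive slow flush fires the emergency signal
try:
    sink._check_overload(flush_duration=2.8, buffer_size=100)
except StreamingOverloadException as e:
    print(f"Backpressure triggered, recommended pause: {e.pause_seconds:.1f}s")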
Step 3: Teach Simple Streaming Engine to Hit the Emergency Brake
Now we need the SimpleStreamingEngine to respond to this emergency signal. Let’s break down how backpressure is handled at the application level:
Application-Level Backpressure Flow
The SimpleStreamingEngine needs to perform backpressure checks right at the entry point of message processing, ensuring that no data is lost:
Step 1: Message Processing Entry Point
┌─────────────────────────────────────┐
│         New Message Arrives         │
└─────────────────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│          message_handler()          │
│                                     │
│    First Check: _should_pause()?    │
└─────────────────────────────────────┘
                  │
        YES       │       NO
         ┌────────┴────────────┐
         ▼                     ▼
┌─────────────────┐   ┌─────────────────┐
│   Pause Kafka   │   │ Process Message │
│    Consumer     │   │    Normally     │
│                 │   │                 │
└─────────────────┘   └─────────────────┘
                               │
                               ▼
                    ┌─────────────────────┐
                    │  Exception Handler  │
                    └─────────────────────┘
Step 1 – Message Processing Entry Point: Every message from Kafka is handled by a processor created via _create_message_handler():
def _create_message_handler(self, source):
    """Create a message handler with built-in smart braking"""
    def message_handler(message):
        # First line of defense: check system status (corresponds to "_should_pause()?" in the diagram)
        if self._should_pause():
            logger.debug("Pausing Kafka consumption due to backpressure")
            self._pause_kafka_consumer(source)  # corresponds to "Pause Kafka Consumer" in the diagram
            return

        # Normal processing flow (corresponds to "Process Message Normally")
        try:
            df = self._source_dataframe_map[source]
            df.process_message(message)
        except StreamingOverloadException as e:
            # Exception handling: downstream signals overload (corresponds to "Exception Handler")
            self._handle_overload(e, source)
        except Exception as e:
            logger.error(f"Other error: {e}")

    return message_handler
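For context, here is roughly how such a handler could be driven by a Kafka poll loop. This is a minimal sketch using kafka-python; the topic name, broker address, and the engine and source objects are assumptions for illustration rather than the framework's actual wiring:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',                                   # assumed topic name
    bootstrap_servers='localhost:9092',         # assumed broker address
    value_deserializer=lambda b: json.loads(b.decode('utf-8')),
)

handler = engine._create_message_handler(source)  # engine and source assumed to exist

for record in consumer:
    # Every Kafka record passes through the backpressure-aware handler,
    # using the {'value': ...} message shape from the Day 6 design
    handler({'value': record.value})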
Concept:
Preemptive Check: Verify the system status before processing any message
Safe Pause: Don’t drop messages; pause the Kafka consumer so messages remain in the queue
Exception Handling: Catch overload exceptions from the PostgreSQL Sink
Emergency Brake Trigger and Recovery Mechanism
When the PostgreSQL Sink throws a StreamingOverloadException, the entire SimpleStreamingEngine enters an emergency state:
Normal Processing State:
┌─────────────────────────────────────┐
│  _paused = False                    │
│  Normal Processing                  │
└─────────────────────────────────────┘
                  │
                  ▼  StreamingOverloadException
┌─────────────────────────────────────┐
│           EMERGENCY BRAKE           │
│                                     │
│  _paused = True                     │
│  _pause_until = now + pause_seconds │
│  _overload_count++                  │
└─────────────────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│            PAUSED STATE             │
│                                     │
│  Kafka Consumer is paused           │
│  Messages remain in Kafka queue     │
│  System gives DB time to recover    │
└─────────────────────────────────────┘
                  │
                  ▼  Time expires
┌─────────────────────────────────────┐
│            AUTO RECOVERY            │
│                                     │
│  current_time >= _pause_until ?     │
│  YES: _paused = False               │
│       Resume normal processing      │
└─────────────────────────────────────┘
Emergency Brake Handling: When an overload exception is received (corresponds to "EMERGENCY BRAKE" in the diagram):
def _handle_overload(self, error: StreamingOverloadException, source):
    """Handle overload – emergency brake procedure"""
    # Track the number of overloads
    self._overload_count += 1

    # Set system state (corresponds to state change in diagram)
    self._paused = True
    self._pause_until = time.time() + error.pause_seconds  # compute when to resume

    logger.warning(
        f"System overload (occurrence #{self._overload_count})! "
        f"Pausing for {error.pause_seconds:.1f} seconds to give the system a breather..."
    )

    # Immediately pause the Kafka consumer (corresponds to "Kafka Consumer is paused")
    self._pause_kafka_consumer(source)
State Check and Auto Recovery: Before processing each message, the system checks whether it should pause (corresponds to "AUTO RECOVERY").
def _should_pause(self) -> bool:
    """Check if the system should pause – like looking at a traffic light"""
    current_time = time.time()

    # Check current pause state
    if self._paused and current_time < self._pause_until:
        return True  # still in paused state
    elif self._paused and current_time >= self._pause_until:
        # Time to auto-recover
        self._paused = False
        logger.info("Pause ended, resuming message processing")
        # Important: resume all Kafka consumers
        self._resume_all_kafka_consumers()

    return False  # normal processing
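The pause and resume helpers are the one piece we have not spelled out. With kafka-python they could be as small as the sketch below; self._consumers (a source-to-consumer mapping) is an assumption of this sketch, and pause()/resume() operate on the consumer's currently assigned partitions:

def _pause_kafka_consumer(self, source):
    """Stop fetching from this source's partitions; messages stay safely in Kafka."""
    consumer = self._consumers[source]       # assumed source -> KafkaConsumer mapping
    consumer.pause(*consumer.assignment())   # pause every currently assigned partition

def _resume_all_kafka_consumers(self):
    """Resume fetching on every consumer once the pause window has passed."""
    for consumer in self._consumers.values():
        consumer.resume(*consumer.assignment())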
Architecture Recap
After several days of hands-on learning, our SimpleStreamingEngine has evolved from a basic data flow processor into a more stable streaming system:
Day 7: SimpleStreamingEngine with Backpressure
┌─────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│ KafkaSource     │───►│DataFrame.filter()│───►│ PostgreSQLSink   │
│                 │    │                  │    │                  │
│ • Topic consume │    │ • Lambda filter  │    │ • Batch buffer   │
│ • Consumer pause│    │ • Data transform │    │ • Timer trigger  │
│ • Auto resume   │    │                  │    │ • Bulk insert    │
│                 │    │                  │    │ • Overload detect│
│                 │    │                  │    │ • Exception throw│
└─────────────────┘    └──────────────────┘    └──────────────────┘
         ▲                                               │
         │       ┌───────────────────────────────────────┘
         │       │
         │       ▼
┌──────────────────────────────────────────────────────────────┐
│                    SimpleStreamingEngine                      │
│                                                               │
│  • Message handler with backpressure check                    │
│  • _should_pause() status management                          │
│  • _handle_overload() emergency brake                         │
│  • Kafka Consumer pause/resume control                        │
│  • Auto recovery after timeout                                │
└──────────────────────────────────────────────────────────────┘
The system has now evolved from simple “message in → process → out” to a full-fledged streaming system with intelligent flow control:
Day 5: Built basic streaming capability
Day 6: Optimized throughput and efficiency
Day 7: Ensured system stability and prevented overload crashes
Summary
What We Learned
Backpressure acts like a smart traffic controller for your streaming system: when the downstream slows down, the upstream slows down with it, so the pipeline stays stable instead of collapsing under load.
Core Design Principle
In high-throughput systems, gracefully handling overload is more important than blindly increasing capacity. When PostgreSQL starts struggling, letting the system slow down intelligently is often better than forcing it to keep up.
Key Points
Three-Strikes Rule: Avoids false alarms, triggers only on true overload
Fixed Pause Duration: 5 seconds is simple and easy to understand
Kafka Consumer Pause: Ensures data is not lost
Auto Recovery: System self-heals without manual intervention
Backpressure in Production
In real-world environments, backpressure mechanisms are far more sophisticated than our demo version.
Mature streaming frameworks like Apache Flink have built-in backpressure mechanisms:
Automatic Detection: Monitors processing speed and resource usage
Dynamic Adjustment: Adapts processing speed according to load
Distributed Support: Coordinates backpressure across multiple nodes
Takeaways
Through this simplified implementation, we grasped the core concepts of backpressure:
Clear logic, easy to understand
Includes key elements: detection, trigger, pause, recovery
Ensures data safety—no messages are lost
Like in a delivery platform: the goal isn’t to exhaust the couriers, but to make sure every package is delivered reliably so customers keep coming back.
Preview of Day 8
Today we learned how to make the system hit the brakes under overload, but one key question remains: how to accurately track processing progress?
In streaming systems, when the system restarts or fails, we need to know: which Kafka offsets have been processed? Which data has been successfully written to the database? How do we avoid duplicate processing or data loss?
Next time we’ll explore manual commit control—the core technique for managing processing progress. We’ll implement it using a checkpoint mechanism to:
Accurately record Kafka offsets and processing status
Resume from the correct point after failure
Avoid duplicate processing and data inconsistency
Flexibly control commit timing to ensure data safety

