Backpressure in Streaming Systems

The Speed Layer is like a team of couriers, rushing fresh orders from Kafka to the database. But moving fast doesn’t mean everything is fine.

The Courier’s Worst Nightmare: The Pile-Up

Picture yourself as a logistics dispatcher. At first, everything runs smoothly:

  • Warehouse (Kafka) shipping rate: 100 packages/sec

  • Couriers (Stream Processing) pick-up rate: 100 packages/sec

  • Distribution center (PostgreSQL) processing rate: 100 packages/sec

Looks perfect, right? But suddenly… Black Friday!

  • Warehouse shipping rate: 300 packages/sec

  • Couriers pick-up rate: 300 packages/sec

  • Distribution center processing rate: 50 packages/sec (system is getting overwhelmed)

The couriers start queuing at the distribution center, packages pile up, memory spikes, and the system is on the verge of collapse…
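To put numbers on the pile-up: with 300 packages/sec arriving and only 50 packages/sec being written, the backlog grows by 250 packages every second. A minimal sketch of that arithmetic, assuming constant rates (the function name `backlog_after` is ours, purely for illustration):

```python
def backlog_after(seconds: int, arrival_rate: int, service_rate: int) -> int:
    """Return how many items are waiting after `seconds`, assuming constant rates."""
    # The backlog cannot be negative: if service keeps up, nothing accumulates.
    growth_per_second = max(0, arrival_rate - service_rate)
    return growth_per_second * seconds


if __name__ == "__main__":
    for t in (1, 10, 60):
        print(f"after {t:>2}s: {backlog_after(t, 300, 50)} packages waiting")
```

After one minute at Black Friday rates, that is 15,000 packages sitting in memory, which is why the pile-up turns into a memory spike so quickly.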

A Classic Problem in Stream Processing: Backpressure

Backpressure is a classic problem every stream processing engineer encounters.

When PostgreSQL slows down for any of the following reasons, your stream processing system can struggle to keep up and may eventually crash:

  • Network latency

  • Database lock contention

  • Disk I/O bottlenecks

  • Exhausted connection pools

Common Reactions to Backpressure

When teams hit backpressure, their first thought is usually: Add more machines, more CPU, more RAM, more database nodes!

This can work in the short term, but you’ll never outrun traffic spikes. During promotions, traffic can spike 100x in an instant, and throwing money at it isn’t realistic.

Solution: Give Your Couriers Smart Brakes

Today, we’ll implement a backpressure mechanism for our Simple Streaming framework. The core idea is simple:

When the downstream can’t keep up, let the upstream smartly hit the brakes.

Note: All code in this article is pseudo-code for educational and conceptual illustration.

Focus on understanding the architecture and logic, not on running the exact code.

Step 1: Design an Emergency Signal

class StreamingOverloadException(Exception):
    """
    Exception thrown when the system is overloaded
    """
    def __init__(self, message: str, pause_seconds: float = 3.0):
        super().__init__(message)
        self.pause_seconds = pause_seconds  # recommended pause time
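The point of carrying `pause_seconds` on the exception is that the caller can read the recommended pause straight from the error it catches. A small sketch of that round trip; `overloaded_write()` and `try_write()` are hypothetical helpers used only for this illustration:

```python
class StreamingOverloadException(Exception):
    """Raised by a sink when it cannot keep up."""

    def __init__(self, message: str, pause_seconds: float = 3.0):
        super().__init__(message)
        self.pause_seconds = pause_seconds  # recommended pause time


def overloaded_write() -> None:
    """Hypothetical sink call that always reports overload."""
    raise StreamingOverloadException("sink is overloaded", pause_seconds=5.0)


def try_write() -> float:
    """Return the pause the sink asked for, or 0.0 if the write went through."""
    try:
        overloaded_write()
    except StreamingOverloadException as exc:
        return exc.pause_seconds
    return 0.0
```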

Step 2: Teach the PostgreSQL Sink to Signal Overload

Our PostgreSQL Sink needs to monitor its own capacity. Let’s break down the backpressure mechanism:

Backpressure Detection Flow

Normal Processing Flow:

Step 1: Data Arrives
┌───────────────┐
│ New Message   │ ──► write() method
└───────────────┘

Step 2: Buffer Management
┌───────────────┐    YES   ┌────────────────────┐
│ Buffer >= 100?│ ───────► │ _flush_and_clear() │
└───────────────┘          └────────────────────┘
       │ NO                      │
       ▼                         ▼
┌───────────────┐          ┌───────────────┐
│ Add to Buffer │          │ Start Timer   │
└───────────────┘          └───────────────┘

Step 3: Performance Monitoring
┌──────────────────────┐     ┌──────────────────────┐
│ Execute Batch Insert  │ ──►│ Calculate Duration    │
└──────────────────────┘     └──────────────────────┘
                                     │
                                     ▼
                          ┌──────────────────────┐
                          │ _check_overload()    │
                          └──────────────────────┘

Design Overview

Step 1 – Data Entry: Each message enters the system via the write() method.

def write(self, message: Dict[str, Any]):
    """Add the message to the buffer and flush in batches when batch_size is reached"""
    # Extract the actual data from the message (following the Day 6 design)
    data = message.get('value', {})
    if not data:
        return

    # Add to buffer
    self._buffer.append(data)

    # Flush in batch if buffer reaches batch_size, triggering Step 2
    if len(self._buffer) >= self.batch_size:
        self._flush_and_clear()
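To see the batching behaviour in isolation, here is a toy version of the sink that can be run without a database. The class name `BufferingSink` and the `flushed_batches` list (which stands in for PostgreSQL) are assumptions made for this sketch, not part of the framework:

```python
from typing import Any, Dict, List


class BufferingSink:
    """Toy sink: collects rows and hands them off in batches."""

    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
        self._buffer: List[Dict[str, Any]] = []
        # Stand-in for the database: each flushed batch is recorded here.
        self.flushed_batches: List[List[Dict[str, Any]]] = []

    def write(self, message: Dict[str, Any]) -> None:
        data = message.get("value", {})
        if not data:
            return  # nothing to store
        self._buffer.append(data)
        if len(self._buffer) >= self.batch_size:
            self._flush_and_clear()

    def _flush_and_clear(self) -> None:
        self.flushed_batches.append(list(self._buffer))
        self._buffer.clear()


sink = BufferingSink(batch_size=3)
for i in range(7):
    sink.write({"value": {"order_id": i}})
sink.write({"value": {}})  # empty payload is skipped
```

With a batch size of 3 and seven rows, two batches are flushed and the seventh row stays in the buffer. A real sink would presumably also need a time-based or shutdown flush so that trailing rows are not stranded.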

Step 2 – Smart Buffering: Once the buffer reaches 100 messages, _flush_and_clear() is triggered. This is the main backpressure detection point.

Step 3 – Key Innovation: Add a timer in _flush_and_clear() to measure processing speed:

def _flush_and_clear(self):
    """Perform a batch write and act as a 'health monitor' for the system."""
    if not self._buffer:
        return

    # Key step: start timing (corresponds to "Start Timer" in the diagram)
    flush_start_time = time.time()

    try:
        # Execute the batch insert into PostgreSQL
        # (corresponds to "Execute Batch Insert" in the diagram)
        # ... batch insert logic here

        # Calculate the processing duration
        # (corresponds to "Calculate Duration")
        flush_duration = time.time() - flush_start_time
    except Exception as e:
        logger.error(f"Batch write failed: {e}")
        raise

    # The batch is written, so clear the buffer before the overload check.
    # Otherwise a raised StreamingOverloadException would leave these rows
    # in the buffer and they would be inserted again on the next flush.
    self._buffer.clear()

    # Important: check whether the system is overloaded
    # (corresponds to "_check_overload()"). This sits outside the try block
    # so an overload signal is not logged as a failed write.
    self._check_overload(flush_duration)
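The timing step can be factored into a small helper. The sketch below is one way to do it, not the framework's own code: `timed()` and `fake_batch_insert()` are hypothetical names, and it uses `time.monotonic()` because that clock is not affected by wall-clock adjustments, which makes it a safer choice for measuring durations than `time.time()`.

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def timed(operation: Callable[[], T]) -> Tuple[T, float]:
    """Run `operation` and return (result, elapsed_seconds)."""
    start = time.monotonic()  # monotonic clock: never jumps backwards
    result = operation()
    return result, time.monotonic() - start


def fake_batch_insert() -> int:
    """Hypothetical stand-in for the database call: sleeps briefly, returns a row count."""
    time.sleep(0.05)
    return 100


rows, duration = timed(fake_batch_insert)
```

The measured `duration` is what would be passed on to the overload check.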

Triggering Backpressure

The key is knowing when the system is truly overloaded, not just experiencing occasional performance blips. We use a “three strikes” strategy:

Overload Detection:

┌─────────────────────┐
│ Duration > 2.0s ?   │
└─────────────────────┘
      │ YES       │ NO
      ▼           ▼
┌─────────────┐ ┌─────────────┐
│ slow_count++│ │ slow_count--│
└─────────────┘ └─────────────┘
      │
      ▼
┌─────────────┐
│ Count >= 3? │
└─────────────┘
      │ YES
      ▼
┌──────────────────────────────────┐
│ BACKPRESSURE TRIGGER             │
│ pause_time = 5.0s                │
│ raise StreamingOverloadException │
└──────────────────────────────────┘

Design Principles:

  • Threshold (2.0 seconds): Why 2 seconds? Under normal conditions, batch writing 100 records should complete in a few hundred milliseconds. A 2-second duration signals a clear performance anomaly. This threshold can be adjusted based on your hardware and database configuration.

  • Counter Mechanism (slow_count): A single slow batch may just be a temporary hiccup (network jitter, competing queries, etc.). We use a counter to distinguish between occasional slowdowns and sustained overload:

    • Slow batch: counter +1

    • Normal batch: counter -1 (but never below 0)

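  • Three-Strikes Rule: Backpressure is triggered once the counter reaches 3. Starting from zero, that takes three slow batches in a row; because a normal batch only decrements the counter rather than resetting it, a mostly slow run with an occasional normal batch will also trigger. This prevents false alarms while still responding quickly to real overloads.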

  • Fixed Pause Time: A 5-second pause gives most transient issues (short-term lock contention, memory GC, etc.) a chance to recover, without stalling the system for too long.
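The counter rules above can be traced by hand. The sketch below replays a list of flush durations and records the counter after each one; `trace_slow_count()` is a hypothetical helper written for this illustration, with the 2.0-second threshold and limit of 3 taken from the text.

```python
from typing import List, Optional, Tuple


def trace_slow_count(
    durations: List[float], threshold: float = 2.0, limit: int = 3
) -> Tuple[List[int], Optional[int]]:
    """Return the counter value after each flush and the index of the first trigger."""
    count = 0
    history: List[int] = []
    for index, duration in enumerate(durations):
        if duration > threshold:
            count += 1
            if count >= limit:
                history.append(count)
                return history, index  # backpressure would be raised here
        else:
            count = max(0, count - 1)  # normal batch: step down, never below 0
        history.append(count)
    return history, None


single_blip = trace_slow_count([0.3, 2.5, 0.4, 0.2])
sustained = trace_slow_count([2.5, 3.0, 2.2])
interleaved = trace_slow_count([2.5, 3.0, 0.4, 2.1, 2.6])
```

A single slow batch (`single_blip`) never triggers. Three slow batches in a row (`sustained`) trigger on the third. In `interleaved`, one normal batch delays the trigger but does not reset the counter, so backpressure still fires on the fifth flush.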

First, initialize Backpressure parameters:

def __init__(self, batch_size: int = 100):
    ...

    # Backpressure detection parameters (corresponding to thresholds in the diagram above)
    self._slow_flush_count = 0      # Counter: +1 on a slow flush, -1 (never below 0) on a normal one
    self._max_slow_flush = 3        # Trigger threshold: backpressure once the counter reaches 3
    self._slow_flush_threshold = 2.0  # Time threshold: flushes taking longer than 2 seconds are considered slow

Next, implement the Detection logic:

def _check_overload(self, flush_duration: float):
    """Check for overload—like a delivery worker checking if they’re too exhausted"""

    # Step 1: Check if the flush duration exceeds the threshold
    if flush_duration > self._slow_flush_threshold:  # 2.0-second threshold

        # Step 2: Increment the slow flush counter
        self._slow_flush_count += 1
        logger.warning(f"Processing slowed down: {flush_duration:.2f}s (slow flush #{self._slow_flush_count})")

        # Step 3: Check if the trigger condition is met
        if self._slow_flush_count >= self._max_slow_flush:  # counter reached 3

            # Step 4: Trigger backpressure
            error_msg = f"PostgreSQL overload: slow-flush counter reached {self._slow_flush_count}"
            pause_time = 5.0  # Fixed 5-second pause for simplicity

            self._slow_flush_count = 0  # Reset counter

            # Raise an overload exception to trigger backpressure
            raise StreamingOverloadException(error_msg, pause_seconds=pause_time)
    else:
        # Processing speed is back to normal, decrement the counter
        self._slow_flush_count = max(0, self._slow_flush_count - 1)

Step 3: Teach Simple Streaming Engine to Hit the Emergency Brake

Now we need the SimpleStreamingEngine to respond to this emergency signal. Let’s break down how backpressure is handled at the application level:

Application-Level Backpressure Flow

The SimpleStreamingEngine needs to perform backpressure checks right at the entry point of message processing, ensuring that no data is lost:

Step 1: Message Processing Entry Point
┌─────────────────────────────────────┐
│         New Message Arrives         │
└─────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────┐
│          message_handler()          │
│                                     │
│    First Check: _should_pause()?    │
└─────────────────────────────────────┘
                    │
        ┌───────────┴───────────┐
        │ YES                   │ NO
        ▼                       ▼
┌─────────────────┐    ┌─────────────────┐
│ Pause Kafka     │    │ Process Message │
│ Consumer        │    │ Normally        │
└─────────────────┘    └─────────────────┘
                                │
                                ▼
                    ┌─────────────────────┐
                    │ Exception Handler   │
                    └─────────────────────┘

Step 1 – Message Processing Entry Point: Every message from Kafka is handled by a processor created via _create_message_handler():

def _create_message_handler(self, source):
    """Create a message handler with built-in smart braking"""
    def message_handler(message):
        # First line of defense: check system status (corresponds to "_should_pause()?" in the diagram)
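        if self._should_pause():
            logger.debug("Pausing Kafka consumption due to backpressure")
            self._pause_kafka_consumer(source)  # corresponds to "Pause Kafka Consumer" in the diagram
            # Caution: returning here skips this message. Unless the framework re-delivers it,
            # a real implementation would seek back to its offset or buffer it for later.
            return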

        # Normal processing flow (corresponds to "Process Message Normally")
        try:
            df = self._source_dataframe_map[source]
            df.process_message(message)
        except StreamingOverloadException as e:
            # Exception handling: downstream signals overload (corresponds to "Exception Handler")
            self._handle_overload(e, source)
        except Exception as e:
            logger.error(f"Other error: {e}")

    return message_handler

Concept:

  • Preemptive Check: Verify the system status before processing any message

  • Safe Pause: Don’t drop messages; pause the Kafka consumer so messages remain in the queue

  • Exception Handling: Catch overload exceptions from the PostgreSQL Sink
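The article does not show `_pause_kafka_consumer()` itself. One possible shape is sketched below, assuming the consumer object exposes `assignment()`, `pause(partitions)` and `resume(partitions)` in the way confluent-kafka's `Consumer` does. The helper names and the `FakeConsumer` test double are assumptions for illustration only, so the sketch runs without a broker.

```python
from typing import Any, List


def pause_consumer(consumer: Any) -> List[Any]:
    """Pause every partition currently assigned to `consumer` and return them."""
    partitions = list(consumer.assignment())
    if partitions:
        consumer.pause(partitions)
    return partitions


def resume_consumer(consumer: Any, partitions: List[Any]) -> None:
    """Resume fetching for partitions that were paused earlier."""
    if partitions:
        consumer.resume(partitions)


class FakeConsumer:
    """Test double with the same three method names; not a real Kafka client."""

    def __init__(self, partitions: List[str]):
        self._assigned = partitions
        self.paused: List[str] = []

    def assignment(self) -> List[str]:
        return list(self._assigned)

    def pause(self, partitions: List[str]) -> None:
        self.paused = list(partitions)

    def resume(self, partitions: List[str]) -> None:
        self.paused = [p for p in self.paused if p not in partitions]


consumer = FakeConsumer(["orders-0", "orders-1"])
held = pause_consumer(consumer)
```

Pausing stops new records from being fetched while they stay in Kafka. With most clients a paused consumer still needs to keep polling so that it remains in its consumer group; check the documentation of the client you use.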

Emergency Brake Trigger and Recovery Mechanism

When the PostgreSQL Sink throws a StreamingOverloadException, the entire SimpleStreamingEngine enters an emergency state:

Normal Processing State:
┌─────────────────────────────────────┐
│          _paused = False            │
│         Normal Processing           │
└─────────────────────────────────────┘
                    │
                    ▼ StreamingOverloadException
┌─────────────────────────────────────┐
│         EMERGENCY BRAKE             │
│                                     │
│  _paused = True                     │
│  _pause_until = now + pause_seconds │
│  _overload_count++                  │
└─────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────┐
│          PAUSED STATE               │
│                                     │
│   Kafka Consumer is paused          │
│   Messages remain in Kafka queue    │
│   System gives DB time to recover   │
└─────────────────────────────────────┘
                    │
                    ▼ Time expires
┌─────────────────────────────────────┐
│         AUTO RECOVERY               │
│                                     │
│  current_time >= _pause_until ?     │
│  YES: _paused = False               │
│  Resume normal processing           │
└─────────────────────────────────────┘

Emergency Brake Handling: When an overload exception is received (corresponds to "EMERGENCY BRAKE" in the diagram):

def _handle_overload(self, error: StreamingOverloadException, source):
    """Handle overload – emergency brake procedure"""
    # Track the number of overloads
    self._overload_count += 1

    # Set system state (corresponds to state change in diagram)
    self._paused = True
    self._pause_until = time.time() + error.pause_seconds  # compute when to resume

    logger.warning(
        f"System overload (occurrence #{self._overload_count})! "
        f"Pausing for {error.pause_seconds:.1f} seconds to give the system a breather..."
    )

    # Immediately pause the Kafka consumer (corresponds to "Kafka Consumer is paused")
    self._pause_kafka_consumer(source)

State Check and Auto Recovery: Before processing each message, the system checks whether it should pause (corresponds to "AUTO RECOVERY").

def _should_pause(self) -> bool:
    """Check if the system should pause – like looking at a traffic light"""
    current_time = time.time()

    # Check current pause state
    if self._paused and current_time < self._pause_until:
        return True  # still in paused state

    elif self._paused and current_time >= self._pause_until:
        # Time to auto-recover
        self._paused = False
        logger.info("Pause ended, resuming message processing")
        # Important: resume all Kafka consumers
        self._resume_all_kafka_consumers()

    return False  # normal processing
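The pause and auto-recovery logic is easier to verify when the clock can be controlled. The sketch below pulls the state machine into a small class with an injectable clock; `PauseGate`, `trip()` and the `clock` parameter are names invented for this example, not part of SimpleStreamingEngine.

```python
import time
from typing import Callable


class PauseGate:
    """Tracks a timed pause; `clock` is injectable so tests need not sleep."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._paused = False
        self._pause_until = 0.0
        self.overload_count = 0

    def trip(self, pause_seconds: float) -> None:
        """Enter the paused state for `pause_seconds`."""
        self.overload_count += 1
        self._paused = True
        self._pause_until = self._clock() + pause_seconds

    def should_pause(self) -> bool:
        """True while the pause is active; clears the flag once the deadline has passed."""
        if self._paused and self._clock() >= self._pause_until:
            self._paused = False  # auto recovery
        return self._paused


now = [100.0]  # fake clock value that we advance by hand
gate = PauseGate(clock=lambda: now[0])
gate.trip(pause_seconds=5.0)
states = [gate.should_pause()]      # t = 100.0: paused
now[0] = 104.9
states.append(gate.should_pause())  # t = 104.9: still paused
now[0] = 105.0
states.append(gate.should_pause())  # t = 105.0: deadline reached, recovered
```

The gate reports paused until the deadline and flips back on the first check at or after it, which mirrors the "AUTO RECOVERY" box in the diagram.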

Architecture Recap

After several days of hands-on learning, our SimpleStreamingEngine has evolved from a basic data flow processor into a more stable streaming system:

Day 7: SimpleStreamingEngine with Backpressure

┌─────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│   KafkaSource   │───►│DataFrame.filter()│───►│PostgreSQLSink    │
│                 │    │                  │    │                  │
│ • Topic consume │    │ • Lambda filter  │    │ • Batch buffer   │
│ • Consumer pause│    │ • Data transform │    │ • Timer trigger  │
│ • Auto resume   │    │                  │    │ • Bulk insert    │
│                 │    │                  │    │ • Overload detect│
│                 │    │                  │    │ • Exception throw│
└─────────────────┘    └──────────────────┘    └──────────────────┘
         ▲                                               │
         │         ┌─────────────────────────────────────┘
         │         │
         │         ▼
┌─────────────────────────────────────────────────────────────┐
│              SimpleStreamingEngine                          │
│                                                             │
│  • Message handler with backpressure check                  │
│  • _should_pause() status management                        │
│  • _handle_overload() emergency brake                       │
│  • Kafka Consumer pause/resume control                      │
│  • Auto recovery after timeout                              │
└─────────────────────────────────────────────────────────────┘

The system has now evolved from simple “message in → process → out” to a full-fledged streaming system with intelligent flow control:

  • Day 5: Built basic streaming capability

  • Day 6: Optimized throughput and efficiency

  • Day 7: Ensured system stability and prevented overload crashes

Summary

What We Learned

Backpressure acts like a smart traffic controller for your streaming system. It keeps the system running smoothly and reliably.

Core Design Principle

In high-throughput systems, gracefully handling overload is more important than blindly increasing capacity. When PostgreSQL starts struggling, letting the system slow down intelligently is often better than forcing it to keep up.

Key Points

  • Three-Strikes Rule: Avoids false alarms, triggers only on true overload

  • Fixed Pause Duration: 5 seconds is simple and easy to understand

  • Kafka Consumer Pause: Ensures data is not lost

  • Auto Recovery: System self-heals without manual intervention

Backpressure in Production

In real-world environments, backpressure mechanisms are far more sophisticated than our demo version.

Mature streaming frameworks like Apache Flink have built-in backpressure mechanisms:

  • Automatic Detection: Monitors processing speed and resource usage

  • Dynamic Adjustment: Adapts processing speed according to load

  • Distributed Support: Coordinates backpressure across multiple nodes
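As a small step from the fixed 5-second pause toward dynamic adjustment, the pause could grow when overloads keep recurring. The sketch below is a hypothetical extension of our demo, not a description of how Flink works; `adaptive_pause()` and its `base` and `cap` parameters are assumptions chosen for illustration.

```python
def adaptive_pause(overload_count: int, base: float = 5.0, cap: float = 60.0) -> float:
    """Double the pause for each repeated overload, up to `cap` seconds."""
    if overload_count < 1:
        return 0.0
    return min(cap, base * 2 ** (overload_count - 1))


# Pause lengths for the first five overloads in a row.
schedule = [adaptive_pause(n) for n in range(1, 6)]
```

With the defaults, the schedule is 5, 10, 20, 40 and then 60 seconds, since the fifth value is capped. A real implementation would also need a rule for resetting the overload count once the sink has been healthy for a while.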

Takeaways

Through this simplified implementation, we grasped the core concepts of backpressure:

  • Clear logic, easy to understand

  • Includes key elements: detection, trigger, pause, recovery

  • Ensures data safety—no messages are lost

Like in a delivery platform: the goal isn’t to exhaust the couriers, but to make sure every package is delivered reliably so customers keep coming back.

Preview of Day 8

Today we learned how to make the system hit the brakes under overload, but one key question remains: how to accurately track processing progress?

In streaming systems, when the system restarts or fails, we need to know: which Kafka offsets have been processed? Which data has been successfully written to the database? How do we avoid duplicate processing or data loss?

Next time we’ll explore manual commit control—the core technique for managing processing progress. We’ll implement it using a checkpoint mechanism to:

  • Accurately record Kafka offsets and processing status

  • Resume from the correct point after failure

  • Avoid duplicate processing and data inconsistency

  • Flexibly control commit timing to ensure data safety
