A Hands-On Guide to Building the Speed Layer of the Lambda Architecture

In the previous article, we introduced the design principles and implementation of the Lambda Architecture.

We talked about its three layers:

  • Batch Layer – slow but precise; processes full historical data to ensure eventual consistency

  • Speed Layer – handles incremental data in real time

  • Serving Layer – merges results from both layers and provides a unified query interface

Starting today, we’ll dive into the implementation of the Speed Layer.

We’ll begin by writing a Kafka Consumer and gradually explore the core mechanisms of stream processing.

In today’s article, we’ll write a Kafka Consumer — a key component of the Speed Layer — responsible for instantly receiving order data and writing it into the Serving DB (while the Batch Layer focuses on processing full historical data each day).

⚠️ Important

All the code in this article is for educational and conceptual demonstration only (pseudo code).

It’s designed to help you understand and solve real-world problems, but it’s not production-ready.

Focus on grasping the overall architecture and design logic — you can skim over low-level code details.

Step 1: A Speed Layer Kafka Consumer That Ingests Orders into the Serving DB

Let’s start with the simplest version:

from kafka import KafkaConsumer
import json

# Subscribe to the orders topic
consumer = KafkaConsumer('orders')

print("[Speed Layer] Waiting for fresh orders...")

for message in consumer:
    order = json.loads(message.value.decode('utf-8'))

    # insert_db() and conn are placeholders for the Serving DB write
    # (one possible implementation is sketched just after the list below)
    insert_db(order)
    conn.commit()
    print(f"[Speed Layer] Inserted order {order['id']}")

This logic is straightforward:

  • Kafka provides a real-time stream of order data

  • The Consumer consumes and processes the stream

  • The Serving DB stores the processed results

  • Core task: write incoming data into storage as fast as possible
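
For completeness, here is one way the insert_db() and conn placeholders from the snippet above could look. This is a minimal sketch, assuming a PostgreSQL Serving DB with an orders_realtime table that has id and status columns; the connection parameters are placeholder assumptions, not values from this series.

import psycopg2

# Hypothetical Serving DB connection (all parameters are placeholders)
conn = psycopg2.connect(host="localhost", dbname="analytics",
                        user="postgres", password="postgres")

def insert_db(order):
    # Write one raw order into the real-time table read by the Serving Layer
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO orders_realtime (id, status) VALUES (%s, %s)",
            (order["id"], order.get("status")),
        )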

Step 2: Designing the Serving DB

Here we have two key tables:

  • orders_batch_summary – pre-aggregated historical data computed daily by the Batch Layer

  • orders_realtime – detailed real-time orders sent by the Speed Layer

When querying from the dashboard, the system merges both tables while filtering out invalid orders with status = 'removed':

SELECT status, SUM(count) AS total
FROM (
    SELECT status, count
    FROM orders_batch_summary
    WHERE status != 'removed'

    UNION ALL

    SELECT status, COUNT(*) AS count
    FROM orders_realtime
    WHERE status != 'removed'
    GROUP BY status
) t
GROUP BY status;
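
On the Serving Layer side, a dashboard endpoint might run this merge query directly. A minimal sketch with psycopg2 (how the connection is created is left to the caller and is not specified by the article):

MERGE_QUERY = """
SELECT status, SUM(count) AS total
FROM (
    SELECT status, count
    FROM orders_batch_summary
    WHERE status != 'removed'

    UNION ALL

    SELECT status, COUNT(*) AS count
    FROM orders_realtime
    WHERE status != 'removed'
    GROUP BY status
) t
GROUP BY status;
"""

def order_totals_by_status(conn):
    # conn is an open psycopg2 connection to the Serving DB;
    # merge the batch and real-time views into one result for the dashboard
    with conn.cursor() as cur:
        cur.execute(MERGE_QUERY)
        return dict(cur.fetchall())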

However, as the company grows, more Consumers are written independently by different developers, each with their own logic. Over time, the codebase becomes tangled and unmaintainable.

We need to refactor — to establish a unified Stream Processing architecture so everyone can build upon a consistent framework.

Designing a Source Abstraction Layer

On a team project, different Kafka Consumer implementations can make integration harder.

The solution is to define a unified Source Interface.

Source Architecture Design

    ┌─────────────┐
    │ BaseSource  │  ◄── Abstract Interface
    │             │
    │ + run()     │
    └─────────────┘
           △
           │ implements
    ┌─────────────┐
    │KafkaSource  │  ◄── Concrete Implementation
    │             │
    │ + run()     │
    └─────────────┘

Step-by-Step Explanation of the Source Core

Step 1: Define the BaseSource Abstract Interface

from abc import ABC, abstractmethod

class BaseSource(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def run(self):
        pass

Key Points:

  • Each Source must have a unique name

  • run() is abstract — forcing subclasses to implement it

Step 2: Initialize SimpleKafkaSource

class SimpleKafkaSource(BaseSource):
    def __init__(self, name: str, topic: str, broker_address: str = "localhost:9092"):
        super().__init__(name)
        self.topic = topic
        self.broker_address = broker_address
        self.consumer = None
        self.message_handler = self._default_handler

Key Points:

  • Inherits from BaseSource to conform to a unified interface

  • message_handler is replaceable, offering flexibility in message processing logic
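
Because the handler is just an attribute, callers can swap in their own processing logic. A minimal sketch (log_order is a hypothetical handler name, not part of the framework):

# Replace the default handler with custom processing logic
source = SimpleKafkaSource(name="orders-logger", topic="orders")

def log_order(message):
    # message is the dict that run() builds: key, value, topic, offset, ...
    print(f"order payload: {message['value']}")

source.message_handler = log_order
# source.run()  # every incoming message would now be passed to log_order()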

Step 3: Set Up the Kafka Consumer

def _setup_consumer(self):
    self.consumer = KafkaConsumer(
        self.topic,
        bootstrap_servers=self.broker_address,
        group_id=f"simple-source-{self.name}",
        auto_offset_reset='latest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
    )

Key Points:

  • group_id is derived from the Source name, giving each Source its own consumer group

  • auto_offset_reset='latest' starts consumption from the newest messages when the consumer group has no committed offset yet (a replay variant is sketched after this list)

  • Automatic JSON deserialization
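
One caveat: with 'latest', a brand-new consumer group skips messages already sitting in the topic. For local development or backfill testing, a variant with auto_offset_reset='earliest' replays the topic from the beginning. A sketch (the broker address and group id are placeholder assumptions):

from kafka import KafkaConsumer
import json

# Replay variant: start from the oldest retained message when this
# consumer group has no committed offset yet
replay_consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='localhost:9092',
    group_id='simple-source-orders-replay',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
)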

Step 4: Core Runtime Logic

def run(self):
    self._setup_consumer()  # initialize Consumer

    for message in self.consumer:  # continuously listen for messages
        self.message_handler({
            'key': message.key,
            'value': message.value,
            'topic': message.topic,
            'offset': message.offset
        })

Execution Flow:

  1. Initialize Kafka Consumer

  2. Continuously read messages from the Topic

  3. Wrap messages in a standard format

  4. Call message_handler for processing

Key Points:

The Source only handles data ingestion; message processing logic is injected externally through message_handler, enabling high flexibility.

Complete Source Implementation

import logging
import json
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any
from kafka import KafkaConsumer

logger = logging.getLogger(__name__)

class BaseSource(ABC):
    """Base abstract class for all Sources"""

    def __init__(self, name: str):
        self.name = name
        self._running = False

    @abstractmethod
    def run(self):
        """Main execution method"""
        pass

    def stop(self):
        """Stop the Source"""
        self._running = False
        logger.info(f"Source {self.name} stopped")

class SimpleKafkaSource(BaseSource):
    """Simple Kafka Source implementation"""

    def __init__(
        self,
        name: str,
        topic: str,
        broker_address: str = "localhost:9092",
        consumer_group: Optional[str] = None,
        message_handler: Optional[Callable[[Any], None]] = None
    ):
        super().__init__(name)
        self.topic = topic
        self.broker_address = broker_address
        self.consumer_group = consumer_group or f"simple-source-{name}"
        self.message_handler = message_handler or self._default_handler
        self.consumer: Optional[KafkaConsumer] = None

    def _default_handler(self, message):
        print(f"[{self.name}] Received message: {message}")

    def _setup_consumer(self):
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=self.broker_address,
                group_id=self.consumer_group,
                auto_offset_reset='latest',
                value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
                key_deserializer=lambda m: m.decode('utf-8') if m else None
            )
            logger.info(f"Consumer setup for topic: {self.topic}, group: {self.consumer_group}")
        except Exception as e:
            logger.error(f"Failed to setup consumer: {e}")
            raise

    def run(self):
        logger.info(f"Starting Source {self.name} for topic {self.topic}")
        self._setup_consumer()
        self._running = True

        try:
            while self._running:
                message_batch = self.consumer.poll(timeout_ms=1000)
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        if not self._running:
                            break
                        try:
                            self.message_handler({
                                'key': message.key,
                                'value': message.value,
                                'topic': message.topic,
                                'partition': message.partition,
                                'offset': message.offset,
                                'timestamp': message.timestamp
                            })
                        except Exception as e:
                            logger.error(f"Error processing message: {e}")
        except KeyboardInterrupt:
            logger.info("Received interrupt signal")
        except Exception as e:
            logger.error(f"Error in run loop: {e}")
        finally:
            if self.consumer:
                self.consumer.close()
            logger.info(f"Source {self.name} finished")

    def stop(self):
        super().stop()
        if self.consumer:
            self.consumer.close()
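
Before wiring the Source into anything larger, it can be smoke-tested on its own. A minimal sketch, assuming a local broker and an existing orders topic (the name "orders-probe" is just an example):

# Standalone test: the default handler prints every message it receives
source = SimpleKafkaSource(name="orders-probe", topic="orders")
source.run()  # blocks until interrupted; the consumer is closed on exit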

Designing the Sink Abstraction Layer

In the Speed Layer architecture, Sources handle data input, while Sinks handle data output.

To avoid inconsistent implementations, we define a unified Sink interface.

Sink Architecture Design

    ┌─────────────┐
    │  BaseSink   │  ◄── Abstract Interface
    │             │
    │ + write()   │
    └─────────────┘
           △
           │ implements
    ┌──────────────────┐
    │SimplePostgreSQL  │  ◄── Concrete Implementation
    │Sink              │
    │ + write()        │
    └──────────────────┘

Step-by-Step Explanation of the Sink Core

Step 1: Define the Base Sink Interface

from abc import ABC, abstractmethod

class BaseSink(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message):
        pass

    def setup(self):
        pass  # default no-op

Key Points:

  • Each Sink must have a unique name

  • write() handles actual data writes

  • setup() is optional and can be overridden

Step 2: Initialize the Simple PostgreSQL Sink

class SimplePostgreSQLSink(BaseSink):
    def __init__(self, name: str, host: str, dbname: str, table_name: str):
        super().__init__(name)
        self.host = host
        self.dbname = dbname
        self.table_name = table_name
        self.connection = None

Key Points:

  • Inherits from BaseSink for interface consistency

  • Stores database connection information

  • Lazy connection initialization (connection = None)

Step 3: Core Logic of write()

def write(self, message):
    # Automatically detect columns and insert into DB
    data = message.get('value', {})
    # ... dynamically generate SQL and insert

Key Points:

Automatically detects field structures in message['value'], dynamically generates INSERT SQL, and writes to the database.

This allows the Sink to adapt to various data schemas automatically.

Complete Sink Implementation

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict

try:
    import psycopg2
    from psycopg2.extras import Json
    from psycopg2 import sql
except ImportError:
    psycopg2 = None
    print("Warning: psycopg2 not installed. Run: pip install psycopg2-binary")

logger = logging.getLogger(__name__)

class BaseSink(ABC):
    """Base abstract class for all Sinks"""

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message: Dict[str, Any]):
        """Write a message"""
        pass

    def setup(self):
        """Setup connection"""
        pass

    def close(self):
        """Close connection"""
        pass

class SimplePostgreSQLSink(BaseSink):
    """PostgreSQL Sink with automatic schema detection"""

    def __init__(
        self,
        name: str,
        host: str,
        port: int,
        dbname: str,
        user: str,
        password: str,
        table_name: str
    ):
        super().__init__(name)
        self.host = host
        self.port = port
        self.dbname = dbname
        self.user = user
        self.password = password
        self.table_name = table_name
        self.connection = None

    def setup(self):
        if psycopg2 is None:
            raise ImportError("psycopg2 is required")

        self.connection = psycopg2.connect(
            host=self.host,
            port=self.port,
            dbname=self.dbname,
            user=self.user,
            password=self.password
        )
        logger.info(f"Connected to PostgreSQL: {self.host}:{self.port}/{self.dbname}")

    def write(self, message: Dict[str, Any]):
        """Automatically detect fields and write to PostgreSQL"""
        data = message.get('value', {})
        # ... dynamic field detection and SQL execution logic

    def close(self):
        if self.connection:
            self.connection.close()
            logger.info("PostgreSQL connection closed")

Simple Streaming Engine: Unified Management Layer

Between the Source (data input) and the Sink (data output), we need a unified management layer to handle orchestration, monitoring, and lifecycle management.

This component is the SimpleStreamingEngine.

Simple Streaming Engine Architecture Design

    ┌─────────────────────┐
    │SimpleStreamingEngine│  ◄── Central Manager
    │                     │
    │    +add_source()    │
    │    +add_sink()      │
    │    + run()          │
    └─────────────────────┘
           │
           │ manages
           ▼
    ┌──────────────┐    ┌──────────────┐
    │    Source    │───▶│     Sink     │
    │              │    │              │
    │ KafkaSource  │    │PostgreSQLSink│
    └──────────────┘    └──────────────┘

Step-by-Step Breakdown of the Simple Streaming Engine Core Code

Step 1: Initializing the Simple Streaming Engine

class SimpleStreamingEngine:
    def __init__(self, name: str = "simple-streaming-app"):
        self.name = name
        self._sources = []  # List of Sources
        self._sinks = []    # List of Sinks

Key Points:

  • The SimpleStreamingEngine manages two lists: Sources and Sinks.

  • It provides unified registration interfaces for both.

Step 2: Registering Sources and Sinks

def add_source(self, source: BaseSource):
    self._sources.append(source)

def add_sink(self, sink: BaseSink):
    self._sinks.append(sink)

Key Points:

  • Simple list-based management supporting multiple sources and sinks.

  • Follows a unified interface — any class implementing BaseSource or BaseSink can be registered.

Step 3: Core Execution Logic

def run(self):
    # Initialize all sinks
    for sink in self._sinks:
        sink.setup()

    # Set message handler for each source
    for source in self._sources:
        source.message_handler = self._create_message_handler()
        source.run()  # Start consuming data

Execution Flow:

  1. Initialize connections for all sinks.

  2. Assign message handlers to sources.

  3. Start the sources to begin consuming data. (Note that run() blocks, so this simple version effectively drives a single source; running several concurrently would require threads.)

Step 4: Message Handler Core Logic

def _create_message_handler(self):
    def handler(message):
        # Forward message to all sinks
        for sink in self._sinks:
            sink.write(message)
    return handler

Detailed Data Flow Explanation:

  1. When SimpleStreamingEngine starts:

     # Inside SimpleStreamingEngine.run()
     for source in self._sources:
         source.message_handler = self._create_message_handler()  # Inject handler
         source.run()  # Start source
    
  2. When Source receives data:

     # Inside SimpleKafkaSource.run()
     for message in self.consumer:  # Fetch messages from Kafka
         formatted_message = {
             'key': message.key,
             'value': message.value
         }
         self.message_handler(formatted_message)  # Call injected handler
    
  3. When handler forwards messages:

     # Handler returned by _create_message_handler()
     def handler(message):  # message = formatted data from source
         for sink in self._sinks:
             sink.write(message)
    

Overall Data Flow:

KafkaSource.run() → message_handler() → Sink.write()

Key Point:

The SimpleStreamingEngine uses function injection so that sources are unaware of sinks, achieving complete decoupling between components.

Why Do We Need the Simple Streaming Engine?

  • Decoupled Design: Sources and sinks are fully independent and interchangeable.

  • Scalability: Supports multiple sinks simultaneously (e.g., PostgreSQL + Elasticsearch); a second-sink sketch follows this list.

  • Unified Management: Offers a consistent API for registration and execution.
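
To make the multi-sink point concrete, here is a sketch of a second sink that simply prints messages (the ConsoleSink name is hypothetical; a real deployment might plug in an Elasticsearch sink the same way):

class ConsoleSink(BaseSink):
    """A trivial debugging sink: print every message it receives."""

    def write(self, message):
        print(f"[{self.name}] {message['value']}")

# Registered alongside the PostgreSQL sink, it receives the same messages:
# engine.add_sink(SimplePostgreSQLSink(...))
# engine.add_sink(ConsoleSink(name="debug-console"))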

Complete Simple Streaming Engine Code

import logging
from typing import List
from .source import BaseSource
from .sink import BaseSink

logger = logging.getLogger(__name__)

class SimpleStreamingEngine:
    """
    A minimal streaming processing engine
    """

    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources: List[BaseSource] = []  # List of sources
        self._sinks: List[BaseSink] = []      # List of sinks

    def add_source(self, source: BaseSource):
        """
        Register a source with the streaming engine
        """
        self._sources.append(source)

    def add_sink(self, sink: BaseSink):
        """
        Register a sink with the streaming engine
        """
        self._sinks.append(sink)

    def run(self):
        """
        Start the streaming engine and process data streams
        """
        # Initialize all sinks
        for sink in self._sinks:
            sink.setup()

        # Assign message handler to each source
        for source in self._sources:
            source.message_handler = self._create_message_handler()
            source.run()  # Start consuming data

    def _create_message_handler(self):
        """
        Create a message handler that dispatches data to all sinks
        """
        def handler(message):
            for sink in self._sinks:
                sink.write(message)
        return handler

Putting It All Together: Automatic Data Flow in Action

At this point, we have:

  • Source: Ingest data from Kafka

  • Sink: Deliver data into PostgreSQL

  • SimpleStreamingEngine: Connects, manages, and orchestrates both

Once assembled, we can kick off the automatic end-to-end data flow.

# 1. Create the SimpleStreamingEngine
engine = SimpleStreamingEngine(...)

# 2. Create a Kafka Source
orders_source = SimpleKafkaSource(...)

# 3. Create a PostgreSQL Sink
pg_sink = SimplePostgreSQLSink(...)

# 4. Assemble and start
engine.add_source(orders_source)
engine.add_sink(pg_sink)
engine.run()  # Start processing: Kafka → PostgreSQL
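
For concreteness, here is one possible way to fill in those constructor arguments. The broker address, database credentials, and names below are placeholder assumptions, not values prescribed by the article:

engine = SimpleStreamingEngine(name="speed-layer")

orders_source = SimpleKafkaSource(
    name="orders-source",
    topic="orders",
    broker_address="localhost:9092"
)

pg_sink = SimplePostgreSQLSink(
    name="orders-sink",
    host="localhost",
    port=5432,
    dbname="analytics",
    user="postgres",
    password="postgres",
    table_name="orders_realtime"
)

engine.add_source(orders_source)
engine.add_sink(pg_sink)
engine.run()  # Kafka → PostgreSQL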

Summary

In this section, we explored the core implementation of the Speed Layer:

  • The Batch Layer provides reliable historical data processing.

  • The Speed Layer delivers real-time responsiveness and streaming data handling.

  • Without the Speed Layer, true real-time capability in the Lambda Architecture is impossible.

Through the Source–Sink–SimpleStreamingEngine architecture, we built:

  • A unified data processing interface

  • A scalable stream processing framework

  • A fully functional Speed Layer implementation

Day 5 Preview: Tackling Performance Bottlenecks

At first, the system runs smoothly — the consumer processes order data without issue.

But under heavy traffic:

  • The console starts showing latency warnings.

  • The consumer’s processing power hits its limit.

  • Orders begin to queue up for processing.

In the next part, we’ll explore Speed Layer performance challenges and how to optimize for high-throughput workloads.
