Streaming Join Principles: Bidirectional State Management Guide

Streaming Join Principles: Bidirectional State Management Guide

Key Takeaways

  • Core Concept: The Bidirectional Build-Probe model is the foundation of stream-to-stream joins.
  • Key Challenge: Infinite data streams require intelligent state management and cleanup to handle out-of-order events.
  • Solution: Encapsulating join logic within the streaming engine removes external database dependencies, enabling true event-driven architecture.

What is a Streaming Join? A Streaming Join is a stateful operation that combines two unbounded event streams in real-time based on a common key. Unlike traditional database joins, it requires bidirectional state management to handle out-of-order data and ensure that events arriving at different times can still be matched.


Do you remember the concept of stateful operations we discussed on Day 12? And the Database Lookup Join we implemented earlier?

Today, we take it a step further and dive into the true mindset of real-time data processing: Stream-to-Stream Join. Instead of relying on an external database for lookups, we let two streams match events directly. This is where the real power of streaming systems and event-driven architecture emerges.

Streaming Join vs. Traditional RDB Join

In relational databases, a JOIN combines two tables based on a condition. In the world of real-time data integration, however, a “table” becomes an unbounded stream of events. This fundamentally changes the nature of JOIN—it becomes continuous, stateful, and time-aware.

Traditional RDB Join: The Build–Probe Model

Step 1: Build Phase
    Table A
    (smaller)         ────────────────────> Hash Table
      │                                   (key→value index)
      │                                       │
      └─ Load all data into memory            │
                                              │
Step 2: Probe Phase                           │
    Table B                                   │
    (larger)                                  │
      │                                       │
      ├─ row1 ──────────┐                     │
      ├─ row2 ──────────┼─ Lookup ───────────►│
      ├─ row3 ──────────┼─ Hash Table         │
      ├─ ...  ──────────┘                     │
      │                                       │
      │                   Match Found ◄───────┘
      │                      │
      ▼                      │
    Join Result ◄────────────┘

Characteristics:

  • One side builds an index; the other probes for matches.
  • The smaller table is fully loaded into memory to build a hash table.
  • The larger table is scanned row by row and probed against the hash table.
  • It is a finite, one-time operation with a clear start and end.

Streaming Join: Bidirectional State Management

In a streaming context, we cannot simply "load all data" because the data never ends. This approach is consistent with the implementation logic found in modern stream processors like Apache Flink and Kafka Streams.

   Stream A Processing                 Stream B Processing
    ┌─────────────────┐                ┌─────────────────┐
    │                 │                │                 │
    │ Event A         │                │ Event B         │
    │      │          │                │      │          │
    │      ▼          │                │      ▼          │
    │ Store in        │                │ Store in        │
    │ State A         │                │ State B         │
    │      │          │                │      │          │
    │      ▼          │                │      ▼          │
    │ Query State B ──┼────────────────┼── Query State A │
    │      │          │                │      │          │
    │      ▼          │                │      ▼          │
    │ Emit Result     │                │ Emit Result     │
    │                 │                │                 │
    └─────────────────┘                └─────────────────┘
            │                                    │
            │        State Management            │
            │    ┌─────────────────────────┐     │
            └───►│ State Store A           │◄────┘
                 │ Key → [timestamp,value] │
                 │                         │
                 │ State Store B           │
                 │ Key → [timestamp,value] │
                 └─────────────────────────┘

Key differences:

  • Bidirectional Build + Probe: Both streams must maintain state and probe the other side.
  • Continuous operation: There is no defined end; data keeps flowing indefinitely.
  • Time semantics: Event timestamps and arrival order matter.
  • State management: Historical data must be retained and cleaned up intelligently (Stateful computing).

Implementing Streaming Join in Simple Streaming

Important Note

All code in this article is pseudo code for educational purposes. It is meant to illustrate the architectural ideas and core design concepts behind stream processing. The code is not production-ready.

For clarity, we simplify earlier implementation details and focus specifically on today’s core mechanism.

Step 1: Designing the State Store

class SimpleDataFrame:
    def __init__(self, name: str = "dataframe", sink_callback=None):
        self.name = name
        # Core of Streaming Join: bidirectional state storage
        # Implementing a State Store (similar to Flink's Keyed State)
        self._join_state = defaultdict(list)  # key -> [events...]
        self._join_partner = None             # the paired DataFrame
        self._join_key = None                 # join field name

Internal Structure of _join_state

{
  "O001": [event1, event2, ...],  # one key may correspond to multiple events
  "O002": [event3, event4, ...],
}

Design Considerations

  • defaultdict(list) supports one-to-many relationships.
  • A single order_id may have multiple associated detail events.

Step 2: Establishing Bidirectional Association in join()

def join(self, other: 'SimpleDataFrame', on: str) -> 'SimpleDataFrame':
    # 1. Create a new DataFrame to collect join results
    joined_df = SimpleDataFrame(f"{self.name}_join_{other.name}")

    # 2. Establish bidirectional association
    self._join_partner = other
    other._join_partner = self
    self._join_key = on
    other._join_key = on

    # 3. Intercept original processing logic to inject join functionality
    self_original_process = self.process_message

    def enhanced_process(message):
        # First execute original logic (filter, transform, etc.)
        result = self_original_process(message)

        # Then apply join logic
        join_results = self._process_join_event(message)
        for join_result in join_results:
            joined_df.process_message(join_result)

        return result

    self.process_message = enhanced_process
    return joined_df

Core Design Principles

  • Bidirectional references: Each DataFrame knows its join partner.
  • Event interception: Join logic is injected without breaking existing behavior.
  • Backward compatibility: Existing DataFrame functionality remains intact.

Design Advantages

  • Backward compatible: filters and sinks continue to work.
  • Separation of concerns: join results are handled independently.
  • Chainable operations: joined results can be further processed.

Step 3: The Core Join Algorithm

def _process_join_event(self, event) -> List[Dict]:
    if not self._join_partner:
        return []

    join_value = str(event.get(self._join_key))
    results = []

    # 1. Store: save current event into state (Stateful operation)
    self._join_state[join_value].append(event)

    # 2. Query: look up matching events in partner's state
    partner_events = self._join_partner._join_state.get(join_value, [])

    # 3. Merge: combine with each matched event
    for partner_event in partner_events:
        merged = {**event, **partner_event}
        results.append(merged)

    return results

Algorithm Steps

  1. Store: Persist the current event in local state by join key.
  2. Query: Look up matching events in the partner’s state store.
  3. Merge: Combine matched events and emit results.

Step 4: End-to-End Execution Flow

Let’s walk through a complete example:

# Define the join relationship
orders_df = app.dataframe(source=orders_source)
details_df = app.dataframe(source=details_source)
joined_df = orders_df.join(details_df, on="order_id")

# Event arrival order
1. detail = {"order_id": "O001", "product": "咖啡", "qty": 2}
2. order = {"order_id": "O001", "user_id": "U123", "total": 500}

Event 1: Detail Arrives First

# details_df receives the detail event:

join_value = "O001"

# 1. Store in local state
details_df._join_state["O001"] = [{"order_id": "O001", "product": "咖啡", "qty": 2}]

# 2. Query partner (orders_df)
orders_df._join_state.get("O001", [])  # returns [], no order yet

# 3. No match, no output
results = []

Event 2: Order Arrives Later

# orders_df receives the order event:

join_value = "O001"

# 1. Store in local state
orders_df._join_state["O001"] = [{"order_id": "O001", "user_id": "U123", "total": 500}]

# 2. Query partner (details_df)
details_df._join_state.get("O001", [])
# Found! [{"order_id": "O001", "product": "咖啡", "qty": 2}]

# 3. Merge events
merged = {
    **{"order_id": "O001", "user_id": "U123", "total": 500},    # current event
    **{"order_id": "O001", "product": "咖啡", "qty": 2}         # matched event
}
# Output:
# {"order_id": "O001", "user_id": "U123", "total": 500, "product": "咖啡", "qty": 2}

results = [merged]

Regardless of arrival order, the join eventually succeeds because both sides retain state.

Design Advantages Summary

  1. Clean API: df1.join(df2, on="key") is intuitive and expressive.
  2. Automatic state handling: State management and routing are transparent.
  3. Non-intrusive design: Existing DataFrame operations remain unaffected.
  4. Composable: Join results can be chained with further transformations.
  5. Out-of-order tolerant: Matching works regardless of arrival order.

This implementation demonstrates how complex streaming join logic can be encapsulated behind a simple API, allowing developers to focus on business logic rather than low-level state mechanics.

Architecture Recap

Day 13: SimpleStreamingEngine with Streaming JOIN

┌─────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│   KafkaSource   │───►│DataFrame         │───►│PostgreSQLSink    │
│                 │    │                  │    │                  │
│ • Topic consume │    │ • Filter         │    │ • Batch insert   │
│ • Consumer pause│    │ • Lookup Join    │    │ • Timer trigger  │
│ • Offset resume │    │ • Streaming Join │    │ • Overload detect│
└─────────────────┘    └──────────────────┘    └──────────────────┘
         ▲                                               │
         │         ┌─────────────────────────────────────┘
         │         │
         │         ▼
┌─────────────────────────────────────────────────────────────┐
│              SimpleStreamingEngine                          │
│  • Backpressure                                             │
│  • Checkpoint                                               │
│  • State Storage                                            │
└─────────────────────────────────────────────────────────────┘

Newly Introduced Capabilities

Day 10

  • Lookup Join: DataFrame can enrich data by querying a database.

Day 13

  • Streaming Join: Direct join between two streams.
  • State storage: Bidirectional state management for event matching and out-of-order handling.

FAQ: Common Questions on Streaming Joins

Q: How does Streaming Join handle late-arriving data? A: By using bidirectional state stores, the system retains events from both streams. When a late event arrives, it is stored and then used to probe the partner's state, allowing it to match with previously stored counterparts regardless of the delay.

Q: What is the main difference between a Lookup Join and a Streaming Join? A: A Lookup Join enriches a stream by querying an external static or slowly changing dimension table (usually in a database). A Streaming Join correlates two dynamic, unbounded event streams in real-time within the streaming engine itself.

Q: How do you prevent state from growing indefinitely? A: In production systems like Apache Flink, state is managed using Time-to-Live (TTL) configurations or windowing strategies to automatically clean up old data that is no longer expected to match.

Conclusion

Streaming Join represents a major step forward—from simple event forwarding to intelligent stream correlation.

By introducing bidirectional state management, two independent streams can become aware of each other, wait for matching counterparts, and ultimately meet within the streaming layer itself. This not only solves out-of-order arrival challenges but, more importantly, eliminates the dependency on external databases. Data processing can now be completed entirely within the streaming domain.

Up Next: Day 14 – Streaming GroupBy

If JOIN is the “encounter between streams,” then GROUP BY is “a stream organizing itself.”

In Day 14, we’ll explore how to implement Streaming GroupBy. Stay tuned.


The Modern Backbone for
Real-Time Data and AI
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.