Use cases
Real-Time Monitoring and Alerting
RisingWave continuously monitors event streams and detects anomalies within seconds using SQL, triggering instant alerts without Java or complex stream processing frameworks.

What is real-time monitoring and alerting?
Real-time monitoring and alerting is the practice of continuously analyzing event streams to detect anomalies, threshold violations, or policy breaches within seconds, and triggering automated responses before issues escalate. Unlike batch monitoring, which processes data on hourly or daily schedules, real-time monitoring detects issues with latency measured in seconds or less.
Where is real-time monitoring used?
Manufacturing: Monitor production line equipment sensor data in real time. Detect machine breakdowns, quality deviations, or predictive maintenance signals before they cause unplanned downtime.
Financial services: Monitor trading systems and detect fraudulent activities in real time. Trigger alerts for suspicious transactions, compliance violations, or unusual trading patterns.
Energy and utilities: Monitor power grid status, analyze smart meter data for usage anomalies, and issue alerts for equipment failures or safety hazards at facilities.
Infrastructure and DevOps: Monitor application logs, metrics, and traces. Detect service degradation, error rate spikes, or resource exhaustion across distributed systems.
Why use RisingWave for monitoring and alerting?
RisingWave lets you build monitoring pipelines with standard SQL — no Java, no Flink clusters, no complex stream processing frameworks. Define anomaly detection rules as SQL materialized views that update continuously as new data arrives.
Sub-second detection latency: RisingWave processes events and updates materialized views in milliseconds, enabling near-instant anomaly detection and alert triggering.
50+ connectors: Ingest from Kafka, Kinesis, MQTT, PostgreSQL (CDC), MySQL (CDC), and other systems. Deliver alerts to Kafka, webhooks, Slack, or downstream databases.
Complex event processing with SQL: Define multi-way joins, time windows, pattern matching, and aggregations using familiar SQL syntax. No custom Java operators needed.
Dynamic scaling without downtime: Add or remove compute nodes in seconds based on event volume, without stopping your monitoring pipelines.
Fast failure recovery: RisingWave persists state in S3-compatible storage. If a node fails, recovery takes seconds, not the minutes or hours a full Flink checkpoint restore can require.
Code snippets
The code snippets below walk through a minimal monitoring and alerting pipeline in RisingWave: ingesting device health metrics, detecting anomalies with a materialized view, and delivering alerts via webhook.
1. Create a table to ingest IoT events (with all the device health metrics)
RisingWave lets users of event streaming platforms run SQL queries directly against their streams for real-time analytics, and it supports continuous stream processing for real-time monitoring and dashboard applications.
CREATE TABLE iot_events (
    device_id VARCHAR NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    cpu_utilization NUMERIC,
    memory_utilization NUMERIC,
    disk_utilization NUMERIC,
    network_utilization NUMERIC,
    battery_level NUMERIC,
    temperature NUMERIC
);
This table will store the raw IoT events received from the devices, including various device health metrics such as CPU utilization, memory utilization, disk utilization, network utilization, battery level, and temperature.
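In production, these events would typically arrive through a stream source such as Kafka. For local testing, you can generate synthetic readings and insert them yourself. The sketch below is an illustrative helper (the function name and value ranges are assumptions, not part of RisingWave); the actual `cursor.execute` call against a live RisingWave instance is shown in a comment.

```python
import random
from datetime import datetime, timezone

def generate_iot_event(device_id):
    """Generate one synthetic device-health reading matching the iot_events columns."""
    return {
        "device_id": device_id,
        "timestamp": datetime.now(timezone.utc),
        "cpu_utilization": round(random.uniform(0, 100), 2),
        "memory_utilization": round(random.uniform(0, 100), 2),
        "disk_utilization": round(random.uniform(0, 100), 2),
        "network_utilization": round(random.uniform(0, 100), 2),
        "battery_level": round(random.uniform(0, 100), 2),
        "temperature": round(random.uniform(20, 90), 2),
    }

# Parameterized INSERT matching the table definition above.
# With a live connection (see the alerting script below) you would run:
#   cursor.execute(insert_sql, tuple(event.values()))
insert_sql = (
    "INSERT INTO iot_events (device_id, timestamp, cpu_utilization, "
    "memory_utilization, disk_utilization, network_utilization, "
    "battery_level, temperature) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
)
event = generate_iot_event("device-001")
```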
2. Create a materialized view to record anomaly records
CREATE MATERIALIZED VIEW anomaly_entries AS
SELECT
    device_id,
    timestamp,
    CASE
        WHEN cpu_utilization > 80 THEN 'cpu_utilization'
        WHEN memory_utilization > 90 THEN 'memory_utilization'
        WHEN disk_utilization > 95 THEN 'disk_utilization'
    END AS metric,
    CASE
        WHEN cpu_utilization > 80 THEN cpu_utilization
        WHEN memory_utilization > 90 THEN memory_utilization
        WHEN disk_utilization > 95 THEN disk_utilization
    END AS value,
    NULL::VARCHAR AS description -- Placeholder for an anomaly description
FROM iot_events
WHERE
    timestamp >= NOW() - INTERVAL '1 day'
    AND (cpu_utilization > 80 OR memory_utilization > 90 OR disk_utilization > 95);
This materialized view stores anomaly entries derived from the raw IoT events: the device ID, timestamp, metric name (e.g., cpu_utilization), metric value, and a placeholder column for an anomaly description. Because RisingWave maintains materialized views incrementally, the view reflects new anomalies within moments of ingestion; no scheduled refresh is needed.
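The same threshold rules can be mirrored in application code, which is handy for unit-testing alert logic before wiring it to the database. This is a minimal sketch (the function and threshold table are illustrative, not a RisingWave API) using the same limits as the view:

```python
# Thresholds mirroring the anomaly_entries materialized view, checked in order
THRESHOLDS = {
    "cpu_utilization": 80,
    "memory_utilization": 90,
    "disk_utilization": 95,
}

def classify_anomaly(event):
    """Return (metric, value) for the first breached threshold, or None.

    Mirrors the CASE expressions in the view: thresholds are checked in
    order, so a single event yields at most one anomaly entry.
    """
    for metric, limit in THRESHOLDS.items():
        value = event.get(metric)
        if value is not None and value > limit:
            return metric, value
    return None
```

Note that, like the SQL CASE expression, this reports only the first breached metric; an event with both CPU and disk anomalies is classified by its CPU reading.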
3. Create the Python logic to do alerting (via Webhook)
import time

import psycopg2
import requests

# Connect to RisingWave, which speaks the PostgreSQL wire protocol.
# The default port is 4566 and the default user is root; adjust for your deployment.
connection = psycopg2.connect(
    host="localhost",
    port=4566,
    user="root",
    password="",
    database="dev",
)
connection.autocommit = True

# Create a cursor to execute queries
cursor = connection.cursor()

# Webhook endpoint that receives alert notifications
webhook_url = "https://example.com/webhook"

try:
    # Poll for new anomaly entries every 10 seconds
    while True:
        cursor.execute(
            "SELECT * FROM anomaly_entries "
            "WHERE timestamp >= NOW() - INTERVAL '10 seconds'"
        )
        for row in cursor.fetchall():
            # Send one notification per anomaly entry
            message = "New anomaly detected: {}".format(row)
            requests.post(webhook_url, json={"message": message})
        time.sleep(10)
finally:
    # Close the cursor and the connection on exit
    cursor.close()
    connection.close()
You can further customize and extend this example to fit your specific monitoring and alerting requirements, such as integrating with external systems, implementing complex alert conditions, or incorporating machine learning models for anomaly detection.
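One common refinement: a fixed 10-second lookback combined with a 10-second sleep can alert on the same row twice (or miss rows at the window boundary). A simple fix is to track the latest timestamp already processed and filter on it. Below is a hedged sketch; the helper name is an assumption, and rows are assumed to follow the column order of the anomaly_entries view, with the timestamp in the second position.

```python
from datetime import datetime, timedelta

def filter_new_rows(rows, last_seen):
    """Keep only rows strictly newer than last_seen.

    Each row is assumed to be (device_id, timestamp, metric, value, description),
    matching the anomaly_entries view. Returns the new rows and the updated
    high-water mark.
    """
    new_rows = [row for row in rows if row[1] > last_seen]
    if new_rows:
        last_seen = max(row[1] for row in new_rows)
    return new_rows, last_seen
```

In the polling loop, you would then replace the fixed interval filter with a parameterized query such as `WHERE timestamp > %s`, bound to the stored high-water mark, so each anomaly is alerted on exactly once.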