In today's data-driven world, the ability to process and analyze data in real time is no longer a luxury; it's a necessity. After building data pipelines that process millions of events per second, I'll share the architecture patterns and lessons learned along the way.
## The Evolution of Data Processing
Traditional batch processing served us well for years, but modern applications demand:
- Sub-second latency for critical decisions
- Handling late-arriving data gracefully
- Processing unbounded data streams
- Scaling elastically with load
## Architecture Overview

Here's a battle-tested architecture for real-time data processing:

```
Data Sources → Kafka → Spark Streaming → Data Stores → Analytics
                 ↓            ↓               ↓
         Schema Registry  Checkpointing  Real-time Dashboards
```
## Setting Up Apache Kafka

### 1. Kafka Cluster Configuration

```yaml
# kafka-cluster.yaml (docker-compose)
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      # Replication factor 3 assumes kafka2 and kafka3 brokers
      # defined the same way (omitted here for brevity)
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
```
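With the cluster up, you can create the topics the pipeline reads and writes. Here's a minimal sketch using confluent-kafka's `AdminClient`; the `events` topic matches what we consume below, while the partition count is an illustrative assumption:

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# 12 partitions is an illustrative starting point; replication factor 3
# matches the three-broker cluster above.
futures = admin.create_topics([
    NewTopic('events', num_partitions=12, replication_factor=3)
])
for topic, future in futures.items():
    future.result()  # block until created; raises on failure
```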
### 2. Producer Implementation with Schema Registry

```python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
import logging

logger = logging.getLogger(__name__)

class KafkaProducer:
    def __init__(self, bootstrap_servers, schema_registry_url):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'enable.idempotence': True,  # no duplicates from retries
            'acks': 'all',
            'retries': 10,
            'max.in.flight.requests.per.connection': 5,
            'compression.type': 'snappy',
            'linger.ms': 10,
            'batch.size': 32768
        })
        self.schema_registry = SchemaRegistryClient({
            'url': schema_registry_url
        })

    def produce(self, topic, key, value, schema):
        serializer = AvroSerializer(self.schema_registry, schema)
        self.producer.produce(
            topic=topic,
            key=key,
            value=serializer(value, SerializationContext(topic, MessageField.VALUE)),
            on_delivery=self.delivery_callback
        )
        self.producer.poll(0)  # serve delivery callbacks

    def delivery_callback(self, err, msg):
        if err:
            logger.error(f'Message delivery failed: {err}')
        else:
            logger.info(f'Message delivered to {msg.topic()} [{msg.partition()}]')
```
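A quick usage sketch; the Avro schema string below is an assumption that mirrors the event schema used in the Spark section:

```python
value_schema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"}
  ]
}
"""

producer = KafkaProducer("localhost:9092", "http://localhost:8081")
producer.produce(
    "events",
    key="user-42",
    value={"event_id": "e1", "user_id": "user-42", "event_type": "page_view"},
    schema=value_schema,
)
producer.producer.flush()  # drain outstanding messages before exit
```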
## Spark Streaming Processing

### 1. Structured Streaming Setup

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("RealTimeDataPipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Read from Kafka. maxOffsetsPerTrigger is the rate limit that applies to
# Structured Streaming (maxRatePerPartition is a legacy DStreams setting).
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", "100000") \
    .load()
```
### 2. Complex Event Processing

```python
# Define the event schema
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("properties", MapType(StringType(), StringType()), True)
])

# Parse and process events
processed_df = df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "10 minutes")

# Windowed aggregations
user_activity = processed_df \
    .groupBy(
        window(col("timestamp"), "5 minutes", "1 minute"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("event_count"),
        collect_set("event_type").alias("event_types"),
        min("timestamp").alias("session_start"),
        max("timestamp").alias("session_end")
    )

# Pattern detection
suspicious_activity = user_activity \
    .filter(
        (col("event_count") > 100) |
        (array_contains(col("event_types"), "failed_login") &
         (size(col("event_types")) > 5))
    )
```
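To act on these flags downstream, the suspicious-activity stream can be written back to Kafka. A minimal sketch, where the `alerts` topic name and checkpoint path are assumptions:

```python
# Serialize each flagged window to JSON and publish it for alerting
alert_query = suspicious_activity \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "alerts") \
    .option("checkpointLocation", "/tmp/alert-checkpoint") \
    .outputMode("update") \
    .start()
```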
### 3. Stateful Stream Processing

Structured Streaming's `applyInPandasWithState` (Spark 3.4+) lets us carry a per-user profile across micro-batches:

```python
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
import pandas as pd

output_schema = "user_id STRING, total_events LONG, last_activity TIMESTAMP, risk_score DOUBLE"
state_schema = "total_events LONG, last_activity TIMESTAMP, risk_score DOUBLE"

def update_user_state(key, pdfs, state: GroupState):
    """Update user state with new events."""
    # Get existing state (a tuple matching state_schema) or initialize it
    if state.exists:
        total_events, last_activity, risk_score = state.get
    else:
        total_events, last_activity, risk_score = 0, None, 0.0

    # Update with new events
    for pdf in pdfs:
        for row in pdf.itertuples():
            total_events += 1
            last_activity = row.timestamp
            # Update risk score based on behavior
            if row.event_type == 'failed_login':
                risk_score = min(1.0, risk_score + 0.1)
            elif row.event_type == 'successful_login':
                risk_score = max(0.0, risk_score - 0.05)

    # Persist the updated state and emit the current profile
    state.update((total_events, last_activity, risk_score))
    yield pd.DataFrame([{
        'user_id': key[0],
        'total_events': total_events,
        'last_activity': last_activity,
        'risk_score': risk_score,
    }])

# Apply stateful processing
stateful_stream = processed_df \
    .groupBy("user_id") \
    .applyInPandasWithState(
        update_user_state,
        outputStructType=output_schema,
        stateStructType=state_schema,
        outputMode="update",
        timeoutConf=GroupStateTimeout.NoTimeout,
    )
```
## Data Quality & Monitoring

### 1. Data Quality Checks

```python
# Valid event types for this pipeline (illustrative list)
VALID_EVENT_TYPES = ["page_view", "click", "failed_login", "successful_login"]

class DataQualityMonitor:
    def __init__(self, spark_session):
        self.spark = spark_session
        self.metrics = {}

    def add_quality_checks(self, df):
        # Completeness check
        df = df.withColumn(
            "completeness_score",
            when(col("user_id").isNull(), 0)
            .when(col("event_type").isNull(), 0)
            .otherwise(1)
        )
        # Timeliness check
        df = df.withColumn(
            "latency_seconds",
            unix_timestamp(current_timestamp()) - unix_timestamp(col("timestamp"))
        )
        # Validity check: unknown types or events older than 1 hour are invalid
        df = df.withColumn(
            "is_valid",
            col("event_type").isin(VALID_EVENT_TYPES) &
            (col("latency_seconds") < 3600)
        )
        return df

    def monitor_stream(self, df):
        # Calculate per-minute quality metrics
        metrics_df = df \
            .groupBy(window(col("timestamp"), "1 minute")) \
            .agg(
                count("*").alias("total_events"),
                avg("completeness_score").alias("avg_completeness"),
                avg("latency_seconds").alias("avg_latency"),
                sum(when(col("is_valid"), 1).otherwise(0)).alias("valid_events"),
                approx_count_distinct("user_id").alias("active_user_count")
            )
        # Write metrics to the monitoring system
        # (send_to_prometheus is defined in the next section)
        metrics_df.writeStream \
            .foreachBatch(send_to_prometheus) \
            .outputMode("update") \
            .start()
```
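Wiring it together is a few lines (a sketch, assuming the `processed_df` stream from the Spark section):

```python
monitor = DataQualityMonitor(spark)
checked_df = monitor.add_quality_checks(processed_df)
monitor.monitor_stream(checked_df)
```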
### 2. Prometheus Integration

```python
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
events_processed = Counter('pipeline_events_processed_total', 'Total events processed')
event_latency = Histogram('pipeline_event_latency_seconds', 'Event processing latency')
active_users = Gauge('pipeline_active_users', 'Number of active users')

# Expose /metrics for Prometheus to scrape (port is illustrative)
start_http_server(8000)

def send_to_prometheus(batch_df, batch_id):
    # collect() is safe here: the metrics stream emits one row per window
    for row in batch_df.collect():
        events_processed.inc(row['total_events'])
        event_latency.observe(row['avg_latency'])
        active_users.set(row['active_user_count'])
```
## Handling Late Data

### 1. Watermarking Strategy

```python
# Accept events up to 30 minutes late; anything older is dropped
late_data_query = processed_df \
    .withWatermark("timestamp", "30 minutes") \
    .groupBy(
        window(col("timestamp"), "10 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("event_count")
    ) \
    .writeStream \
    .format("console") \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .trigger(processingTime='10 seconds') \
    .start()
```
### 2. Multi-Stream Joins

```python
# Join user events with user profiles. Assumes the profiles topic is
# parsed into user_id / timestamp columns the same way the events
# stream was. Note: stream-stream outer joins also require watermarks
# on both inputs.
user_profiles = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_profiles") \
    .load()

enriched_events = processed_df.alias("e") \
    .join(
        user_profiles.alias("p"),
        expr("""
            e.user_id = p.user_id AND
            p.timestamp >= e.timestamp - interval 24 hours AND
            p.timestamp <= e.timestamp
        """),
        "leftOuter"
    )
```
## Performance Optimization

### 1. Partition Optimization

```python
# Repartition by key for better parallelism and shuffle locality.
# (sortWithinPartitions is only supported on batch DataFrames, e.g.
#  during backfills; streaming queries cannot pre-sort.)
optimized_df = processed_df \
    .repartition(200, col("user_id")) \
    .sortWithinPartitions("timestamp")
```
### 2. Caching Strategies

```python
# Cache frequently accessed dimension data
dimension_data = spark.read \
    .format("parquet") \
    .load("/data/dimensions/") \
    .cache()

# Broadcast small datasets to avoid shuffles in the join
from pyspark.sql.functions import broadcast

enriched_df = processed_df.join(
    broadcast(dimension_data),
    "dimension_id"
)
```
## Production Deployment

### 1. Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-streaming
spec:
  serviceName: spark
  replicas: 3
  selector:          # required in apps/v1
    matchLabels:
      app: spark-streaming
  template:
    metadata:
      labels:
        app: spark-streaming
    spec:
      containers:
        - name: spark
          image: spark-streaming:latest
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          env:
            - name: SPARK_MODE
              value: "streaming"
            - name: CHECKPOINT_DIR
              value: "s3://bucket/checkpoints"
```
### 2. Monitoring Dashboard

```python
# Grafana dashboard configuration
dashboard = {
    "dashboard": {
        "title": "Real-time Data Pipeline",
        "panels": [
            {
                "title": "Events Per Second",
                "targets": [
                    {
                        "expr": "rate(pipeline_events_processed_total[1m])"
                    }
                ]
            },
            {
                "title": "Processing Latency (p95)",
                "targets": [
                    {
                        # histogram_quantile needs the per-bucket rate
                        "expr": "histogram_quantile(0.95, rate(pipeline_event_latency_seconds_bucket[5m]))"
                    }
                ]
            }
        ]
    }
}
```
## Lessons Learned

- **Start with Schema Registry**: It saves countless hours of debugging
- **Plan for Failure**: Design for at-least-once processing and handle duplicates downstream (see the dedup sketch after this list)
- **Monitor Everything**: You can't optimize what you don't measure
- **Test with Production Load**: Performance testing with realistic data volumes is crucial
- **Automate Recovery**: Self-healing systems are essential for 24/7 operations
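On the "plan for failure" point, here's a minimal sketch of downstream deduplication under at-least-once delivery, assuming every event carries a unique `event_id` as in the schema above:

```python
# Drop replayed duplicates: events with the same (event_id, timestamp)
# arriving within the 10-minute watermark are treated as redeliveries.
# Including the event-time column lets Spark expire old dedup state.
deduped_df = processed_df \
    .withWatermark("timestamp", "10 minutes") \
    .dropDuplicates(["event_id", "timestamp"])
```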
## Conclusion
Building real-time data pipelines is challenging but rewarding. The key is to start simple, measure everything, and iterate. With the right architecture and tools, you can build systems that process millions of events per second reliably.
Remember: the best data pipeline is the one that delivers accurate data on time, not the one with the fanciest technology stack.
## Next Steps

- Explore Kafka Streams for simpler use cases
- Implement exactly-once semantics with Kafka transactions (a producer sketch follows below)
- Add machine learning models to the pipeline
- Build automated data quality frameworks
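For the exactly-once item, a minimal sketch of a transactional producer with confluent-kafka; the `transactional.id` is an illustrative value and must be unique and stable per producer instance:

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'pipeline-producer-1',  # illustrative id
})
producer.init_transactions()

producer.begin_transaction()
try:
    producer.produce('events', key='user-42', value='...')
    producer.commit_transaction()  # writes become visible atomically
except Exception:
    producer.abort_transaction()
    raise
```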
Happy streaming! 🚀