Processing and analyzing data in real time is no longer a luxury; it's a necessity. After building data pipelines that process millions of events per second, I'll share the architecture patterns and lessons I've learned along the way.
The Evolution of Data Processing
Traditional batch processing served us well for years, but modern applications demand:
- Sub-second latency for critical decisions
- Handling late-arriving data gracefully
- Processing unbounded data streams
- Scaling elastically with load
Architecture Overview
Here’s a battle-tested architecture for real-time data processing:
```
Data Sources → Kafka → Spark Streaming → Data Stores → Analytics
                 ↓            ↓                            ↓
          Schema Registry  Checkpointing        Real-time Dashboards
```
Setting Up Apache Kafka
1. Kafka Cluster Configuration
```yaml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
```
2. Producer Implementation with Schema Registry
```python
import logging

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

logger = logging.getLogger(__name__)


class KafkaProducer:
    def __init__(self, bootstrap_servers, schema_registry_url):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'enable.idempotence': True,
            'acks': 'all',
            'retries': 10,
            'max.in.flight.requests.per.connection': 5,
            'compression.type': 'snappy',
            'linger.ms': 10,
            'batch.size': 32768
        })
        self.schema_registry = SchemaRegistryClient({
            'url': schema_registry_url
        })

    def produce(self, topic, key, value, schema):
        # In production, cache this serializer instead of rebuilding it per message
        serializer = AvroSerializer(self.schema_registry, schema)
        self.producer.produce(
            topic=topic,
            key=key,
            value=serializer(value, SerializationContext(topic, MessageField.VALUE)),
            on_delivery=self.delivery_callback
        )

    def delivery_callback(self, err, msg):
        if err:
            logger.error(f'Message delivery failed: {err}')
        else:
            logger.info(f'Message delivered to {msg.topic()} [{msg.partition()}]')
```
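For context, here's a minimal way to wire the producer up. The broker address, Schema Registry URL, topic, and Avro schema below are illustrative assumptions, not values from the pipeline above:

```python
# Hypothetical wiring: addresses, topic, and schema are placeholders
value_schema = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "event_type", "type": "string"}
  ]
}
"""

producer = KafkaProducer("localhost:9092", "http://localhost:8081")
producer.produce(
    topic="events",
    key="user-42",
    value={"event_id": "e1", "event_type": "page_view"},
    schema=value_schema
)
producer.producer.flush()  # block until delivery callbacks have fired
```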
Spark Streaming Processing
1. Structured Streaming Setup
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("RealTimeDataPipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Read from Kafka; maxOffsetsPerTrigger caps the per-batch read rate
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", "100000") \
    .load()
```
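Before layering on transformations, it helps to eyeball the raw stream. A throwaway console sink like this (development use only) makes the binary key/value payloads readable:

```python
# Dev-only sanity check: print raw Kafka records to stdout
debug_query = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()
```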
2. Complex Event Processing
```python
# Define schema
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("properties", MapType(StringType(), StringType()), True)
])

# Parse and process events
processed_df = df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "10 minutes")

# Windowed aggregations
user_activity = processed_df \
    .groupBy(
        window(col("timestamp"), "5 minutes", "1 minute"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("event_count"),
        collect_set("event_type").alias("event_types"),
        min("timestamp").alias("session_start"),
        max("timestamp").alias("session_end")
    )

# Pattern detection
suspicious_activity = user_activity \
    .filter(
        (col("event_count") > 100) |
        (array_contains(col("event_types"), "failed_login") &
         (size(col("event_types")) > 5))
    )
```
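One way to act on these detections is to publish them to a dedicated alerts topic. A sketch, assuming an `alerts` topic and checkpoint path that aren't part of the original pipeline:

```python
# Publish detections to a hypothetical 'alerts' topic
alert_query = suspicious_activity \
    .selectExpr("user_id AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "alerts") \
    .option("checkpointLocation", "/tmp/checkpoints/alerts") \
    .outputMode("update") \
    .start()
```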
3. Stateful Stream Processing
For per-key state that survives across micro-batches, Structured Streaming provides applyInPandasWithState (Spark 3.4+):
```python
import pandas as pd
from pyspark.sql.streaming.state import GroupStateTimeout

# Schema of the rows emitted for each updated user profile
output_schema = "user_id STRING, total_events LONG, last_activity TIMESTAMP, risk_score DOUBLE"


def update_user_state(key, pdf_iter, state):
    """Update user state with new events."""
    # Get existing state, or initialize it
    if state.exists:
        total_events, last_activity, risk_score = state.get
    else:
        total_events, last_activity, risk_score = 0, None, 0.0

    # Update with new events
    for pdf in pdf_iter:
        for row in pdf.itertuples():
            total_events += 1
            last_activity = row.timestamp

            # Update risk score based on behavior
            if row.event_type == 'failed_login':
                risk_score = min(1.0, risk_score + 0.1)
            elif row.event_type == 'successful_login':
                risk_score = max(0.0, risk_score - 0.05)

    # Persist the new state and emit the updated profile
    state.update((total_events, last_activity, risk_score))
    yield pd.DataFrame([{
        'user_id': key[0],
        'total_events': total_events,
        'last_activity': last_activity,
        'risk_score': risk_score
    }])


# Apply stateful processing
stateful_stream = processed_df \
    .groupBy("user_id") \
    .applyInPandasWithState(
        update_user_state,
        outputStructType=output_schema,
        stateStructType="total_events LONG, last_activity TIMESTAMP, risk_score DOUBLE",
        outputMode="update",
        timeoutConf=GroupStateTimeout.NoTimeout
    )
```
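To make the evolving profiles available to other services, the stateful stream can be published back to Kafka. The `user_state` topic and checkpoint path here are assumptions:

```python
# Publish updated profiles to a hypothetical 'user_state' topic
state_query = stateful_stream \
    .selectExpr("user_id AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "user_state") \
    .option("checkpointLocation", "/tmp/checkpoints/user_state") \
    .outputMode("update") \
    .start()
```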
Data Quality & Monitoring
1. Data Quality Checks
```python
class DataQualityMonitor:
    def __init__(self, spark_session):
        self.spark = spark_session
        self.metrics = {}

    def add_quality_checks(self, df):
        # Completeness check
        df = df.withColumn(
            "completeness_score",
            when(col("user_id").isNull(), 0)
            .when(col("event_type").isNull(), 0)
            .otherwise(1)
        )

        # Timeliness check
        df = df.withColumn(
            "latency_seconds",
            unix_timestamp(current_timestamp()) - unix_timestamp(col("timestamp"))
        )

        # Validity check (VALID_EVENT_TYPES is the allow-list of known event types)
        df = df.withColumn(
            "is_valid",
            col("event_type").isin(VALID_EVENT_TYPES) &
            (col("latency_seconds") < 3600)  # events older than 1 hour are invalid
        )

        return df

    def monitor_stream(self, df):
        # Calculate per-minute quality metrics
        metrics_df = df \
            .groupBy(window(col("timestamp"), "1 minute")) \
            .agg(
                count("*").alias("total_events"),
                avg("completeness_score").alias("avg_completeness"),
                avg("latency_seconds").alias("avg_latency"),
                sum(when(col("is_valid"), 1).otherwise(0)).alias("valid_events"),
                approx_count_distinct("user_id").alias("active_user_count")
            )

        # Write metrics to the monitoring system (send_to_prometheus is defined below)
        metrics_df.writeStream \
            .foreachBatch(send_to_prometheus) \
            .outputMode("update") \
            .start()
```
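Wiring it together might look like this; a minimal sketch reusing the processed_df stream from earlier, with an illustrative allow-list:

```python
# Illustrative allow-list for the validity check
VALID_EVENT_TYPES = ["page_view", "failed_login", "successful_login"]

# Attach quality columns, then stream per-minute metrics out
monitor = DataQualityMonitor(spark)
checked_df = monitor.add_quality_checks(processed_df)
monitor.monitor_stream(checked_df)
```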
2. Prometheus Integration
```python
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
events_processed = Counter('pipeline_events_processed_total', 'Total events processed')
event_latency = Histogram('pipeline_event_latency_seconds', 'Event processing latency')
active_users = Gauge('pipeline_active_users', 'Number of active users')


def send_to_prometheus(batch_df, batch_id):
    # Each micro-batch holds one row per window, so collect() stays small
    for row in batch_df.collect():
        events_processed.inc(row['total_events'])
        if row['avg_latency'] is not None:
            event_latency.observe(row['avg_latency'])
        active_users.set(row['active_user_count'])
```
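For Prometheus to scrape these metrics, the driver process has to expose an HTTP endpoint; port 8000 below is an assumption:

```python
from prometheus_client import start_http_server

# Serve /metrics from the Spark driver (call once at startup)
start_http_server(8000)
```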
Handling Late Data
1. Watermarking Strategy
```python
# Configure watermarking for late data
late_data_query = processed_df \
    .withWatermark("timestamp", "30 minutes") \
    .groupBy(
        window(col("timestamp"), "10 minutes"),
        col("user_id")
    ) \
    .agg(count("*").alias("event_count")) \
    .writeStream \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .trigger(processingTime='10 seconds') \
    .start()
```
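It pays to verify how far the watermark has advanced and how much late data is actually being dropped. Structured Streaming exposes both through the query's progress object (look for numRowsDroppedByWatermark under stateOperators):

```python
import json

# Inspect watermark position and state metrics from the last micro-batch
progress = late_data_query.lastProgress
if progress:
    print("watermark:", progress["eventTime"].get("watermark"))
    print(json.dumps(progress["stateOperators"], indent=2))
```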
2. Multi-Stream Joins
```python
# Schema for the profile topic (the 'tier' field is illustrative)
profile_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("tier", StringType(), True)
])

# Read and parse the user-profile change stream
user_profiles = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_profiles") \
    .load() \
    .select(from_json(col("value").cast("string"), profile_schema).alias("profile")) \
    .select("profile.*") \
    .withWatermark("timestamp", "24 hours")

# Join user events with profile updates from the preceding 24 hours
enriched_events = processed_df.alias("e") \
    .join(
        user_profiles.alias("p"),
        expr("""
            e.user_id = p.user_id AND
            p.timestamp >= e.timestamp - interval 24 hours AND
            p.timestamp <= e.timestamp
        """),
        "leftOuter"
    )
```
Performance Optimization
1. Partition Optimization
```python
# Repartition for optimal parallelism; note the sort step applies to batch
# reprocessing jobs, since streaming DataFrames do not support sorts
optimized_df = processed_df \
    .repartition(200, col("user_id")) \
    .sortWithinPartitions("timestamp")
```
2. Caching Strategies
```python
# Cache frequently accessed dimension data
dimension_data = spark.read \
    .format("parquet") \
    .load("/data/dimensions/") \
    .cache()

# Broadcast small datasets to every executor to avoid shuffles
from pyspark.sql.functions import broadcast

enriched_df = processed_df.join(
    broadcast(dimension_data),
    "dimension_id"
)
```
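Two quick checks help confirm the broadcast actually happens; the 50 MB threshold is an illustrative value, not a recommendation:

```python
# Let Spark auto-broadcast tables below ~50 MB (illustrative threshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# The physical plan should include BroadcastHashJoin
enriched_df.explain()
```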
Production Deployment
1. Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-streaming
spec:
  serviceName: spark
  replicas: 3
  selector:
    matchLabels:
      app: spark-streaming
  template:
    metadata:
      labels:
        app: spark-streaming
    spec:
      containers:
        - name: spark
          image: spark-streaming:latest
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          env:
            - name: SPARK_MODE
              value: "streaming"
            - name: CHECKPOINT_DIR
              value: "s3://bucket/checkpoints"
```
2. Monitoring Dashboard
```python
# Grafana dashboard configuration
dashboard = {
    "dashboard": {
        "title": "Real-time Data Pipeline",
        "panels": [
            {
                "title": "Events Per Second",
                "targets": [
                    {"expr": "rate(pipeline_events_processed_total[1m])"}
                ]
            },
            {
                "title": "Processing Latency",
                "targets": [
                    {"expr": "histogram_quantile(0.95, rate(pipeline_event_latency_seconds_bucket[5m]))"}
                ]
            }
        ]
    }
}
```
Lessons Learned
- Start with Schema Registry: It saves countless hours of debugging
- Plan for Failure: Design for at-least-once processing and deduplicate downstream (see the sketch after this list)
- Monitor Everything: You can’t optimize what you don’t measure
- Test with Production Load: Performance testing with realistic data volumes is crucial
- Automate Recovery: Self-healing systems are essential for 24/7 operations
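On the "plan for failure" point: with at-least-once delivery, downstream deduplication can be as simple as a keyed dropDuplicates inside the watermark window. A minimal sketch, assuming every event carries a unique event_id:

```python
# Drop replayed events observed within the watermark window;
# including the event-time column lets Spark expire dedup state
deduped_df = processed_df \
    .withWatermark("timestamp", "1 hour") \
    .dropDuplicates(["event_id", "timestamp"])
```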
Conclusion
Building real-time data pipelines is challenging but rewarding. The key is to start simple, measure everything, and iterate. With the right architecture and tools, you can build systems that process millions of events per second reliably.
Remember: the best data pipeline is the one that delivers accurate data on time, not the one with the fanciest technology stack.
Next Steps
- Explore Kafka Streams for simpler use cases
- Implement exactly-once semantics with Kafka transactions (a starter sketch follows this list)
- Add machine learning models to the pipeline
- Build automated data quality frameworks
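For the exactly-once item, confluent-kafka's transactional producer API is the natural starting point. A sketch under assumed broker address, transactional.id, and topic:

```python
from confluent_kafka import Producer

# Transactional producer: transactional.id must be stable across restarts
producer = Producer({
    'bootstrap.servers': 'localhost:9092',      # placeholder
    'transactional.id': 'pipeline-producer-1',  # illustrative id
    'enable.idempotence': True
})

producer.init_transactions()
producer.begin_transaction()
try:
    producer.produce('events', key='user-42', value=b'...')
    producer.commit_transaction()
except Exception:
    producer.abort_transaction()
    raise
```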
Happy streaming! 🚀