Building Real-Time Data Pipelines with Apache Kafka and Spark

Surendra Tamang

Software Engineer


In today’s data-driven world, the ability to process and analyze data in real-time is no longer a luxury—it’s a necessity. After building data pipelines that process millions of events per second, I’ll share the architecture patterns and lessons learned.

The Evolution of Data Processing

Traditional batch processing served us well for years, but modern applications demand:

  • Sub-second latency for critical decisions
  • Handling late-arriving data gracefully
  • Processing unbounded data streams
  • Scaling elastically with load

Architecture Overview

Here’s a battle-tested architecture for real-time data processing:

Data Sources → Kafka → Spark Streaming → Data Stores → Analytics
     ↓                        ↓                ↓
  Schema Registry      Checkpointing      Real-time Dashboards

Setting Up Apache Kafka

1. Kafka Cluster Configuration

# kafka-cluster.yaml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    
  kafka1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101

  # kafka2 and kafka3 are defined the same way (with unique KAFKA_BROKER_ID
  # and advertised listeners) so the replication factor of 3 above can be met.
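
Once the brokers are up, the events topic needs to exist before producers write to it. Here's a minimal sketch using confluent-kafka's AdminClient; the topic name, partition count, and bootstrap address are illustrative choices, not fixed parts of this pipeline.

from confluent_kafka.admin import AdminClient, NewTopic

# Assumed bootstrap address -- adjust to your cluster
admin = AdminClient({'bootstrap.servers': 'kafka1:9092'})

# 12 partitions and replication factor 3 are illustrative defaults
futures = admin.create_topics([
    NewTopic('events', num_partitions=12, replication_factor=3)
])

for topic, future in futures.items():
    try:
        future.result()  # raises if topic creation failed
        print(f'Created topic {topic}')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')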

2. Producer Implementation with Schema Registry

import logging

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

logger = logging.getLogger(__name__)

class KafkaProducer:
    def __init__(self, bootstrap_servers, schema_registry_url):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'enable.idempotence': True,
            'acks': 'all',
            'retries': 10,
            'max.in.flight.requests.per.connection': 5,
            'compression.type': 'snappy',
            'linger.ms': 10,
            'batch.size': 32768
        })
        
        self.schema_registry = SchemaRegistryClient({
            'url': schema_registry_url
        })
        
    def produce(self, topic, key, value, schema):
        serializer = AvroSerializer(
            self.schema_registry,
            schema
        )
        
        self.producer.produce(
            topic=topic,
            key=key,
            value=serializer(value, SerializationContext(topic, MessageField.VALUE)),
            on_delivery=self.delivery_callback
        )
        # Serve delivery callbacks for previously produced messages
        self.producer.poll(0)
        
    def delivery_callback(self, err, msg):
        if err:
            logger.error(f'Message delivery failed: {err}')
        else:
            logger.info(f'Message delivered to {msg.topic()} [{msg.partition()}]')
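
A quick usage sketch follows; the Avro schema string, topic, key, and addresses here are illustrative placeholders, not part of the production pipeline.

event_schema_str = """
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"}
  ]
}
"""

producer = KafkaProducer('kafka1:9092', 'http://schema-registry:8081')
producer.produce(
    topic='events',
    key='user-123',
    value={'event_id': 'e-1', 'user_id': 'user-123', 'event_type': 'page_view'},
    schema=event_schema_str
)
producer.producer.flush()  # block until outstanding messages are delivered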

Spark Streaming Processing

1. Structured Streaming Setup

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Note: spark.streaming.kafka.maxRatePerPartition only applies to the legacy
# DStream API; with Structured Streaming, the ingest rate is capped per query
# via the maxOffsetsPerTrigger option used below.
spark = SparkSession.builder \
    .appName("RealTimeDataPipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Read from Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", "100000") \
    .load()

2. Complex Event Processing

# Define schema
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("properties", MapType(StringType(), StringType()), True)
])

# Parse and process events
# Note: from_json assumes JSON-encoded message values. If values are Avro
# with Schema Registry (as in the producer above), decode them with from_avro
# (and handle the Confluent wire-format header) instead.
processed_df = df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "10 minutes")

# Windowed aggregations
user_activity = processed_df \
    .groupBy(
        window(col("timestamp"), "5 minutes", "1 minute"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("event_count"),
        collect_set("event_type").alias("event_types"),
        min("timestamp").alias("session_start"),
        max("timestamp").alias("session_end")
    )

# Pattern detection
suspicious_activity = user_activity \
    .filter(
        (col("event_count") > 100) | 
        (array_contains(col("event_types"), "failed_login") & 
         (size(col("event_types")) > 5))
    )
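
To act on these detections, the alert stream has to be written somewhere. A minimal sketch, assuming an alerts topic and a local checkpoint path (both illustrative):

alert_query = suspicious_activity \
    .selectExpr("CAST(user_id AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "suspicious_activity_alerts") \
    .option("checkpointLocation", "/tmp/checkpoints/alerts") \
    .outputMode("update") \
    .start()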

3. Stateful Stream Processing

import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

# Schema of the emitted rows and of the state kept per user
output_schema = "user_id STRING, total_events LONG, last_activity TIMESTAMP, risk_score DOUBLE"
state_schema = "total_events LONG, last_activity TIMESTAMP, risk_score DOUBLE"

def update_user_state(
    key: Tuple[str],
    batches: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    """Update per-user state with new events and emit the current profile."""
    # Get existing state (a tuple matching state_schema) or initialize it
    if state.exists:
        total_events, last_activity, risk_score = state.get
    else:
        total_events, last_activity, risk_score = 0, None, 0.0

    # Update with the new micro-batch of events
    for pdf in batches:
        for row in pdf.itertuples():
            total_events += 1
            last_activity = row.timestamp

            # Update risk score based on behavior
            if row.event_type == 'failed_login':
                risk_score = min(1.0, risk_score + 0.1)
            elif row.event_type == 'successful_login':
                risk_score = max(0.0, risk_score - 0.05)

    # Persist the new state and emit one row for this user
    state.update((total_events, last_activity, risk_score))
    yield pd.DataFrame([{
        'user_id': key[0],
        'total_events': total_events,
        'last_activity': last_activity,
        'risk_score': risk_score,
    }])

# Apply stateful processing (applyInPandasWithState requires Spark 3.4+)
stateful_stream = processed_df \
    .groupBy("user_id") \
    .applyInPandasWithState(
        update_user_state,
        output_schema,
        state_schema,
        "update",
        GroupStateTimeout.NoTimeout,
    )

Data Quality & Monitoring

1. Data Quality Checks

class DataQualityMonitor:
    def __init__(self, spark_session):
        self.spark = spark_session
        self.metrics = {}
        
    def add_quality_checks(self, df):
        # Completeness check
        df = df.withColumn(
            "completeness_score",
            when(col("user_id").isNull(), 0)
            .when(col("event_type").isNull(), 0)
            .otherwise(1)
        )
        
        # Timeliness check
        df = df.withColumn(
            "latency_seconds",
            unix_timestamp(current_timestamp()) - unix_timestamp(col("timestamp"))
        )
        
        # Validity check (VALID_EVENT_TYPES is the allow-list of known event
        # types, defined elsewhere in the pipeline configuration)
        df = df.withColumn(
            "is_valid",
            col("event_type").isin(VALID_EVENT_TYPES) &
            (col("latency_seconds") < 3600)  # Events older than 1 hour are invalid
        )
        
        return df
    
    def monitor_stream(self, df):
        # Calculate per-minute quality metrics
        metrics_df = df \
            .groupBy(window(col("timestamp"), "1 minute")) \
            .agg(
                count("*").alias("total_events"),
                avg("completeness_score").alias("avg_completeness"),
                avg("latency_seconds").alias("avg_latency"),
                sum(when(col("is_valid"), 1).otherwise(0)).alias("valid_events"),
                approx_count_distinct("user_id").alias("active_user_count")
            )
        
        # Write metrics to the monitoring system; send_to_prometheus is the
        # foreachBatch handler defined in the next section
        return metrics_df.writeStream \
            .foreachBatch(send_to_prometheus) \
            .outputMode("update") \
            .start()

2. Prometheus Integration

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
events_processed = Counter('pipeline_events_processed_total', 'Total events processed')
event_latency = Histogram('pipeline_event_latency_seconds', 'Event processing latency')
active_users = Gauge('pipeline_active_users', 'Number of active users')

def send_to_prometheus(batch_df, batch_id):
    metrics = batch_df.collect()
    
    for row in metrics:
        events_processed.inc(row['total_events'])
        event_latency.observe(row['avg_latency'])
        active_users.set(row['active_user_count'])
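
For Prometheus to scrape these metrics, the driver also needs to expose an HTTP endpoint. A small sketch using prometheus_client's built-in server; port 8000 is an arbitrary choice:

from prometheus_client import start_http_server

# Expose /metrics on the driver so Prometheus can scrape it
start_http_server(8000)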

Handling Late Data

1. Watermarking Strategy

# Configure watermarking so aggregations tolerate data up to 30 minutes late
late_data_query = processed_df \
    .withWatermark("timestamp", "30 minutes") \
    .groupBy(
        window(col("timestamp"), "10 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("event_count")
    ) \
    .writeStream \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .trigger(processingTime='10 seconds') \
    .start()

2. Multi-Stream Joins

# Join user events with user profiles (stream-stream join)
user_profiles = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user_profiles") \
    .load()

# The profile stream must be parsed from Kafka's binary value just like the
# event stream; profile_df below stands in for that parsed stream with
# user_id and timestamp columns. Outer stream-stream joins also require
# watermarks on both sides.
enriched_events = processed_df.alias("e") \
    .join(
        profile_df.withWatermark("timestamp", "24 hours").alias("p"),
        expr("""
            e.user_id = p.user_id AND
            p.timestamp >= e.timestamp - interval 24 hours AND
            p.timestamp <= e.timestamp
        """),
        "leftOuter"
    )

Performance Optimization

1. Partition Optimization

# Repartition by key for better parallelism before stateful operations.
# (Sorting is not supported on a streaming DataFrame, so sortWithinPartitions
# would only apply inside a foreachBatch over each micro-batch.)
optimized_df = processed_df \
    .repartition(200, col("user_id"))

2. Caching Strategies

# Cache frequently accessed data
dimension_data = spark.read \
    .format("parquet") \
    .load("/data/dimensions/") \
    .cache()

# Broadcast small datasets
from pyspark.sql.functions import broadcast

enriched_df = processed_df.join(
    broadcast(dimension_data),
    "dimension_id"
)

Production Deployment

1. Kubernetes Deployment

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-streaming
spec:
  serviceName: spark
  replicas: 3
  selector:
    matchLabels:
      app: spark-streaming
  template:
    metadata:
      labels:
        app: spark-streaming
    spec:
      containers:
      - name: spark
        image: spark-streaming:latest
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "8Gi"
            cpu: "4"
        env:
        - name: SPARK_MODE
          value: "streaming"
        - name: CHECKPOINT_DIR
          value: "s3://bucket/checkpoints"

2. Monitoring Dashboard

# Grafana dashboard configuration
dashboard = {
    "dashboard": {
        "title": "Real-time Data Pipeline",
        "panels": [
            {
                "title": "Events Per Second",
                "targets": [
                    {
                        "expr": "rate(pipeline_events_processed_total[1m])"
                    }
                ]
            },
            {
                "title": "Processing Latency",
                "targets": [
                    {
                        "expr": "histogram_quantile(0.95, pipeline_event_latency_seconds)"
                    }
                ]
            }
        ]
    }
}

Lessons Learned

  1. Start with Schema Registry: It saves countless hours of debugging
  2. Plan for Failure: Design for at-least-once processing and handle duplicates downstream (see the sketch after this list)
  3. Monitor Everything: You can’t optimize what you don’t measure
  4. Test with Production Load: Performance testing with realistic data volumes is crucial
  5. Automate Recovery: Self-healing systems are essential for 24/7 operations
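
A minimal sketch of downstream de-duplication for at-least-once delivery, assuming producers attach a unique event_id (as in the event schema above):

# processed_df already carries a 10-minute watermark, so deduplicating on
# event_id plus the event-time column lets Spark expire old dedup state.
deduped_df = processed_df.dropDuplicates(["event_id", "timestamp"])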

Conclusion

Building real-time data pipelines is challenging but rewarding. The key is to start simple, measure everything, and iterate. With the right architecture and tools, you can build systems that process millions of events per second reliably.

Remember: the best data pipeline is the one that delivers accurate data on time, not the one with the fanciest technology stack.

Next Steps

  • Explore Kafka Streams for simpler use cases
  • Implement exactly-once semantics with Kafka transactions (a producer-side sketch follows below)
  • Add machine learning models to the pipeline
  • Build automated data quality frameworks
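
As a starting point for the transactions item, here is a minimal producer-side sketch with confluent-kafka. The transactional.id, topic, and payload are placeholders, and a full exactly-once pipeline also needs read_committed consumers and idempotent sinks:

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'kafka1:9092',
    'transactional.id': 'pipeline-producer-1',  # placeholder id
    'enable.idempotence': True
})

producer.init_transactions()
producer.begin_transaction()
try:
    producer.produce('events', key='user-123', value=b'{"event_type": "page_view"}')
    producer.commit_transaction()
except Exception:
    producer.abort_transaction()
    raise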

Happy streaming! 🚀

About Surendra Tamang

Software Engineer specializing in web scraping, data engineering, and full-stack development. Passionate about transforming complex data challenges into elegant solutions that drive business value.
