Master Web Scraping - Part 5: Production Deployment

Surendra Tamang

60 min read • expert

Prerequisites

  • Completed Parts 1-4 of this series
  • Knowledge of Docker and containerization
  • Understanding of CI/CD concepts
  • Basic knowledge of cloud platforms
  • Experience with monitoring tools

Master Web Scraping with Scrapy: Production Deployment

Welcome to the final part of our comprehensive Scrapy series! In this expert-level tutorial, we'll transform your scraping projects into production-ready, enterprise-grade systems that can handle massive scale, ensure reliability, and maintain security in real-world environments.

What You'll Learn in This Part

  • Docker containerization and multi-stage builds
  • Kubernetes orchestration and scaling
  • CI/CD pipelines with automated testing
  • Cloud deployment (AWS, GCP, Azure)
  • Production monitoring and observability
  • Security hardening and compliance
  • Auto-scaling and resource optimization
  • Disaster recovery and backup strategies
  • Performance tuning and optimization

Docker Containerization

Multi-Stage Dockerfile

# webscraper/Dockerfile
# Multi-stage build for optimized production image
# Build stage
FROM python:3.11-slim as builder
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
libxml2-dev \
libxslt-dev \
libffi-dev \
libssl-dev \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Create and activate virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Install Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && \
pip install -r requirements.txt
# Production stage
FROM python:3.11-slim as production
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PATH="/opt/venv/bin:$PATH"
# Create non-root user
RUN groupadd -r scrapy && useradd -r -g scrapy scrapy
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
curl \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# Copy virtual environment from builder stage
COPY --from=builder /opt/venv /opt/venv
# Create application directory
WORKDIR /app
# Copy application code
COPY --chown=scrapy:scrapy . .
# Create necessary directories
RUN mkdir -p /app/logs /app/data /app/exports && \
chown -R scrapy:scrapy /app
# Switch to non-root user
USER scrapy
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD curl -f http://localhost:6023/ || exit 1
# Default command
CMD ["scrapy", "list"]
# Development stage
FROM production as development
USER root
# Install development dependencies
RUN pip install pytest pytest-cov black flake8 mypy
# Install debugging tools
RUN apt-get update && apt-get install -y \
vim \
htop \
net-tools \
&& rm -rf /var/lib/apt/lists/*
USER scrapy
# Override default command for development
CMD ["tail", "-f", "/dev/null"]

Docker Compose for Development

docker-compose.yml
version: '3.8'

services:
  # Main scraper service
  scraper:
    build:
      context: .
      target: development
      dockerfile: Dockerfile
    volumes:
      - .:/app
      - ./data:/app/data
      - ./logs:/app/logs
    environment:
      - SCRAPY_SETTINGS_MODULE=webscraper.settings.development
      - REDIS_URL=redis://redis:6379/0
      - MONGO_URI=mongodb://mongo:27017/scrapy_dev
      - POSTGRES_HOST=postgres
      - POSTGRES_DB=scrapy_dev
      - POSTGRES_USER=scrapy
      - POSTGRES_PASSWORD=scrapy_password
    depends_on:
      - redis
      - mongo
      - postgres
    networks:
      - scrapy-network

  # Redis for distributed scraping
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes
    networks:
      - scrapy-network

  # MongoDB for document storage
  mongo:
    image: mongo:6
    ports:
      - "27017:27017"
    volumes:
      - mongo-data:/data/db
    environment:
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password
    networks:
      - scrapy-network

  # PostgreSQL for structured data
  postgres:
    image: postgres:15
    ports:
      - "5432:5432"
    volumes:
      - postgres-data:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=scrapy_dev
      - POSTGRES_USER=scrapy
      - POSTGRES_PASSWORD=scrapy_password
    networks:
      - scrapy-network

  # Elasticsearch for search and analytics
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
    ports:
      - "9200:9200"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    networks:
      - scrapy-network

  # Kibana for data visualization
  kibana:
    image: docker.elastic.co/kibana/kibana:8.8.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - scrapy-network

  # Prometheus for monitoring
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
    networks:
      - scrapy-network

  # Grafana for dashboards
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./monitoring/grafana:/etc/grafana/provisioning
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - prometheus
    networks:
      - scrapy-network

  # Splash for JavaScript rendering
  splash:
    image: scrapinghub/splash:latest
    ports:
      - "8050:8050"
    command: --max-timeout=3600 --slots=5
    networks:
      - scrapy-network

volumes:
  redis-data:
  mongo-data:
  postgres-data:
  es-data:
  prometheus-data:
  grafana-data:

networks:
  scrapy-network:
    driver: bridge

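The compose file injects a handful of environment variables into the scraper container (SCRAPY_SETTINGS_MODULE, REDIS_URL, MONGO_URI, and the POSTGRES_* values). The settings module it points at is not listed in this part, so the snippet below is a minimal sketch of what webscraper/settings/development.py might look like, assuming a per-environment settings layout like the module path suggests; adjust names and defaults to match the project you built in earlier parts.

# webscraper/settings/development.py -- illustrative sketch only
import os

BOT_NAME = "webscraper"

# Queue/scheduler backend for the distributed setup (scrapy-redis assumed)
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

# Storage backends exposed by docker-compose
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/scrapy_dev")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
POSTGRES_DB = os.environ.get("POSTGRES_DB", "scrapy_dev")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "scrapy")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "scrapy_password")

# Relaxed defaults for local development
LOG_LEVEL = "DEBUG"
CONCURRENT_REQUESTS = 4
DOWNLOAD_DELAY = 0.5
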
Production Docker Compose

docker-compose.prod.yml
version: '3.8'

services:
  # Load balancer
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/ssl:/etc/nginx/ssl
    depends_on:
      - scraper-coordinator
    networks:
      - scrapy-network

  # Coordinator service
  scraper-coordinator:
    build:
      context: .
      target: production
    command: ["python", "scripts/coordinator.py"]
    environment:
      - SCRAPY_SETTINGS_MODULE=webscraper.settings.production
      - REDIS_URL=redis://redis-cluster:6379/0
      - SENTRY_DSN=${SENTRY_DSN}
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 256M
    depends_on:
      - redis-cluster
    networks:
      - scrapy-network

  # Worker services
  scraper-worker:
    build:
      context: .
      target: production
    command: ["python", "scripts/worker.py"]
    environment:
      - SCRAPY_SETTINGS_MODULE=webscraper.settings.production
      - REDIS_URL=redis://redis-cluster:6379/0
      - WORKER_ID=${HOSTNAME}
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '1.0'
          memory: 1G
        reservations:
          cpus: '0.5'
          memory: 512M
    depends_on:
      - redis-cluster
      - scraper-coordinator
    networks:
      - scrapy-network

  # Redis cluster
  redis-cluster:
    image: redis:7-alpine
    command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - redis-prod-data:/data
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    networks:
      - scrapy-network

  # Monitoring stack
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./monitoring/prometheus.prod.yml:/etc/prometheus/prometheus.yml
      - prometheus-prod-data:/prometheus
    deploy:
      resources:
        limits:
          cpus: '0.3'
          memory: 256M
    networks:
      - scrapy-network

  grafana:
    image: grafana/grafana:latest
    volumes:
      - grafana-prod-data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
      - GF_INSTALL_PLUGINS=grafana-piechart-panel
    deploy:
      resources:
        limits:
          cpus: '0.3'
          memory: 256M
    networks:
      - scrapy-network

volumes:
  redis-prod-data:
  prometheus-prod-data:
  grafana-prod-data:

networks:
  scrapy-network:
    driver: overlay
    attachable: true

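The coordinator and worker services run scripts/coordinator.py and scripts/worker.py, which are not listed in this series. As a rough idea of the division of labour, a worker can simply block on a Redis queue that the coordinator fills and run one spider per job. The sketch below assumes exactly that; the queue name and job format are invented for illustration, not part of the real scripts.

# scripts/worker.py -- illustrative sketch only
import json
import os
import subprocess

import redis  # redis-py, assumed to be in requirements.txt

QUEUE_KEY = "scrapy:jobs"  # hypothetical queue the coordinator pushes jobs onto
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
WORKER_ID = os.environ.get("WORKER_ID", "local-worker")


def main():
    client = redis.Redis.from_url(REDIS_URL)
    while True:
        # BLPOP blocks until a job is available, then returns (queue, payload)
        _, raw_job = client.blpop(QUEUE_KEY)
        job = json.loads(raw_job)
        spider_name = job["spider"]
        print(f"[{WORKER_ID}] running spider: {spider_name}")
        # Run each crawl in a subprocess so a crashing spider never kills the worker
        subprocess.run(["scrapy", "crawl", spider_name], check=False)


if __name__ == "__main__":
    main()
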
Kubernetes Deployment

Kubernetes Manifests

k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: scrapy-production
  labels:
    name: scrapy-production
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: scrapy-config
  namespace: scrapy-production
data:
  SCRAPY_SETTINGS_MODULE: "webscraper.settings.production"
  REDIS_URL: "redis://redis-service:6379/0"
  LOG_LEVEL: "INFO"
  CONCURRENT_REQUESTS: "16"
  DOWNLOAD_DELAY: "1"
---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: scrapy-secrets
  namespace: scrapy-production
type: Opaque
data:
  # Base64 encoded values
  POSTGRES_PASSWORD: c2NyYXB5X3Bhc3N3b3Jk  # scrapy_password
  MONGO_PASSWORD: bW9uZ29fcGFzc3dvcmQ=  # mongo_password
  SENTRY_DSN: aHR0cHM6Ly9zZW50cnkuaW8=  # https://sentry.io
---
# k8s/redis-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: scrapy-production
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          ports:
            - containerPort: 6379
          command: ["redis-server"]
          args: ["--appendonly", "yes", "--maxmemory", "256mb"]
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          volumeMounts:
            - name: redis-storage
              mountPath: /data
      volumes:
        - name: redis-storage
          persistentVolumeClaim:
            claimName: redis-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
  namespace: scrapy-production
spec:
  selector:
    app: redis
  ports:
    - port: 6379
      targetPort: 6379
  type: ClusterIP
---
# k8s/coordinator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scraper-coordinator
  namespace: scrapy-production
spec:
  replicas: 1
  selector:
    matchLabels:
      app: scraper-coordinator
  template:
    metadata:
      labels:
        app: scraper-coordinator
    spec:
      containers:
        - name: coordinator
          image: your-registry/webscraper:latest
          command: ["python", "scripts/coordinator.py"]
          envFrom:
            - configMapRef:
                name: scrapy-config
            - secretRef:
                name: scrapy-secrets
          resources:
            requests:
              memory: "256Mi"
              cpu: "200m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          ports:
            - containerPort: 8080
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
# k8s/worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scraper-worker
  namespace: scrapy-production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: scraper-worker
  template:
    metadata:
      labels:
        app: scraper-worker
    spec:
      containers:
        - name: worker
          image: your-registry/webscraper:latest
          command: ["python", "scripts/worker.py"]
          envFrom:
            - configMapRef:
                name: scrapy-config
            - secretRef:
                name: scrapy-secrets
          env:
            - name: WORKER_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests:
              memory: "512Mi"
              cpu: "300m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          ports:
            - containerPort: 8081
          livenessProbe:
            httpGet:
              path: /health
              port: 8081
            initialDelaySeconds: 60
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /ready
              port: 8081
            initialDelaySeconds: 10
            periodSeconds: 10
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: scraper-worker-hpa
  namespace: scrapy-production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: scraper-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
---
# k8s/cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: scraper-job
  namespace: scrapy-production
spec:
  schedule: "0 */6 * * *"  # Every 6 hours
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: scraper
              image: your-registry/webscraper:latest
              command: ["scrapy", "crawl", "ecommerce"]
              envFrom:
                - configMapRef:
                    name: scrapy-config
                - secretRef:
                    name: scrapy-secrets
              resources:
                requests:
                  memory: "1Gi"
                  cpu: "500m"
                limits:
                  memory: "2Gi"
                  cpu: "1000m"
          restartPolicy: OnFailure
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 1
---
# k8s/pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: redis-pvc
  namespace: scrapy-production
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: fast-ssd

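The liveness and readiness probes above expect the coordinator and workers to answer HTTP GET /health and /ready on ports 8080 and 8081. How you expose those endpoints depends on how you build the coordinator; as a bare-bones sketch, a standard-library server answering both paths with a static 200 could look like this (serve real checks, e.g. Redis connectivity, in your own implementation):

# Minimal health/readiness endpoint the probes above assume (sketch only).
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # /health -> liveness, /ready -> readiness; everything else is 404
        if self.path in ("/health", "/ready"):
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"ok")
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, fmt, *args):
        pass  # keep probe traffic out of the logs


if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8080), HealthHandler).serve_forever()
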
CI/CD Pipeline

GitHub Actions Workflow

.github/workflows/ci-cd.yml
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  # Test and quality checks
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Cache dependencies
        uses: actions/cache@v3
        with:
          path: ~/.cache/pip
          key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Code formatting check
        run: |
          black --check --diff .
      - name: Linting
        run: |
          flake8 webscraper tests
      - name: Type checking
        run: |
          mypy webscraper
      - name: Security scan
        run: |
          bandit -r webscraper
      - name: Run tests
        run: |
          pytest tests/ --cov=webscraper --cov-report=xml --cov-report=html
        env:
          REDIS_URL: redis://localhost:6379/0
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          file: ./coverage.xml

  # Build and push Docker image
  build:
    needs: test
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    steps:
      - uses: actions/checkout@v4
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix={{branch}}-
            type=raw,value=latest,enable={{is_default_branch}}
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          target: production
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  # Deploy to staging
  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
      - uses: actions/checkout@v4
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          method: kubeconfig
          kubeconfig: ${{ secrets.KUBE_CONFIG_STAGING }}
      - name: Deploy to staging
        run: |
          # Update image in k8s manifests
          sed -i "s|your-registry/webscraper:latest|${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:develop|g" k8s/*.yaml
          # Apply manifests
          kubectl apply -f k8s/ -n scrapy-staging
          # Wait for rollout
          kubectl rollout status deployment/scraper-coordinator -n scrapy-staging
          kubectl rollout status deployment/scraper-worker -n scrapy-staging
      - name: Run smoke tests
        run: |
          # Wait for services to be ready
          kubectl wait --for=condition=ready pod -l app=scraper-coordinator -n scrapy-staging --timeout=300s
          # Run basic smoke tests
          python scripts/smoke_tests.py --environment=staging

  # Deploy to production
  deploy-production:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          method: kubeconfig
          kubeconfig: ${{ secrets.KUBE_CONFIG_PRODUCTION }}
      - name: Deploy to production
        run: |
          # Update image in k8s manifests
          sed -i "s|your-registry/webscraper:latest|${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest|g" k8s/*.yaml
          # Apply manifests with rolling update
          kubectl apply -f k8s/ -n scrapy-production
          # Wait for rollout
          kubectl rollout status deployment/scraper-coordinator -n scrapy-production --timeout=600s
          kubectl rollout status deployment/scraper-worker -n scrapy-production --timeout=600s
      - name: Verify deployment
        run: |
          # Check pod health
          kubectl get pods -n scrapy-production
          # Run production health checks
          python scripts/health_check.py --environment=production
      - name: Notify deployment
        uses: 8398a7/action-slack@v3
        if: always()
        with:
          status: ${{ job.status }}
          channel: '#deployments'
          webhook_url: ${{ secrets.SLACK_WEBHOOK }}

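The production job above calls scripts/health_check.py, which is not shown elsewhere in the series. Its only job is to fail the pipeline when the deployed service is unhealthy; a minimal sketch, assuming the same base-URL convention as the smoke tests below, might be:

# scripts/health_check.py -- illustrative sketch; the real script is not shown here
import argparse
import sys

import requests

BASE_URLS = {
    "staging": "https://staging-scraper.yourcompany.com",  # assumed hostnames,
    "production": "https://scraper.yourcompany.com",       # mirroring the smoke tests
}


def main():
    parser = argparse.ArgumentParser(description="Post-deploy health check")
    parser.add_argument("--environment", required=True, choices=list(BASE_URLS))
    args = parser.parse_args()

    url = f"{BASE_URLS[args.environment]}/health"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as exc:
        print(f"Health check failed for {url}: {exc}")
        sys.exit(1)
    print(f"Health check passed for {url}")


if __name__ == "__main__":
    main()
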
Testing Framework

tests/test_spiders.py
import pytest
import responses
from scrapy.http import HtmlResponse, Request

from webscraper.spiders.ecommerce_spider import EcommerceSpider
from webscraper.items import ProductItem


class TestEcommerceSpider:

    @pytest.fixture
    def spider(self):
        return EcommerceSpider()

    @pytest.fixture
    def sample_product_html(self):
        return """
        <html>
            <body>
                <h1 class="product-title">Test Product</h1>
                <span class="price-current">$99.99</span>
                <div class="product-description">
                    <p>This is a test product description</p>
                </div>
                <span class="brand-name">TestBrand</span>
                <span class="rating-value">4.5</span>
                <div class="stock-status">In Stock</div>
            </body>
        </html>
        """

    def create_response(self, html, url="http://test.com"):
        request = Request(url=url)
        return HtmlResponse(url=url, request=request, body=html.encode('utf-8'))

    def test_parse_product_basic(self, spider, sample_product_html):
        """Test basic product parsing"""
        response = self.create_response(sample_product_html)
        items = list(spider.parse_product(response))
        assert len(items) == 1
        item = items[0]
        assert item['name'] == 'Test Product'
        assert item['price'] == 99.99
        assert item['brand'] == 'TestBrand'
        assert item['rating'] == 4.5

    def test_parse_product_missing_fields(self, spider):
        """Test handling of missing fields"""
        html = "<html><body><h1>Product</h1></body></html>"
        response = self.create_response(html)
        items = list(spider.parse_product(response))
        assert len(items) == 1
        item = items[0]
        assert item['name'] == 'Product'
        assert 'price' not in item or item['price'] is None

    @responses.activate
    def test_api_integration(self, spider):
        """Test API integration"""
        # Mock API response
        responses.add(
            responses.GET,
            'http://api.test.com/products',
            json={'products': [{'id': 1, 'name': 'API Product'}]},
            status=200
        )
        # Test API call logic
        import requests
        response = requests.get('http://api.test.com/products')
        assert response.status_code == 200
        assert response.json()['products'][0]['name'] == 'API Product'


# tests/test_pipelines.py
import pytest
from itemadapter import ItemAdapter
from webscraper.pipelines import AdvancedValidationPipeline, ValidationError
from webscraper.items import ProductItem


class TestValidationPipeline:

    @pytest.fixture
    def pipeline(self):
        return AdvancedValidationPipeline()

    @pytest.fixture
    def valid_item(self):
        return ProductItem({
            'name': 'Test Product',
            'url': 'https://test.com/product',
            'price': 99.99,
            'rating': 4.5,
            'in_stock': True
        })

    def test_valid_item_passes(self, pipeline, valid_item):
        """Test that valid item passes validation"""
        result = pipeline.process_item(valid_item, None)
        assert result is not None
        assert result['validation_passed'] is True

    def test_missing_required_field_fails(self, pipeline):
        """Test that missing required fields cause validation failure"""
        item = ProductItem({'price': 99.99})
        with pytest.raises(ValidationError):
            pipeline.process_item(item, None)

    def test_invalid_price_type_fails(self, pipeline):
        """Test that invalid price type causes validation failure"""
        item = ProductItem({
            'name': 'Test Product',
            'url': 'https://test.com/product',
            'price': 'invalid'
        })
        with pytest.raises(ValidationError):
            pipeline.process_item(item, None)

    def test_price_cleaning(self, pipeline):
        """Test price cleaning functionality"""
        price_tests = [
            ('$99.99', 99.99),
            ('€1,234.56', 1234.56),
            ('1.234,56', 1234.56),
            ('FREE', None),
            ('', None)
        ]
        for input_price, expected in price_tests:
            cleaned = pipeline._clean_price(input_price)
            assert cleaned == expected


# scripts/smoke_tests.py
import requests
import time
import sys
import argparse
from typing import Dict, List


class SmokeTests:
    """Basic smoke tests for deployed environment"""

    def __init__(self, environment: str):
        self.environment = environment
        self.base_urls = {
            'staging': 'https://staging-scraper.yourcompany.com',
            'production': 'https://scraper.yourcompany.com'
        }
        self.base_url = self.base_urls[environment]

    def test_health_endpoints(self) -> bool:
        """Test health endpoints"""
        endpoints = ['/health', '/ready', '/metrics']
        for endpoint in endpoints:
            try:
                response = requests.get(f"{self.base_url}{endpoint}", timeout=10)
                if response.status_code != 200:
                    print(f"❌ Health check failed for {endpoint}: {response.status_code}")
                    return False
                print(f"✅ Health check passed for {endpoint}")
            except Exception as e:
                print(f"❌ Health check failed for {endpoint}: {e}")
                return False
        return True

    def test_basic_functionality(self) -> bool:
        """Test basic scraping functionality"""
        try:
            # Trigger a test spider run
            response = requests.post(
                f"{self.base_url}/api/spiders/test/start",
                json={'test_mode': True},
                timeout=30
            )
            if response.status_code != 200:
                print(f"❌ Failed to start test spider: {response.status_code}")
                return False
            job_id = response.json().get('job_id')
            # Check job status
            for _ in range(10):  # Wait up to 50 seconds
                status_response = requests.get(
                    f"{self.base_url}/api/jobs/{job_id}/status",
                    timeout=10
                )
                if status_response.status_code == 200:
                    status = status_response.json().get('status')
                    if status == 'completed':
                        print("✅ Basic functionality test passed")
                        return True
                    elif status == 'failed':
                        print("❌ Basic functionality test failed")
                        return False
                time.sleep(5)
            print("❌ Basic functionality test timed out")
            return False
        except Exception as e:
            print(f"❌ Basic functionality test failed: {e}")
            return False

    def run_all_tests(self) -> bool:
        """Run all smoke tests"""
        print(f"🚀 Running smoke tests for {self.environment} environment")
        tests = [
            self.test_health_endpoints,
            self.test_basic_functionality
        ]
        results = []
        for test in tests:
            results.append(test())
        success = all(results)
        if success:
            print("🎉 All smoke tests passed!")
        else:
            print("💥 Some smoke tests failed!")
        return success


def main():
    parser = argparse.ArgumentParser(description='Run smoke tests')
    parser.add_argument('--environment', required=True,
                        choices=['staging', 'production'],
                        help='Target environment')
    args = parser.parse_args()
    tests = SmokeTests(args.environment)
    success = tests.run_all_tests()
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()

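test_price_cleaning exercises a _clean_price helper from the Part 4 validation pipeline that is not reproduced here. One way to satisfy exactly those test cases, stripping currency symbols and handling both the 1,234.56 and 1.234,56 separator styles, is sketched below as a standalone function; treat it as an illustration rather than the pipeline's actual implementation.

# Possible implementation of the price-cleaning logic exercised above (sketch)
import re
from typing import Optional


def clean_price(raw: str) -> Optional[float]:
    if not raw:
        return None
    # Keep only digits and separators; drops currency symbols and whitespace
    text = re.sub(r"[^\d.,]", "", raw)
    if not text:
        return None  # e.g. "FREE"
    if "," in text and "." in text:
        # Whichever separator appears last is the decimal point
        if text.rfind(",") > text.rfind("."):
            text = text.replace(".", "").replace(",", ".")
        else:
            text = text.replace(",", "")
    elif "," in text:
        # A trailing comma with exactly two digits is a decimal comma
        head, _, tail = text.rpartition(",")
        text = f"{head.replace(',', '')}.{tail}" if len(tail) == 2 else text.replace(",", "")
    try:
        return float(text)
    except ValueError:
        return None
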
Production Monitoring and Observability

Prometheus Metrics

webscraper/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry, push_to_gateway
import time
from functools import wraps
from typing import Callable
import logging

from scrapy import signals


class ScrapingMetrics:
    """Prometheus metrics for scraping operations"""

    def __init__(self, pushgateway_url: str = None):
        self.registry = CollectorRegistry()
        self.pushgateway_url = pushgateway_url

        # Counters
        self.requests_total = Counter(
            'scrapy_requests_total',
            'Total number of requests made',
            ['spider', 'status'],
            registry=self.registry
        )
        self.items_scraped_total = Counter(
            'scrapy_items_scraped_total',
            'Total number of items scraped',
            ['spider', 'item_type'],
            registry=self.registry
        )
        self.errors_total = Counter(
            'scrapy_errors_total',
            'Total number of errors',
            ['spider', 'error_type'],
            registry=self.registry
        )

        # Histograms
        self.response_time = Histogram(
            'scrapy_response_time_seconds',
            'Response time for requests',
            ['spider', 'domain'],
            registry=self.registry
        )
        self.item_processing_time = Histogram(
            'scrapy_item_processing_time_seconds',
            'Time to process items',
            ['spider', 'pipeline'],
            registry=self.registry
        )

        # Gauges
        self.active_requests = Gauge(
            'scrapy_active_requests',
            'Number of active requests',
            ['spider'],
            registry=self.registry
        )
        self.queue_size = Gauge(
            'scrapy_queue_size',
            'Size of request queue',
            ['spider'],
            registry=self.registry
        )
        self.memory_usage = Gauge(
            'scrapy_memory_usage_bytes',
            'Memory usage in bytes',
            ['spider'],
            registry=self.registry
        )

    def record_request(self, spider: str, status: str, response_time: float = None, domain: str = None):
        """Record request metrics"""
        self.requests_total.labels(spider=spider, status=status).inc()
        if response_time and domain:
            self.response_time.labels(spider=spider, domain=domain).observe(response_time)

    def record_item(self, spider: str, item_type: str):
        """Record scraped item"""
        self.items_scraped_total.labels(spider=spider, item_type=item_type).inc()

    def record_error(self, spider: str, error_type: str):
        """Record error"""
        self.errors_total.labels(spider=spider, error_type=error_type).inc()

    def update_queue_size(self, spider: str, size: int):
        """Update queue size"""
        self.queue_size.labels(spider=spider).set(size)

    def update_active_requests(self, spider: str, count: int):
        """Update active requests count"""
        self.active_requests.labels(spider=spider).set(count)

    def update_memory_usage(self, spider: str, bytes_used: int):
        """Update memory usage"""
        self.memory_usage.labels(spider=spider).set(bytes_used)

    def push_metrics(self, job_name: str):
        """Push metrics to Pushgateway"""
        if self.pushgateway_url:
            try:
                push_to_gateway(
                    self.pushgateway_url,
                    job=job_name,
                    registry=self.registry
                )
            except Exception as e:
                logging.error(f"Failed to push metrics: {e}")


def monitor_performance(metrics: ScrapingMetrics, spider_name: str):
    """Decorator to monitor function performance"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                return result
            except Exception as e:
                metrics.record_error(spider_name, type(e).__name__)
                raise
            finally:
                duration = time.time() - start_time
                metrics.item_processing_time.labels(
                    spider=spider_name,
                    pipeline=func.__name__
                ).observe(duration)
        return wrapper
    return decorator


# Integration with Scrapy
class PrometheusStatsCollector:
    """Collect Scrapy stats and export to Prometheus"""

    def __init__(self, crawler):
        self.crawler = crawler
        self.metrics = ScrapingMetrics(
            pushgateway_url=crawler.settings.get('PROMETHEUS_PUSHGATEWAY_URL')
        )
        self.spider_name = None

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler)
        # Hook the extension into the crawl lifecycle so the handlers below fire
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        return ext

    def spider_opened(self, spider):
        self.spider_name = spider.name
        spider.logger.info(f"Prometheus metrics enabled for spider: {spider.name}")

    def spider_closed(self, spider, reason):
        # Push final metrics
        if self.metrics.pushgateway_url:
            self.metrics.push_metrics(f"scrapy_{spider.name}")
        # Log final stats
        stats = self.crawler.stats.get_stats()
        spider.logger.info(f"Final stats: {stats}")

    def request_scheduled(self, request, spider):
        self.metrics.update_active_requests(
            spider.name,
            self.crawler.stats.get_value('scheduler/enqueued', 0)
        )

    def response_received(self, response, request, spider):
        # Record response metrics
        status = str(response.status)
        domain = response.url.split('/')[2] if '://' in response.url else 'unknown'
        self.metrics.record_request(
            spider=spider.name,
            status=status,
            domain=domain
        )

    def item_scraped(self, item, response, spider):
        # Record item metrics
        item_type = type(item).__name__
        self.metrics.record_item(spider.name, item_type)

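PrometheusStatsCollector is written as a Scrapy extension, so it still has to be registered in your settings and pointed at a Pushgateway. A possible wiring, assuming the module path used above and a Pushgateway service named pushgateway, looks like this:

# webscraper/settings/production.py (excerpt) -- dotted path and Pushgateway
# address are assumptions; adjust them to your project and infrastructure.
EXTENSIONS = {
    "webscraper.monitoring.metrics.PrometheusStatsCollector": 500,
}

# Where the extension pushes its final metrics on spider_closed
PROMETHEUS_PUSHGATEWAY_URL = "http://pushgateway:9091"
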
Grafana Dashboard Configuration

{
"dashboard": {
"id": null,
"title": "Scrapy Monitoring Dashboard",
"description": "Comprehensive monitoring for Scrapy spiders",
"tags": ["scrapy", "monitoring"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "Requests per Second",
"type": "graph",
"targets": [
{
"expr": "rate(scrapy_requests_total[5m])",
"legendFormat": "{{spider}} - {{status}}"
}
],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 0},
"yAxes": [
{"label": "Requests/sec", "min": 0}
]
},
{
"id": 2,
"title": "Items Scraped per Hour",
"type": "graph",
"targets": [
{
"expr": "rate(scrapy_items_scraped_total[1h])*3600",
"legendFormat": "{{spider}} - {{item_type}}"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
},
{
"id": 3,
"title": "Error Rate",
"type": "graph",
"targets": [
{
"expr": "rate(scrapy_errors_total[5m]) / rate(scrapy_requests_total[5m]) * 100",
"legendFormat": "{{spider}} error rate %"
}
],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
"alert": {
"conditions": [
{
"query": {"queryType": "", "refId": "A"},
"reducer": {"type": "last", "params": []},
"evaluator": {"params": [5], "type": "gt"}
}
],
"executionErrorState": "alerting",
"frequency": "10s",
"handler": 1,
"name": "High Error Rate",
"noDataState": "no_data"
}
},
{
"id": 4,
"title": "Response Time Distribution",
"type": "heatmap",
"targets": [
{
"expr": "scrapy_response_time_seconds_bucket",
"legendFormat": "{{le}}"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 8}
},
{
"id": 5,
"title": "Memory Usage",
"type": "graph",
"targets": [
{
"expr": "scrapy_memory_usage_bytes / 1024 / 1024",
"legendFormat": "{{spider}} Memory (MB)"
}
],
"gridPos": {"h": 8, "w": 24, "x": 0, "y": 16}
},
{
"id": 6,
"title": "Queue Size",
"type": "graph",
"targets": [
{
"expr": "scrapy_queue_size",
"legendFormat": "{{spider}} Queue"
}
],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 24}
},
{
"id": 7,
"title": "Active Requests",
"type": "stat",
"targets": [
{
"expr": "sum(scrapy_active_requests)",
"legendFormat": "Total Active"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 24}
}
],
"time": {"from": "now-1h", "to": "now"},
"refresh": "5s"
}
}

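Before wiring alerts to the panels above, it helps to confirm the metrics path end to end. The short script below pushes a dummy scrapy_requests_total sample through a Pushgateway so you can check that Prometheus scrapes it and the "Requests per Second" panel renders; the pushgateway:9091 address is an assumption about your service name, since the compose files above only define Prometheus and Grafana.

# One-off check that the Pushgateway -> Prometheus -> Grafana path works (sketch)
from prometheus_client import CollectorRegistry, Counter, push_to_gateway

registry = CollectorRegistry()
test_counter = Counter(
    'scrapy_requests_total',
    'Total number of requests made',
    ['spider', 'status'],
    registry=registry,
)
test_counter.labels(spider='smoke', status='200').inc()

# Assumed Pushgateway address; replace with your own host:port
push_to_gateway('pushgateway:9091', job='dashboard_smoke_test', registry=registry)
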
Security and Compliance

Security Hardening

webscraper/security/security_middleware.py
import hmac
import hashlib
import time
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
from scrapy.exceptions import IgnoreRequest
import logging


class SecurityMiddleware:
    """Security middleware for production environments"""

    def __init__(self, settings):
        self.settings = settings
        self.logger = logging.getLogger(__name__)
        # Rate limiting
        self.rate_limits = {}
        self.last_request_time = {}
        # Security headers
        self.required_headers = {
            'User-Agent': True,
            'Accept': True,
            'Accept-Language': True
        }
        # Blocked patterns
        self.blocked_patterns = [
            r'.*\.exe$',
            r'.*\.zip$',
            r'.*admin.*',
            r'.*login.*',
            r'.*private.*'
        ]

    @classmethod
    def from_crawler(cls, crawler):
        # Scrapy instantiates downloader middlewares through this hook
        return cls(crawler.settings)

    def process_request(self, request, spider):
        # Validate request security
        if not self._validate_request_security(request):
            raise IgnoreRequest("Request blocked by security policy")
        # Apply rate limiting
        self._apply_rate_limiting(request, spider)
        # Add security headers
        self._add_security_headers(request)
        return None

    def _validate_request_security(self, request):
        """Validate request against security policies"""
        url = request.url.lower()
        # Check blocked patterns
        import re
        for pattern in self.blocked_patterns:
            if re.match(pattern, url):
                self.logger.warning(f"Blocked request to: {request.url}")
                return False
        # Check for required headers
        for header, required in self.required_headers.items():
            if required and header not in request.headers:
                self.logger.warning(f"Missing required header: {header}")
                return False
        return True

    def _apply_rate_limiting(self, request, spider):
        """Apply rate limiting per domain"""
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        current_time = time.time()
        min_delay = self.settings.getfloat('SECURITY_MIN_DELAY', 1.0)
        if domain in self.last_request_time:
            time_since_last = current_time - self.last_request_time[domain]
            if time_since_last < min_delay:
                sleep_time = min_delay - time_since_last
                spider.logger.debug(f"Security rate limiting: {sleep_time:.2f}s for {domain}")
                time.sleep(sleep_time)
        self.last_request_time[domain] = time.time()

    def _add_security_headers(self, request):
        """Add security headers to requests"""
        # Add timestamp for request validation
        timestamp = str(int(time.time()))
        request.headers['X-Request-Timestamp'] = timestamp
        # Add security token if configured
        secret_key = self.settings.get('SECURITY_SECRET_KEY')
        if secret_key:
            signature = hmac.new(
                secret_key.encode(),
                f"{request.url}{timestamp}".encode(),
                hashlib.sha256
            ).hexdigest()
            request.headers['X-Security-Signature'] = signature


class DataEncryptionPipeline:
    """Encrypt sensitive data before storage"""

    def __init__(self, encryption_key):
        self.encryption_key = encryption_key
        self.sensitive_fields = ['email', 'phone', 'address', 'personal_id']

    @classmethod
    def from_crawler(cls, crawler):
        encryption_key = crawler.settings.get('ENCRYPTION_KEY')
        if not encryption_key:
            raise ValueError("ENCRYPTION_KEY setting is required")
        return cls(encryption_key)

    def process_item(self, item, spider):
        from cryptography.fernet import Fernet
        fernet = Fernet(self.encryption_key.encode())
        for field in self.sensitive_fields:
            if field in item and item[field]:
                # Encrypt sensitive data
                encrypted_data = fernet.encrypt(str(item[field]).encode())
                item[f"{field}_encrypted"] = encrypted_data.decode()
                # Remove original field
                del item[field]
        return item


# scripts/security_audit.py
import subprocess
import json
import sys
from typing import Dict, List


class SecurityAudit:
    """Security audit for production deployment"""

    def __init__(self):
        self.issues = []

    def audit_dependencies(self) -> bool:
        """Audit Python dependencies for known vulnerabilities"""
        try:
            result = subprocess.run(
                ['safety', 'check', '--json'],
                capture_output=True,
                text=True
            )
            if result.returncode != 0:
                vulnerabilities = json.loads(result.stdout)
                for vuln in vulnerabilities:
                    self.issues.append({
                        'type': 'dependency_vulnerability',
                        'severity': 'high',
                        'package': vuln['package_name'],
                        'vulnerability': vuln['vulnerability_id'],
                        'description': vuln['advisory']
                    })
                return False
            return True
        except Exception as e:
            self.issues.append({
                'type': 'audit_error',
                'severity': 'medium',
                'description': f"Failed to audit dependencies: {e}"
            })
            return False

    def audit_docker_image(self, image_name: str) -> bool:
        """Audit Docker image for security issues"""
        try:
            result = subprocess.run(
                ['trivy', 'image', '--format', 'json', image_name],
                capture_output=True,
                text=True
            )
            if result.returncode == 0:
                scan_results = json.loads(result.stdout)
                for target in scan_results.get('Results', []):
                    for vuln in target.get('Vulnerabilities', []):
                        if vuln.get('Severity') in ['HIGH', 'CRITICAL']:
                            self.issues.append({
                                'type': 'container_vulnerability',
                                'severity': vuln['Severity'].lower(),
                                'package': vuln.get('PkgName'),
                                'vulnerability': vuln.get('VulnerabilityID'),
                                'description': vuln.get('Description', '')
                            })
            return len([i for i in self.issues if i['type'] == 'container_vulnerability']) == 0
        except Exception as e:
            self.issues.append({
                'type': 'audit_error',
                'severity': 'medium',
                'description': f"Failed to audit Docker image: {e}"
            })
            return False

    def audit_kubernetes_config(self, config_path: str) -> bool:
        """Audit Kubernetes configuration"""
        security_checks = [
            self._check_non_root_user,
            self._check_resource_limits,
            self._check_security_context,
            self._check_network_policies
        ]
        passed = True
        for check in security_checks:
            if not check(config_path):
                passed = False
        return passed

    def _check_non_root_user(self, config_path: str) -> bool:
        """Check if containers run as non-root"""
        # Implementation for checking non-root user
        return True

    def _check_resource_limits(self, config_path: str) -> bool:
        """Check if resource limits are set"""
        # Implementation for checking resource limits
        return True

    def _check_security_context(self, config_path: str) -> bool:
        """Check security context configuration"""
        # Implementation for checking security context
        return True

    def _check_network_policies(self, config_path: str) -> bool:
        """Check network policies"""
        # Implementation for checking network policies
        return True

    def generate_report(self) -> Dict:
        """Generate security audit report"""
        severity_counts = {}
        for issue in self.issues:
            severity = issue['severity']
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
        return {
            'total_issues': len(self.issues),
            'severity_breakdown': severity_counts,
            'issues': self.issues,
            'passed': len(self.issues) == 0
        }

    def run_full_audit(self, image_name: str = None, config_path: str = None) -> bool:
        """Run complete security audit"""
        print("🔒 Running security audit...")
        # Audit dependencies
        deps_ok = self.audit_dependencies()
        print(f"Dependencies: {'✅' if deps_ok else '❌'}")
        # Audit Docker image if provided
        if image_name:
            image_ok = self.audit_docker_image(image_name)
            print(f"Docker image: {'✅' if image_ok else '❌'}")
        # Audit Kubernetes config if provided
        if config_path:
            k8s_ok = self.audit_kubernetes_config(config_path)
            print(f"Kubernetes config: {'✅' if k8s_ok else '❌'}")
        # Generate report
        report = self.generate_report()
        if report['passed']:
            print("🎉 Security audit passed!")
        else:
            print(f"💥 Security audit failed with {report['total_issues']} issues")
            for issue in self.issues:
                print(f"  - {issue['severity'].upper()}: {issue['description']}")
        return report['passed']


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Run security audit')
    parser.add_argument('--image', help='Docker image to audit')
    parser.add_argument('--config', help='Kubernetes config path to audit')
    args = parser.parse_args()

    audit = SecurityAudit()
    success = audit.run_full_audit(args.image, args.config)
    sys.exit(0 if success else 1)

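Like any Scrapy middleware and pipeline, the two components above only take effect once they are enabled in your settings, and DataEncryptionPipeline additionally needs a Fernet key. A possible configuration, with the dotted paths assumed from the file layout used in this section and illustrative priorities, is:

# webscraper/settings/production.py (excerpt) -- paths and priorities are
# illustrative; generate the Fernet key once (Fernet.generate_key()) and
# inject it through a Kubernetes secret rather than hard-coding it.
import os

DOWNLOADER_MIDDLEWARES = {
    "webscraper.security.security_middleware.SecurityMiddleware": 543,
}

ITEM_PIPELINES = {
    "webscraper.security.security_middleware.DataEncryptionPipeline": 300,
}

SECURITY_MIN_DELAY = 1.0
SECURITY_SECRET_KEY = os.environ.get("SECURITY_SECRET_KEY", "")
ENCRYPTION_KEY = os.environ["ENCRYPTION_KEY"]  # urlsafe base64 Fernet key
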
Summary and Congratulations!

🎉 Congratulations! You've completed the comprehensive 5-part Scrapy mastery series!

What You've Accomplished

Throughout this series, you've built expertise in:

✅ Part 1: Scrapy Fundamentals - Professional environment setup and basic spiders
✅ Part 2: Advanced Techniques - JavaScript rendering, forms, and AJAX handling
✅ Part 3: Anti-Detection & Scaling - Distributed scraping and stealth techniques
✅ Part 4: Data Processing - Advanced validation, storage, and analytics
✅ Part 5: Production Deployment - Enterprise-grade deployment and monitoring

Key Production Skills Mastered

  • Docker containerization with multi-stage builds and security
  • Kubernetes orchestration with auto-scaling and health checks
  • CI/CD pipelines with automated testing and deployment
  • Production monitoring with Prometheus and Grafana
  • Security hardening and compliance frameworks
  • Cloud deployment strategies across multiple platforms

Next Steps and Advanced Topics

  1. Machine Learning Integration: Add ML models for data classification and anomaly detection
  2. Real-time Processing: Implement streaming data pipelines with Apache Kafka
  3. Advanced Analytics: Build predictive models and business intelligence dashboards
  4. Global Distribution: Deploy across multiple regions with CDN integration
  5. Cost Optimization: Implement intelligent resource scheduling and spot instances

Best Practices Checklist

Before going to production, ensure you have:

  • Comprehensive test coverage (>90%)
  • Security audit passed
  • Performance benchmarks established
  • Monitoring and alerting configured
  • Backup and disaster recovery tested
  • Documentation and runbooks complete
  • Team training and knowledge transfer done

Community and Resources

  • Official Scrapy Documentation: docs.scrapy.org
  • Scrapy Community: Join the Scrapy community on GitHub and Discord
  • Advanced Courses: Consider specialized courses on distributed systems and data engineering
  • Professional Services: For enterprise implementations, consider professional consulting

Final Project Challenge

Build a complete production-ready scraping system that:

  1. Scrapes multiple e-commerce sites with different technologies
  2. Processes millions of products daily with 99.9% uptime
  3. Provides real-time analytics and business insights
  4. Scales automatically based on demand
  5. Maintains legal compliance and ethical standards
  6. Includes comprehensive monitoring and alerting

You now have the knowledge and tools to build enterprise-grade web scraping solutions that can handle any challenge!

Happy production scraping! 🚀🕷️


This concludes our comprehensive 5-part series on mastering web scraping with Scrapy. You're now equipped to tackle any web scraping challenge at enterprise scale!