Master Web Scraping with Scrapy: Production Deployment
Welcome to the final part of our comprehensive Scrapy series! In this expert-level tutorial, we'll transform your scraping projects into production-ready, enterprise-grade systems that can handle massive scale, ensure reliability, and maintain security in real-world environments.
What You'll Learn in This Part
- Docker containerization and multi-stage builds
- Kubernetes orchestration and scaling
- CI/CD pipelines with automated testing
- Cloud deployment (AWS, GCP, Azure)
- Production monitoring and observability
- Security hardening and compliance
- Auto-scaling and resource optimization
- Disaster recovery and backup strategies
- Performance tuning and optimization
Docker Containerization
Multi-Stage Dockerfile
# webscraper/Dockerfile
# Multi-stage build for optimized production image

# Build stage
FROM python:3.11-slim as builder

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libxml2-dev \
    libxslt-dev \
    libffi-dev \
    libssl-dev \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Create and activate virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && \
    pip install -r requirements.txt

# Production stage
FROM python:3.11-slim as production

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PATH="/opt/venv/bin:$PATH"

# Create non-root user
RUN groupadd -r scrapy && useradd -r -g scrapy scrapy

# Install runtime dependencies
RUN apt-get update && apt-get install -y \
    curl \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# Copy virtual environment from builder stage
COPY --from=builder /opt/venv /opt/venv

# Create application directory
WORKDIR /app

# Copy application code
COPY --chown=scrapy:scrapy . .

# Create necessary directories
RUN mkdir -p /app/logs /app/data /app/exports && \
    chown -R scrapy:scrapy /app

# Switch to non-root user
USER scrapy

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:6023/ || exit 1

# Default command
CMD ["scrapy", "list"]

# Development stage
FROM production as development

USER root

# Install development dependencies
RUN pip install pytest pytest-cov black flake8 mypy

# Install debugging tools
RUN apt-get update && apt-get install -y \
    vim \
    htop \
    net-tools \
    && rm -rf /var/lib/apt/lists/*

USER scrapy

# Override default command for development
CMD ["tail", "-f", "/dev/null"]
Docker Compose for Development
version: '3.8'

services:
  # Main scraper service
  scraper:
    build:
      context: .
      target: development
      dockerfile: Dockerfile
    volumes:
      - .:/app
      - ./data:/app/data
      - ./logs:/app/logs
    environment:
      - SCRAPY_SETTINGS_MODULE=webscraper.settings.development
      - REDIS_URL=redis://redis:6379/0
      - MONGO_URI=mongodb://mongo:27017/scrapy_dev
      - POSTGRES_HOST=postgres
      - POSTGRES_DB=scrapy_dev
      - POSTGRES_USER=scrapy
      - POSTGRES_PASSWORD=scrapy_password
    depends_on:
      - redis
      - mongo
      - postgres
    networks:
      - scrapy-network

  # Redis for distributed scraping
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes
    networks:
      - scrapy-network

  # MongoDB for document storage
  mongo:
    image: mongo:6
    ports:
      - "27017:27017"
    volumes:
      - mongo-data:/data/db
    environment:
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password
    networks:
      - scrapy-network

  # PostgreSQL for structured data
  postgres:
    image: postgres:15
    ports:
      - "5432:5432"
    volumes:
      - postgres-data:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=scrapy_dev
      - POSTGRES_USER=scrapy
      - POSTGRES_PASSWORD=scrapy_password
    networks:
      - scrapy-network

  # Elasticsearch for search and analytics
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
    ports:
      - "9200:9200"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    networks:
      - scrapy-network

  # Kibana for data visualization
  kibana:
    image: docker.elastic.co/kibana/kibana:8.8.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - scrapy-network

  # Prometheus for monitoring
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
    networks:
      - scrapy-network

  # Grafana for dashboards
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./monitoring/grafana:/etc/grafana/provisioning
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - prometheus
    networks:
      - scrapy-network

  # Splash for JavaScript rendering
  splash:
    image: scrapinghub/splash:latest
    ports:
      - "8050:8050"
    command: --max-timeout=3600 --slots=5
    networks:
      - scrapy-network

volumes:
  redis-data:
  mongo-data:
  postgres-data:
  es-data:
  prometheus-data:
  grafana-data:

networks:
  scrapy-network:
    driver: bridge
Production Docker Compose
version: '3.8'

services:
  # Load balancer
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/ssl:/etc/nginx/ssl
    depends_on:
      - scraper-coordinator
    networks:
      - scrapy-network

  # Coordinator service
  scraper-coordinator:
    build:
      context: .
      target: production
    command: ["python", "scripts/coordinator.py"]
    environment:
      - SCRAPY_SETTINGS_MODULE=webscraper.settings.production
      - REDIS_URL=redis://redis-cluster:6379/0
      - SENTRY_DSN=${SENTRY_DSN}
    deploy:
      replicas: 1
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 256M
    depends_on:
      - redis-cluster
    networks:
      - scrapy-network

  # Worker services
  scraper-worker:
    build:
      context: .
      target: production
    command: ["python", "scripts/worker.py"]
    environment:
      - SCRAPY_SETTINGS_MODULE=webscraper.settings.production
      - REDIS_URL=redis://redis-cluster:6379/0
      - WORKER_ID=${HOSTNAME}
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '1.0'
          memory: 1G
        reservations:
          cpus: '0.5'
          memory: 512M
    depends_on:
      - redis-cluster
      - scraper-coordinator
    networks:
      - scrapy-network

  # Redis cluster
  redis-cluster:
    image: redis:7-alpine
    command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - redis-prod-data:/data
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    networks:
      - scrapy-network

  # Monitoring stack
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./monitoring/prometheus.prod.yml:/etc/prometheus/prometheus.yml
      - prometheus-prod-data:/prometheus
    deploy:
      resources:
        limits:
          cpus: '0.3'
          memory: 256M
    networks:
      - scrapy-network

  grafana:
    image: grafana/grafana:latest
    volumes:
      - grafana-prod-data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
      - GF_INSTALL_PLUGINS=grafana-piechart-panel
    deploy:
      resources:
        limits:
          cpus: '0.3'
          memory: 256M
    networks:
      - scrapy-network

volumes:
  redis-prod-data:
  prometheus-prod-data:
  grafana-prod-data:

networks:
  scrapy-network:
    driver: overlay
    attachable: true
Kubernetes Deployment
Kubernetes Manifests
apiVersion: v1
kind: Namespace
metadata:
  name: scrapy-production
  labels:
    name: scrapy-production

---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: scrapy-config
  namespace: scrapy-production
data:
  SCRAPY_SETTINGS_MODULE: "webscraper.settings.production"
  REDIS_URL: "redis://redis-service:6379/0"
  LOG_LEVEL: "INFO"
  CONCURRENT_REQUESTS: "16"
  DOWNLOAD_DELAY: "1"

---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: scrapy-secrets
  namespace: scrapy-production
type: Opaque
data:
  # Base64 encoded values
  POSTGRES_PASSWORD: c2NyYXB5X3Bhc3N3b3Jk  # scrapy_password
  MONGO_PASSWORD: bW9uZ29fcGFzc3dvcmQ=  # mongo_password
  SENTRY_DSN: aHR0cHM6Ly9zZW50cnkuaW8=  # https://sentry.io

---
# k8s/redis-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: scrapy-production
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:7-alpine
        ports:
        - containerPort: 6379
        command: ["redis-server"]
        args: ["--appendonly", "yes", "--maxmemory", "256mb"]
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        volumeMounts:
        - name: redis-storage
          mountPath: /data
      volumes:
      - name: redis-storage
        persistentVolumeClaim:
          claimName: redis-pvc

---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
  namespace: scrapy-production
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379
  type: ClusterIP

---
# k8s/coordinator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scraper-coordinator
  namespace: scrapy-production
spec:
  replicas: 1
  selector:
    matchLabels:
      app: scraper-coordinator
  template:
    metadata:
      labels:
        app: scraper-coordinator
    spec:
      containers:
      - name: coordinator
        image: your-registry/webscraper:latest
        command: ["python", "scripts/coordinator.py"]
        envFrom:
        - configMapRef:
            name: scrapy-config
        - secretRef:
            name: scrapy-secrets
        resources:
          requests:
            memory: "256Mi"
            cpu: "200m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        ports:
        - containerPort: 8080
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5

---
# k8s/worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scraper-worker
  namespace: scrapy-production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: scraper-worker
  template:
    metadata:
      labels:
        app: scraper-worker
    spec:
      containers:
      - name: worker
        image: your-registry/webscraper:latest
        command: ["python", "scripts/worker.py"]
        envFrom:
        - configMapRef:
            name: scrapy-config
        - secretRef:
            name: scrapy-secrets
        env:
        - name: WORKER_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        resources:
          requests:
            memory: "512Mi"
            cpu: "300m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        ports:
        - containerPort: 8081
        livenessProbe:
          httpGet:
            path: /health
            port: 8081
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /ready
            port: 8081
          initialDelaySeconds: 10
          periodSeconds: 10

---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: scraper-worker-hpa
  namespace: scrapy-production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: scraper-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60

---
# k8s/cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: scraper-job
  namespace: scrapy-production
spec:
  schedule: "0 */6 * * *"  # Every 6 hours
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: scraper
            image: your-registry/webscraper:latest
            command: ["scrapy", "crawl", "ecommerce"]
            envFrom:
            - configMapRef:
                name: scrapy-config
            - secretRef:
                name: scrapy-secrets
            resources:
              requests:
                memory: "1Gi"
                cpu: "500m"
              limits:
                memory: "2Gi"
                cpu: "1000m"
          restartPolicy: OnFailure
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 1

---
# k8s/pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: redis-pvc
  namespace: scrapy-production
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: fast-ssd
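The liveness and readiness probes above assume that scripts/coordinator.py and scripts/worker.py expose /health and /ready endpoints on ports 8080 and 8081. Those scripts are not shown in this part, so here is a rough sketch of what such a probe server could look like using only the standard library (the helper name, port, and readiness condition are assumptions to adapt to your own coordinator and worker code):

# Hypothetical helper the coordinator/worker scripts could reuse for Kubernetes probes
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


def start_probe_server(port: int, is_ready) -> HTTPServer:
    """Serve /health and /ready in a background thread.

    `is_ready` is a zero-argument callable that returns True once the process
    can accept work (for example, after its Redis connection is established).
    """

    class ProbeHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path == "/health":
                status, body = 200, {"status": "ok"}
            elif self.path == "/ready":
                ready = is_ready()
                status, body = (200, {"status": "ready"}) if ready else (503, {"status": "not ready"})
            else:
                status, body = 404, {"error": "not found"}
            payload = json.dumps(body).encode()
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(payload)

        def log_message(self, fmt, *args):
            pass  # keep probe traffic out of the application logs

    server = HTTPServer(("0.0.0.0", port), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


# Usage inside scripts/worker.py (illustrative):
#   start_probe_server(8081, lambda: redis_connection_is_alive())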
CI/CD Pipeline
GitHub Actions Workflow
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  # Test and quality checks
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 6379:6379

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Cache dependencies
        uses: actions/cache@v3
        with:
          path: ~/.cache/pip
          key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt

      - name: Code formatting check
        run: |
          black --check --diff .

      - name: Linting
        run: |
          flake8 webscraper tests

      - name: Type checking
        run: |
          mypy webscraper

      - name: Security scan
        run: |
          bandit -r webscraper

      - name: Run tests
        run: |
          pytest tests/ --cov=webscraper --cov-report=xml --cov-report=html
        env:
          REDIS_URL: redis://localhost:6379/0

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          file: ./coverage.xml

  # Build and push Docker image
  build:
    needs: test
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write

    steps:
      - uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix={{branch}}-
            type=raw,value=latest,enable={{is_default_branch}}

      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          target: production
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  # Deploy to staging
  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    environment: staging

    steps:
      - uses: actions/checkout@v4

      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          method: kubeconfig
          kubeconfig: ${{ secrets.KUBE_CONFIG_STAGING }}

      - name: Deploy to staging
        run: |
          # Update image in k8s manifests
          sed -i "s|your-registry/webscraper:latest|${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:develop|g" k8s/*.yaml

          # Apply manifests
          kubectl apply -f k8s/ -n scrapy-staging

          # Wait for rollout
          kubectl rollout status deployment/scraper-coordinator -n scrapy-staging
          kubectl rollout status deployment/scraper-worker -n scrapy-staging

      - name: Run smoke tests
        run: |
          # Wait for services to be ready
          kubectl wait --for=condition=ready pod -l app=scraper-coordinator -n scrapy-staging --timeout=300s

          # Run basic smoke tests
          python scripts/smoke_tests.py --environment=staging

  # Deploy to production
  deploy-production:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production

    steps:
      - uses: actions/checkout@v4

      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          method: kubeconfig
          kubeconfig: ${{ secrets.KUBE_CONFIG_PRODUCTION }}

      - name: Deploy to production
        run: |
          # Update image in k8s manifests
          sed -i "s|your-registry/webscraper:latest|${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest|g" k8s/*.yaml

          # Apply manifests with rolling update
          kubectl apply -f k8s/ -n scrapy-production

          # Wait for rollout
          kubectl rollout status deployment/scraper-coordinator -n scrapy-production --timeout=600s
          kubectl rollout status deployment/scraper-worker -n scrapy-production --timeout=600s

      - name: Verify deployment
        run: |
          # Check pod health
          kubectl get pods -n scrapy-production

          # Run production health checks
          python scripts/health_check.py --environment=production

      - name: Notify deployment
        uses: 8398a7/action-slack@v3
        with:
          status: ${{ job.status }}
          channel: '#deployments'
          webhook_url: ${{ secrets.SLACK_WEBHOOK }}
        if: always()
Testing Framework
import pytest
import responses
from scrapy.http import HtmlResponse, Request

from webscraper.spiders.ecommerce_spider import EcommerceSpider
from webscraper.items import ProductItem


class TestEcommerceSpider:

    @pytest.fixture
    def spider(self):
        return EcommerceSpider()

    @pytest.fixture
    def sample_product_html(self):
        return """
        <html>
            <body>
                <h1 class="product-title">Test Product</h1>
                <span class="price-current">$99.99</span>
                <div class="product-description">
                    <p>This is a test product description</p>
                </div>
                <span class="brand-name">TestBrand</span>
                <span class="rating-value">4.5</span>
                <div class="stock-status">In Stock</div>
            </body>
        </html>
        """

    def create_response(self, html, url="http://test.com"):
        request = Request(url=url)
        return HtmlResponse(url=url, request=request, body=html.encode('utf-8'))

    def test_parse_product_basic(self, spider, sample_product_html):
        """Test basic product parsing"""
        response = self.create_response(sample_product_html)

        items = list(spider.parse_product(response))

        assert len(items) == 1
        item = items[0]

        assert item['name'] == 'Test Product'
        assert item['price'] == 99.99
        assert item['brand'] == 'TestBrand'
        assert item['rating'] == 4.5

    def test_parse_product_missing_fields(self, spider):
        """Test handling of missing fields"""
        html = "<html><body><h1>Product</h1></body></html>"
        response = self.create_response(html)

        items = list(spider.parse_product(response))

        assert len(items) == 1
        item = items[0]
        assert item['name'] == 'Product'
        assert 'price' not in item or item['price'] is None

    @responses.activate
    def test_api_integration(self, spider):
        """Test API integration"""
        # Mock API response
        responses.add(
            responses.GET,
            'http://api.test.com/products',
            json={'products': [{'id': 1, 'name': 'API Product'}]},
            status=200
        )

        # Test API call logic
        import requests
        response = requests.get('http://api.test.com/products')
        assert response.status_code == 200
        assert response.json()['products'][0]['name'] == 'API Product'
# tests/test_pipelines.py
import pytest
from itemadapter import ItemAdapter

from webscraper.pipelines import AdvancedValidationPipeline, ValidationError
from webscraper.items import ProductItem


class TestValidationPipeline:

    @pytest.fixture
    def pipeline(self):
        return AdvancedValidationPipeline()

    @pytest.fixture
    def valid_item(self):
        return ProductItem({
            'name': 'Test Product',
            'url': 'https://test.com/product',
            'price': 99.99,
            'rating': 4.5,
            'in_stock': True
        })

    def test_valid_item_passes(self, pipeline, valid_item):
        """Test that a valid item passes validation"""
        result = pipeline.process_item(valid_item, None)
        assert result is not None
        assert result['validation_passed'] is True

    def test_missing_required_field_fails(self, pipeline):
        """Test that missing required fields cause validation failure"""
        item = ProductItem({'price': 99.99})

        with pytest.raises(ValidationError):
            pipeline.process_item(item, None)

    def test_invalid_price_type_fails(self, pipeline):
        """Test that an invalid price type causes validation failure"""
        item = ProductItem({
            'name': 'Test Product',
            'url': 'https://test.com/product',
            'price': 'invalid'
        })

        with pytest.raises(ValidationError):
            pipeline.process_item(item, None)

    def test_price_cleaning(self, pipeline):
        """Test price cleaning functionality"""
        price_tests = [
            ('$99.99', 99.99),
            ('€1,234.56', 1234.56),
            ('1.234,56', 1234.56),
            ('FREE', None),
            ('', None)
        ]

        for input_price, expected in price_tests:
            cleaned = pipeline._clean_price(input_price)
            assert cleaned == expected
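The _clean_price helper exercised by this last test is not reproduced in this part. A minimal sketch that satisfies the cases above (an assumption about how the validation pipeline normalizes prices, not necessarily the exact implementation from Part 4):

import re


class AdvancedValidationPipeline:  # sketch of one method only; the full pipeline is covered in Part 4
    def _clean_price(self, value):
        """Normalize a raw price string to a float, or None when no price is present."""
        if not value:
            return None
        # Keep digits and separators only; currency symbols and words like "FREE" drop out
        cleaned = re.sub(r"[^\d.,]", "", str(value))
        if not cleaned:
            return None
        if "," in cleaned and "." in cleaned:
            # The right-most separator is treated as the decimal point
            if cleaned.rfind(",") > cleaned.rfind("."):
                cleaned = cleaned.replace(".", "").replace(",", ".")  # 1.234,56 -> 1234.56
            else:
                cleaned = cleaned.replace(",", "")  # 1,234.56 -> 1234.56
        elif "," in cleaned:
            cleaned = cleaned.replace(",", ".")  # a lone comma is treated as the decimal separator
        try:
            return float(cleaned)
        except ValueError:
            return None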
# scripts/smoke_tests.py
import requests
import time
import sys
import argparse
from typing import Dict, List


class SmokeTests:
    """Basic smoke tests for a deployed environment"""

    def __init__(self, environment: str):
        self.environment = environment
        self.base_urls = {
            'staging': 'https://staging-scraper.yourcompany.com',
            'production': 'https://scraper.yourcompany.com'
        }
        self.base_url = self.base_urls[environment]

    def test_health_endpoints(self) -> bool:
        """Test health endpoints"""
        endpoints = ['/health', '/ready', '/metrics']

        for endpoint in endpoints:
            try:
                response = requests.get(f"{self.base_url}{endpoint}", timeout=10)
                if response.status_code != 200:
                    print(f"❌ Health check failed for {endpoint}: {response.status_code}")
                    return False
                print(f"✅ Health check passed for {endpoint}")
            except Exception as e:
                print(f"❌ Health check failed for {endpoint}: {e}")
                return False

        return True

    def test_basic_functionality(self) -> bool:
        """Test basic scraping functionality"""
        try:
            # Trigger a test spider run
            response = requests.post(
                f"{self.base_url}/api/spiders/test/start",
                json={'test_mode': True},
                timeout=30
            )

            if response.status_code != 200:
                print(f"❌ Failed to start test spider: {response.status_code}")
                return False

            job_id = response.json().get('job_id')

            # Check job status
            for _ in range(10):  # Wait up to 50 seconds
                status_response = requests.get(
                    f"{self.base_url}/api/jobs/{job_id}/status",
                    timeout=10
                )

                if status_response.status_code == 200:
                    status = status_response.json().get('status')
                    if status == 'completed':
                        print("✅ Basic functionality test passed")
                        return True
                    elif status == 'failed':
                        print("❌ Basic functionality test failed")
                        return False

                time.sleep(5)

            print("❌ Basic functionality test timed out")
            return False

        except Exception as e:
            print(f"❌ Basic functionality test failed: {e}")
            return False

    def run_all_tests(self) -> bool:
        """Run all smoke tests"""
        print(f"🚀 Running smoke tests for {self.environment} environment")

        tests = [
            self.test_health_endpoints,
            self.test_basic_functionality
        ]

        results = []
        for test in tests:
            results.append(test())

        success = all(results)

        if success:
            print("🎉 All smoke tests passed!")
        else:
            print("💥 Some smoke tests failed!")

        return success


def main():
    parser = argparse.ArgumentParser(description='Run smoke tests')
    parser.add_argument('--environment', required=True,
                        choices=['staging', 'production'],
                        help='Target environment')
    args = parser.parse_args()

    tests = SmokeTests(args.environment)
    success = tests.run_all_tests()

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
Production Monitoring and Observability
Prometheus Metrics
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry, push_to_gateway
import time
from functools import wraps
from typing import Callable
import logging


class ScrapingMetrics:
    """Prometheus metrics for scraping operations"""

    def __init__(self, pushgateway_url: str = None):
        self.registry = CollectorRegistry()
        self.pushgateway_url = pushgateway_url

        # Counters
        self.requests_total = Counter(
            'scrapy_requests_total',
            'Total number of requests made',
            ['spider', 'status'],
            registry=self.registry
        )

        self.items_scraped_total = Counter(
            'scrapy_items_scraped_total',
            'Total number of items scraped',
            ['spider', 'item_type'],
            registry=self.registry
        )

        self.errors_total = Counter(
            'scrapy_errors_total',
            'Total number of errors',
            ['spider', 'error_type'],
            registry=self.registry
        )

        # Histograms
        self.response_time = Histogram(
            'scrapy_response_time_seconds',
            'Response time for requests',
            ['spider', 'domain'],
            registry=self.registry
        )

        self.item_processing_time = Histogram(
            'scrapy_item_processing_time_seconds',
            'Time to process items',
            ['spider', 'pipeline'],
            registry=self.registry
        )

        # Gauges
        self.active_requests = Gauge(
            'scrapy_active_requests',
            'Number of active requests',
            ['spider'],
            registry=self.registry
        )

        self.queue_size = Gauge(
            'scrapy_queue_size',
            'Size of request queue',
            ['spider'],
            registry=self.registry
        )

        self.memory_usage = Gauge(
            'scrapy_memory_usage_bytes',
            'Memory usage in bytes',
            ['spider'],
            registry=self.registry
        )

    def record_request(self, spider: str, status: str,
                       response_time: float = None, domain: str = None):
        """Record request metrics"""
        self.requests_total.labels(spider=spider, status=status).inc()

        if response_time and domain:
            self.response_time.labels(spider=spider, domain=domain).observe(response_time)

    def record_item(self, spider: str, item_type: str):
        """Record a scraped item"""
        self.items_scraped_total.labels(spider=spider, item_type=item_type).inc()

    def record_error(self, spider: str, error_type: str):
        """Record an error"""
        self.errors_total.labels(spider=spider, error_type=error_type).inc()

    def update_queue_size(self, spider: str, size: int):
        """Update queue size"""
        self.queue_size.labels(spider=spider).set(size)

    def update_active_requests(self, spider: str, count: int):
        """Update active requests count"""
        self.active_requests.labels(spider=spider).set(count)

    def update_memory_usage(self, spider: str, bytes_used: int):
        """Update memory usage"""
        self.memory_usage.labels(spider=spider).set(bytes_used)

    def push_metrics(self, job_name: str):
        """Push metrics to the Pushgateway"""
        if self.pushgateway_url:
            try:
                push_to_gateway(
                    self.pushgateway_url,
                    job=job_name,
                    registry=self.registry
                )
            except Exception as e:
                logging.error(f"Failed to push metrics: {e}")


def monitor_performance(metrics: ScrapingMetrics, spider_name: str):
    """Decorator to monitor function performance"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                return result
            except Exception as e:
                metrics.record_error(spider_name, type(e).__name__)
                raise
            finally:
                duration = time.time() - start_time
                metrics.item_processing_time.labels(
                    spider=spider_name,
                    pipeline=func.__name__
                ).observe(duration)
        return wrapper
    return decorator


# Integration with Scrapy
class PrometheusStatsCollector:
    """Collect Scrapy stats and export them to Prometheus"""

    def __init__(self, crawler):
        self.crawler = crawler
        self.metrics = ScrapingMetrics(
            pushgateway_url=crawler.settings.get('PROMETHEUS_PUSHGATEWAY_URL')
        )
        self.spider_name = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def spider_opened(self, spider):
        self.spider_name = spider.name
        spider.logger.info(f"Prometheus metrics enabled for spider: {spider.name}")

    def spider_closed(self, spider, reason):
        # Push final metrics
        if self.metrics.pushgateway_url:
            self.metrics.push_metrics(f"scrapy_{spider.name}")

        # Log final stats
        stats = self.crawler.stats.get_stats()
        spider.logger.info(f"Final stats: {stats}")

    def request_scheduled(self, request, spider):
        self.metrics.update_active_requests(
            spider.name,
            self.crawler.stats.get_value('scheduler/enqueued', 0)
        )

    def response_received(self, response, request, spider):
        # Record response metrics
        status = str(response.status)
        domain = response.url.split('/')[2] if '://' in response.url else 'unknown'

        self.metrics.record_request(
            spider=spider.name,
            status=status,
            domain=domain
        )

    def item_scraped(self, item, response, spider):
        # Record item metrics
        item_type = type(item).__name__
        self.metrics.record_item(spider.name, item_type)
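The collector defines signal handlers but nothing above connects them to a running crawl. One way to wire it in (a sketch; the module paths, extension priority, and Pushgateway URL are assumptions to adapt to your project) is to register it as a Scrapy extension and connect the handlers to the built-in signals:

# webscraper/extensions.py (hypothetical module)
from scrapy import signals

from webscraper.monitoring import PrometheusStatsCollector  # assumed import path


class PrometheusExtension(PrometheusStatsCollector):
    """Connect the collector's handlers to Scrapy's signals."""

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        return ext


# Enable it in the project settings (illustrative values):
# EXTENSIONS = {"webscraper.extensions.PrometheusExtension": 500}
# PROMETHEUS_PUSHGATEWAY_URL = "http://pushgateway:9091"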
Grafana Dashboard Configuration
{ "dashboard": { "id": null, "title": "Scrapy Monitoring Dashboard", "description": "Comprehensive monitoring for Scrapy spiders", "tags": ["scrapy", "monitoring"], "timezone": "browser", "panels": [ { "id": 1, "title": "Requests per Second", "type": "graph", "targets": [ { "expr": "rate(scrapy_requests_total[5m])", "legendFormat": "{{spider}} - {{status}}" } ], "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}, "yAxes": [ {"label": "Requests/sec", "min": 0} ] }, { "id": 2, "title": "Items Scraped per Hour", "type": "graph", "targets": [ { "expr": "rate(scrapy_items_scraped_total[1h])*3600", "legendFormat": "{{spider}} - {{item_type}}" } ], "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0} }, { "id": 3, "title": "Error Rate", "type": "graph", "targets": [ { "expr": "rate(scrapy_errors_total[5m]) / rate(scrapy_requests_total[5m]) * 100", "legendFormat": "{{spider}} error rate %" } ], "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8}, "alert": { "conditions": [ { "query": {"queryType": "", "refId": "A"}, "reducer": {"type": "last", "params": []}, "evaluator": {"params": [5], "type": "gt"} } ], "executionErrorState": "alerting", "frequency": "10s", "handler": 1, "name": "High Error Rate", "noDataState": "no_data" } }, { "id": 4, "title": "Response Time Distribution", "type": "heatmap", "targets": [ { "expr": "scrapy_response_time_seconds_bucket", "legendFormat": "{{le}}" } ], "gridPos": {"h": 8, "w": 12, "x": 12, "y": 8} }, { "id": 5, "title": "Memory Usage", "type": "graph", "targets": [ { "expr": "scrapy_memory_usage_bytes / 1024 / 1024", "legendFormat": "{{spider}} Memory (MB)" } ], "gridPos": {"h": 8, "w": 24, "x": 0, "y": 16} }, { "id": 6, "title": "Queue Size", "type": "graph", "targets": [ { "expr": "scrapy_queue_size", "legendFormat": "{{spider}} Queue" } ], "gridPos": {"h": 8, "w": 12, "x": 0, "y": 24} }, { "id": 7, "title": "Active Requests", "type": "stat", "targets": [ { "expr": "sum(scrapy_active_requests)", "legendFormat": "Total Active" } ], "gridPos": {"h": 8, "w": 12, "x": 12, "y": 24} } ], "time": {"from": "now-1h", "to": "now"}, "refresh": "5s" }}
Security and Compliance
Security Hardening
import hmac
import hashlib
import time
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
from scrapy.exceptions import IgnoreRequest
import logging


class SecurityMiddleware:
    """Security middleware for production environments"""

    def __init__(self, settings):
        self.settings = settings
        self.logger = logging.getLogger(__name__)

        # Rate limiting
        self.rate_limits = {}
        self.last_request_time = {}

        # Security headers
        self.required_headers = {
            'User-Agent': True,
            'Accept': True,
            'Accept-Language': True
        }

        # Blocked patterns
        self.blocked_patterns = [
            r'.*\.exe$',
            r'.*\.zip$',
            r'.*admin.*',
            r'.*login.*',
            r'.*private.*'
        ]

    @classmethod
    def from_crawler(cls, crawler):
        # Let Scrapy construct the middleware with access to the project settings
        return cls(crawler.settings)

    def process_request(self, request, spider):
        # Validate request security
        if not self._validate_request_security(request):
            raise IgnoreRequest("Request blocked by security policy")

        # Apply rate limiting
        self._apply_rate_limiting(request, spider)

        # Add security headers
        self._add_security_headers(request)

        return None

    def _validate_request_security(self, request):
        """Validate request against security policies"""
        url = request.url.lower()

        # Check blocked patterns
        import re
        for pattern in self.blocked_patterns:
            if re.match(pattern, url):
                self.logger.warning(f"Blocked request to: {request.url}")
                return False

        # Check for required headers
        for header, required in self.required_headers.items():
            if required and header not in request.headers:
                self.logger.warning(f"Missing required header: {header}")
                return False

        return True

    def _apply_rate_limiting(self, request, spider):
        """Apply rate limiting per domain"""
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc

        current_time = time.time()
        min_delay = self.settings.getfloat('SECURITY_MIN_DELAY', 1.0)

        if domain in self.last_request_time:
            time_since_last = current_time - self.last_request_time[domain]
            if time_since_last < min_delay:
                sleep_time = min_delay - time_since_last
                spider.logger.debug(f"Security rate limiting: {sleep_time:.2f}s for {domain}")
                time.sleep(sleep_time)

        self.last_request_time[domain] = time.time()

    def _add_security_headers(self, request):
        """Add security headers to requests"""
        # Add timestamp for request validation
        timestamp = str(int(time.time()))
        request.headers['X-Request-Timestamp'] = timestamp

        # Add security token if configured
        secret_key = self.settings.get('SECURITY_SECRET_KEY')
        if secret_key:
            signature = hmac.new(
                secret_key.encode(),
                f"{request.url}{timestamp}".encode(),
                hashlib.sha256
            ).hexdigest()
            request.headers['X-Security-Signature'] = signature


class DataEncryptionPipeline:
    """Encrypt sensitive data before storage"""

    def __init__(self, encryption_key):
        self.encryption_key = encryption_key
        self.sensitive_fields = ['email', 'phone', 'address', 'personal_id']

    @classmethod
    def from_crawler(cls, crawler):
        encryption_key = crawler.settings.get('ENCRYPTION_KEY')
        if not encryption_key:
            raise ValueError("ENCRYPTION_KEY setting is required")
        return cls(encryption_key)

    def process_item(self, item, spider):
        from cryptography.fernet import Fernet

        fernet = Fernet(self.encryption_key.encode())

        for field in self.sensitive_fields:
            if field in item and item[field]:
                # Encrypt sensitive data
                encrypted_data = fernet.encrypt(str(item[field]).encode())
                item[f"{field}_encrypted"] = encrypted_data.decode()
                # Remove original field
                del item[field]

        return item
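Neither component is active until it is registered in the project settings. A minimal sketch, assuming the classes live in webscraper/middlewares.py and webscraper/pipelines.py (module paths, priorities, and example values are illustrative):

# webscraper/settings/production.py (illustrative excerpt)
DOWNLOADER_MIDDLEWARES = {
    "webscraper.middlewares.SecurityMiddleware": 543,
}

ITEM_PIPELINES = {
    "webscraper.pipelines.DataEncryptionPipeline": 400,
}

SECURITY_MIN_DELAY = 1.0           # read by SecurityMiddleware._apply_rate_limiting
SECURITY_SECRET_KEY = "change-me"  # enables the X-Security-Signature header
# ENCRYPTION_KEY must be a urlsafe base64 Fernet key, generated once with e.g.:
#   python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
ENCRYPTION_KEY = "<your-fernet-key>"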
# scripts/security_audit.py
import subprocess
import json
import sys
from typing import Dict, List


class SecurityAudit:
    """Security audit for production deployment"""

    def __init__(self):
        self.issues = []

    def audit_dependencies(self) -> bool:
        """Audit Python dependencies for known vulnerabilities"""
        try:
            result = subprocess.run(
                ['safety', 'check', '--json'],
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                vulnerabilities = json.loads(result.stdout)
                for vuln in vulnerabilities:
                    self.issues.append({
                        'type': 'dependency_vulnerability',
                        'severity': 'high',
                        'package': vuln['package_name'],
                        'vulnerability': vuln['vulnerability_id'],
                        'description': vuln['advisory']
                    })
                return False

            return True

        except Exception as e:
            self.issues.append({
                'type': 'audit_error',
                'severity': 'medium',
                'description': f"Failed to audit dependencies: {e}"
            })
            return False

    def audit_docker_image(self, image_name: str) -> bool:
        """Audit a Docker image for security issues"""
        try:
            result = subprocess.run(
                ['trivy', 'image', '--format', 'json', image_name],
                capture_output=True,
                text=True
            )

            if result.returncode == 0:
                scan_results = json.loads(result.stdout)

                for scan_result in scan_results.get('Results', []):
                    for vuln in scan_result.get('Vulnerabilities', []):
                        if vuln.get('Severity') in ['HIGH', 'CRITICAL']:
                            self.issues.append({
                                'type': 'container_vulnerability',
                                'severity': vuln['Severity'].lower(),
                                'package': vuln.get('PkgName'),
                                'vulnerability': vuln.get('VulnerabilityID'),
                                'description': vuln.get('Description', '')
                            })

            return len([i for i in self.issues if i['type'] == 'container_vulnerability']) == 0

        except Exception as e:
            self.issues.append({
                'type': 'audit_error',
                'severity': 'medium',
                'description': f"Failed to audit Docker image: {e}"
            })
            return False

    def audit_kubernetes_config(self, config_path: str) -> bool:
        """Audit Kubernetes configuration"""
        security_checks = [
            self._check_non_root_user,
            self._check_resource_limits,
            self._check_security_context,
            self._check_network_policies
        ]

        passed = True
        for check in security_checks:
            if not check(config_path):
                passed = False

        return passed

    def _check_non_root_user(self, config_path: str) -> bool:
        """Check if containers run as non-root"""
        # Implementation for checking non-root user
        return True

    def _check_resource_limits(self, config_path: str) -> bool:
        """Check if resource limits are set"""
        # Implementation for checking resource limits
        return True

    def _check_security_context(self, config_path: str) -> bool:
        """Check security context configuration"""
        # Implementation for checking security context
        return True

    def _check_network_policies(self, config_path: str) -> bool:
        """Check network policies"""
        # Implementation for checking network policies
        return True

    def generate_report(self) -> Dict:
        """Generate a security audit report"""
        severity_counts = {}
        for issue in self.issues:
            severity = issue['severity']
            severity_counts[severity] = severity_counts.get(severity, 0) + 1

        return {
            'total_issues': len(self.issues),
            'severity_breakdown': severity_counts,
            'issues': self.issues,
            'passed': len(self.issues) == 0
        }

    def run_full_audit(self, image_name: str = None, config_path: str = None) -> bool:
        """Run the complete security audit"""
        print("🔍 Running security audit...")

        # Audit dependencies
        deps_ok = self.audit_dependencies()
        print(f"Dependencies: {'✅' if deps_ok else '❌'}")

        # Audit Docker image if provided
        if image_name:
            image_ok = self.audit_docker_image(image_name)
            print(f"Docker image: {'✅' if image_ok else '❌'}")

        # Audit Kubernetes config if provided
        if config_path:
            k8s_ok = self.audit_kubernetes_config(config_path)
            print(f"Kubernetes config: {'✅' if k8s_ok else '❌'}")

        # Generate report
        report = self.generate_report()

        if report['passed']:
            print("🎉 Security audit passed!")
        else:
            print(f"💥 Security audit failed with {report['total_issues']} issues")
            for issue in self.issues:
                print(f"  - {issue['severity'].upper()}: {issue['description']}")

        return report['passed']


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Run security audit')
    parser.add_argument('--image', help='Docker image to audit')
    parser.add_argument('--config', help='Kubernetes config path to audit')
    args = parser.parse_args()

    audit = SecurityAudit()
    success = audit.run_full_audit(args.image, args.config)

    sys.exit(0 if success else 1)
Summary and Congratulations!
🎉 Congratulations! You've completed the comprehensive 5-part Scrapy mastery series!
What You've Accomplished
Throughout this series, you've built expertise in:
- ✅ Part 1: Scrapy Fundamentals - Professional environment setup and basic spiders
- ✅ Part 2: Advanced Techniques - JavaScript rendering, forms, and AJAX handling
- ✅ Part 3: Anti-Detection & Scaling - Distributed scraping and stealth techniques
- ✅ Part 4: Data Processing - Advanced validation, storage, and analytics
- ✅ Part 5: Production Deployment - Enterprise-grade deployment and monitoring
Key Production Skills Mastered
- Docker containerization with multi-stage builds and security
- Kubernetes orchestration with auto-scaling and health checks
- CI/CD pipelines with automated testing and deployment
- Production monitoring with Prometheus and Grafana
- Security hardening and compliance frameworks
- Cloud deployment strategies across multiple platforms
Next Steps and Advanced Topics
- Machine Learning Integration: Add ML models for data classification and anomaly detection
- Real-time Processing: Implement streaming data pipelines with Apache Kafka
- Advanced Analytics: Build predictive models and business intelligence dashboards
- Global Distribution: Deploy across multiple regions with CDN integration
- Cost Optimization: Implement intelligent resource scheduling and spot instances
Best Practices Checklist
Before going to production, ensure you have:
- Comprehensive test coverage (>90%)
- Security audit passed
- Performance benchmarks established
- Monitoring and alerting configured
- Backup and disaster recovery tested
- Documentation and runbooks complete
- Team training and knowledge transfer done
Community and Resources
- Official Scrapy Documentation: docs.scrapy.org
- Scrapy Community: Join the Scrapy community on GitHub and Discord
- Advanced Courses: Consider specialized courses on distributed systems and data engineering
- Professional Services: For enterprise implementations, consider professional consulting
Final Project Challenge
Build a complete production-ready scraping system that:
- Scrapes multiple e-commerce sites with different technologies
- Processes millions of products daily with 99.9% uptime
- Provides real-time analytics and business insights
- Scales automatically based on demand
- Maintains legal compliance and ethical standards
- Includes comprehensive monitoring and alerting
You now have the knowledge and tools to build enterprise-grade web scraping solutions that can handle any challenge!
Happy production scraping! 🚀🕷️
This concludes our comprehensive 5-part series on mastering web scraping with Scrapy. You're now equipped to tackle any web scraping challenge at enterprise scale!