Master Web Scraping with Scrapy - Part 3

Surendra Tamang

50 min read · Advanced

Prerequisites

  • Completed Parts 1-2 of this series
  • Understanding of distributed systems
  • Basic knowledge of Redis and Docker
  • Experience with monitoring tools

Master Web Scraping with Scrapy: Anti-Detection and Scaling

Welcome to Part 3 of our comprehensive Scrapy series! In this advanced tutorial, we’ll explore sophisticated anti-detection techniques, distributed scraping architectures, and scaling strategies for enterprise-level web scraping operations.

What You’ll Learn in This Part

  • Advanced anti-detection and stealth techniques
  • Distributed scraping with Scrapy-Redis
  • IP rotation and proxy management
  • Browser fingerprinting avoidance
  • Rate limiting and adaptive throttling
  • Monitoring, alerting, and health checks
  • Legal compliance and ethical scraping
  • Performance optimization and scaling

Anti-Detection Techniques

Modern websites employ sophisticated detection mechanisms. Here’s how to stay under the radar:

Stealth Configuration

# webscraper/settings.py - Advanced Stealth Settings
# Respect robots.txt but with custom delay
ROBOTSTXT_OBEY = True
ROBOTSTXT_USER_AGENT = '*'
# Realistic browser behavior
DOWNLOAD_DELAY = 3 # Base delay between requests
RANDOMIZE_DOWNLOAD_DELAY = True # 0.5 * to 1.5 * DOWNLOAD_DELAY
DOWNLOAD_TIMEOUT = 30
DOWNLOAD_MAXSIZE = 1073741824 # 1GB
DOWNLOAD_WARNSIZE = 33554432 # 32MB
# Advanced AutoThrottle settings
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 60
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0 # Conservative for stealth
AUTOTHROTTLE_DEBUG = False
# Realistic request patterns
CONCURRENT_REQUESTS = 8
CONCURRENT_REQUESTS_PER_DOMAIN = 2
# Advanced cookie handling
COOKIES_ENABLED = True
COOKIES_DEBUG = False
# Memory and resource management
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_WARNING_MB = 1024
# DNS timeout
DNSCACHE_ENABLED = True
DNSCACHE_SIZE = 10000
DNS_TIMEOUT = 60
# Enable telnet console for debugging (disable in production)
TELNETCONSOLE_ENABLED = False
# Custom middleware stack for stealth
DOWNLOADER_MIDDLEWARES = {
    'webscraper.middlewares.StealthUserAgentMiddleware': 400,
    'webscraper.middlewares.ProxyRotationMiddleware': 410,
    'webscraper.middlewares.HeaderSpoofingMiddleware': 420,
    'webscraper.middlewares.CookiePersistenceMiddleware': 430,
    'webscraper.middlewares.RequestTimingMiddleware': 440,
    'webscraper.middlewares.FingerprintResistanceMiddleware': 450,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 500,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # Disabled
}
# Anti-detection headers
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Accept-Language': 'en-US,en;q=0.9',
    'Accept-Encoding': 'gzip, deflate, br',
    'DNT': '1',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
    'Sec-Fetch-User': '?1',
    'Cache-Control': 'max-age=0',
}

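To confirm these headers actually go out on the wire, it can help to point a throwaway spider at an echo service before targeting real sites. The sketch below is illustrative rather than part of the project files; it reuses httpbin.org, the same service the proxy health checks use later, and simply logs the headers the server received.

# webscraper/spiders/header_check.py - sanity check for stealth headers (illustrative)
import json
import scrapy


class HeaderCheckSpider(scrapy.Spider):
    """Fetch httpbin.org/headers and log what the server actually received."""
    name = 'header_check'
    start_urls = ['https://httpbin.org/headers']

    def parse(self, response):
        # httpbin echoes the request headers back as JSON
        for header, value in json.loads(response.text).get('headers', {}).items():
            self.logger.info('%s: %s', header, value)

Running scrapy crawl header_check with the project settings should show the configured Accept, Accept-Language, and Sec-Fetch-* values alongside the rotated User-Agent.
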
Advanced Stealth Middleware

webscraper/middlewares.py
import random
import time
import json
import hashlib
from datetime import datetime, timedelta
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
from scrapy import signals
from scrapy.exceptions import IgnoreRequest


class StealthUserAgentMiddleware(UserAgentMiddleware):
    """Advanced user agent rotation with browser consistency"""

    def __init__(self):
        # Browser families with consistent headers
        self.browser_profiles = {
            'chrome': {
                'user_agents': [
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                ],
                'sec_ch_ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
                'sec_ch_ua_mobile': '?0',
                'sec_ch_ua_platform': '"Windows"',
            },
            'firefox': {
                'user_agents': [
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
                    'Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0',
                ],
            },
            'safari': {
                'user_agents': [
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Safari/605.1.15',
                ],
            },
        }
        self.current_profile = None
        self.session_ua = None

    def process_request(self, request, spider):
        # Maintain a consistent user agent per session
        if not self.session_ua:
            browser = random.choice(list(self.browser_profiles.keys()))
            self.current_profile = self.browser_profiles[browser]
            self.session_ua = random.choice(self.current_profile['user_agents'])
        request.headers['User-Agent'] = self.session_ua
        # Add browser-specific client hint headers for Chrome user agents
        if 'chrome' in self.session_ua.lower():
            profile = self.browser_profiles['chrome']
            request.headers['sec-ch-ua'] = profile['sec_ch_ua']
            request.headers['sec-ch-ua-mobile'] = profile['sec_ch_ua_mobile']
            request.headers['sec-ch-ua-platform'] = profile['sec_ch_ua_platform']


class HeaderSpoofingMiddleware:
    """Spoof headers to mimic real browser behavior"""

    def __init__(self):
        self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:16]
        self.request_count = 0

    def process_request(self, request, spider):
        self.request_count += 1
        # Add a per-session request identifier
        request.headers['X-Request-ID'] = f"{self.session_id}-{self.request_count}"
        # Headers that may be randomly omitted to vary the fingerprint
        headers_to_randomize = [
            'Accept-Encoding',
            'Accept-Language',
            'Cache-Control',
            'Connection',
            'DNT',
            'Upgrade-Insecure-Requests',
        ]
        # Occasionally omit some headers to appear more human
        if random.random() < 0.1:  # 10% chance
            header_to_remove = random.choice(headers_to_randomize)
            if header_to_remove in request.headers:
                del request.headers[header_to_remove]
        # Add a Referer for non-start URLs
        if not request.meta.get('is_start_url', False):
            if 'Referer' not in request.headers:
                # Use the domain as referer
                domain = request.url.split('/')[2]
                request.headers['Referer'] = f"https://{domain}/"


class RequestTimingMiddleware:
    """Implement realistic request timing patterns"""

    def __init__(self):
        self.last_request_time = {}
        self.human_patterns = [
            # Reading pattern: quick succession then pause
            [0.5, 0.3, 0.8, 5.0],
            # Browsing pattern: varied intervals
            [1.2, 2.1, 0.7, 3.5, 1.8],
            # Search pattern: quick then slow
            [0.4, 0.6, 8.0, 2.0],
        ]
        self.current_pattern = []
        self.pattern_index = 0

    def process_request(self, request, spider):
        domain = request.url.split('/')[2]
        current_time = time.time()
        # Get or initialize timing for this domain
        if domain not in self.last_request_time:
            self.last_request_time[domain] = current_time
            return
        # Calculate time since last request to this domain
        time_since_last = current_time - self.last_request_time[domain]
        # Choose a human-like delay pattern
        if not self.current_pattern:
            self.current_pattern = random.choice(self.human_patterns).copy()
            self.pattern_index = 0
        target_delay = self.current_pattern[self.pattern_index]
        self.pattern_index = (self.pattern_index + 1) % len(self.current_pattern)
        # Apply delay if needed
        if time_since_last < target_delay:
            sleep_time = target_delay - time_since_last
            spider.logger.debug(f'Applying human-like delay: {sleep_time:.2f}s for {domain}')
            time.sleep(sleep_time)
        self.last_request_time[domain] = time.time()


class FingerprintResistanceMiddleware:
    """Resist browser fingerprinting attempts"""

    def process_request(self, request, spider):
        # Randomize Accept header slightly
        accept_variations = [
            'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        ]
        if random.random() < 0.3:  # 30% chance to vary
            request.headers['Accept'] = random.choice(accept_variations)
        # Randomize Accept-Language
        lang_variations = [
            'en-US,en;q=0.9',
            'en-US,en;q=0.8',
            'en-US,en;q=0.9,es;q=0.8',
            'en-US,en;q=0.5',
        ]
        if random.random() < 0.2:  # 20% chance to vary
            request.headers['Accept-Language'] = random.choice(lang_variations)


class CookiePersistenceMiddleware:
    """Advanced cookie management for session consistency"""

    def __init__(self):
        self.session_cookies = {}

    def process_response(self, request, response, spider):
        # Store cookies set by this domain
        domain = request.url.split('/')[2]
        cookies = response.headers.getlist('Set-Cookie')
        if cookies:
            if domain not in self.session_cookies:
                self.session_cookies[domain] = {}
            for cookie in cookies:
                cookie_str = cookie.decode('utf-8')
                if '=' in cookie_str:
                    name, value = cookie_str.split('=', 1)
                    # Keep only the name=value part, drop attributes
                    value = value.split(';')[0]
                    self.session_cookies[domain][name] = value
        return response

    def process_request(self, request, spider):
        domain = request.url.split('/')[2]
        # Attach stored cookies for this domain
        if domain in self.session_cookies:
            cookie_header = '; '.join([
                f"{name}={value}"
                for name, value in self.session_cookies[domain].items()
            ])
            if cookie_header:
                request.headers['Cookie'] = cookie_header

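One wiring detail worth noting: HeaderSpoofingMiddleware only skips the Referer when request.meta contains is_start_url, and nothing sets that flag automatically. A minimal sketch of how a spider might mark its seed requests, assuming the rest of the spider is unchanged:

# Example: flag seed requests so the middleware leaves their Referer empty (illustrative)
import scrapy


class StealthySpider(scrapy.Spider):
    name = 'stealthy'
    start_urls = ['https://example.com/']

    def start_requests(self):
        for url in self.start_urls:
            # Mark entry-point requests; follow-up requests get a same-domain Referer
            yield scrapy.Request(url, meta={'is_start_url': True})

    def parse(self, response):
        # Links followed from here are not start URLs, so the middleware adds a Referer
        for href in response.css('a::attr(href)').getall():
            yield response.follow(href, callback=self.parse)
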
Proxy Rotation System

webscraper/proxy_manager.py
import random
import time
import requests
from typing import List, Dict, Optional
import threading
from dataclasses import dataclass
from enum import Enum


class ProxyStatus(Enum):
    ACTIVE = "active"
    FAILED = "failed"
    RATE_LIMITED = "rate_limited"
    TESTING = "testing"


@dataclass
class ProxyInfo:
    host: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None
    protocol: str = "http"
    status: ProxyStatus = ProxyStatus.TESTING
    success_count: int = 0
    failure_count: int = 0
    last_used: Optional[float] = None
    response_time: Optional[float] = None
    rate_limit_until: Optional[float] = None

    @property
    def url(self) -> str:
        if self.username and self.password:
            return f"{self.protocol}://{self.username}:{self.password}@{self.host}:{self.port}"
        return f"{self.protocol}://{self.host}:{self.port}"

    @property
    def success_rate(self) -> float:
        total = self.success_count + self.failure_count
        return self.success_count / total if total > 0 else 0.0

    def is_available(self) -> bool:
        if self.status == ProxyStatus.FAILED:
            return False
        if self.rate_limit_until and time.time() < self.rate_limit_until:
            return False
        return True


class ProxyManager:
    """Advanced proxy rotation and health management"""

    def __init__(self, proxy_list: List[Dict], health_check_interval: int = 300):
        self.proxies = [ProxyInfo(**proxy) for proxy in proxy_list]
        self.health_check_interval = health_check_interval
        self.last_health_check = 0
        self.lock = threading.Lock()
        # Start health checking thread
        self.health_thread = threading.Thread(target=self._health_check_loop, daemon=True)
        self.health_thread.start()

    def get_proxy(self) -> Optional[ProxyInfo]:
        """Get the best available proxy"""
        with self.lock:
            available_proxies = [p for p in self.proxies if p.is_available()]
            if not available_proxies:
                return None
            # Sort by success rate, preferring faster proxies on ties
            available_proxies.sort(
                key=lambda p: (p.success_rate, -(p.response_time or 0)),
                reverse=True
            )
            # Weighted selection favoring better proxies
            weights = [max(0.1, p.success_rate) for p in available_proxies]
            proxy = random.choices(available_proxies, weights=weights)[0]
            proxy.last_used = time.time()
            return proxy

    def report_success(self, proxy: ProxyInfo, response_time: float = None):
        """Report successful proxy usage"""
        with self.lock:
            proxy.success_count += 1
            proxy.status = ProxyStatus.ACTIVE
            if response_time:
                proxy.response_time = response_time

    def report_failure(self, proxy: ProxyInfo, is_rate_limit: bool = False):
        """Report proxy failure"""
        with self.lock:
            proxy.failure_count += 1
            if is_rate_limit:
                proxy.status = ProxyStatus.RATE_LIMITED
                proxy.rate_limit_until = time.time() + 300  # 5 minute cooldown
            elif proxy.failure_count > 5:
                proxy.status = ProxyStatus.FAILED

    def _health_check_loop(self):
        """Background thread for proxy health checking"""
        while True:
            time.sleep(self.health_check_interval)
            self._perform_health_checks()

    def _perform_health_checks(self):
        """Check health of all proxies"""
        test_url = "http://httpbin.org/ip"
        for proxy in self.proxies:
            if proxy.status == ProxyStatus.FAILED:
                continue
            try:
                start_time = time.time()
                response = requests.get(
                    test_url,
                    proxies={"http": proxy.url, "https": proxy.url},
                    timeout=10
                )
                response_time = time.time() - start_time
                if response.status_code == 200:
                    self.report_success(proxy, response_time)
                else:
                    self.report_failure(proxy)
            except Exception:
                self.report_failure(proxy)

    def get_stats(self) -> Dict:
        """Get proxy pool statistics"""
        with self.lock:
            total = len(self.proxies)
            active = sum(1 for p in self.proxies if p.status == ProxyStatus.ACTIVE)
            failed = sum(1 for p in self.proxies if p.status == ProxyStatus.FAILED)
            rate_limited = sum(1 for p in self.proxies if p.status == ProxyStatus.RATE_LIMITED)
            return {
                'total': total,
                'active': active,
                'failed': failed,
                'rate_limited': rate_limited,
                'success_rate': sum(p.success_rate for p in self.proxies) / total if total > 0 else 0
            }


# Proxy rotation middleware
class AdvancedProxyRotationMiddleware:
    """Advanced proxy rotation with health management"""

    def __init__(self, proxy_list):
        self.proxy_manager = ProxyManager(proxy_list)

    @classmethod
    def from_crawler(cls, crawler):
        proxy_list = crawler.settings.get('PROXY_LIST', [])
        return cls(proxy_list)

    def process_request(self, request, spider):
        proxy = self.proxy_manager.get_proxy()
        if proxy:
            request.meta['proxy'] = proxy.url
            request.meta['proxy_info'] = proxy
            spider.logger.debug(f'Using proxy: {proxy.host}:{proxy.port}')
        else:
            spider.logger.warning('No available proxies')

    def process_response(self, request, response, spider):
        proxy_info = request.meta.get('proxy_info')
        if proxy_info:
            if response.status == 200:
                self.proxy_manager.report_success(proxy_info)
            elif response.status == 429:  # Rate limited
                self.proxy_manager.report_failure(proxy_info, is_rate_limit=True)
            elif response.status >= 400:
                self.proxy_manager.report_failure(proxy_info)
        return response

    def process_exception(self, request, exception, spider):
        proxy_info = request.meta.get('proxy_info')
        if proxy_info:
            self.proxy_manager.report_failure(proxy_info)

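AdvancedProxyRotationMiddleware reads its pool from a PROXY_LIST setting and passes each entry straight into ProxyInfo(**proxy), so the dictionary keys must match the dataclass fields. A sketch of what that setting could look like, with placeholder endpoints rather than real proxies:

# webscraper/settings.py - example proxy pool (placeholder addresses)
# Keys mirror the ProxyInfo fields; protocol, username and password are optional.
PROXY_LIST = [
    {'host': '203.0.113.10', 'port': 8080},
    {'host': '203.0.113.11', 'port': 3128, 'protocol': 'http'},
    {'host': '203.0.113.12', 'port': 8000, 'username': 'scraper', 'password': 'secret'},
]

If you adopt this class instead of the simpler ProxyRotationMiddleware registered in the stealth settings above, point the priority-410 entry in DOWNLOADER_MIDDLEWARES at webscraper.proxy_manager.AdvancedProxyRotationMiddleware.
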
Distributed Scraping with Scrapy-Redis

Scale your scraping operations across multiple machines:

Scrapy-Redis Setup

Terminal window
# Install Scrapy-Redis
pip install scrapy-redis
# Start Redis server
redis-server
# Or using Docker
docker run -d -p 6379:6379 redis:alpine

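Before wiring Redis into Scrapy, it is worth a quick connectivity check from Python. This throwaway snippet is not part of the project files; it uses the redis-py client, which scrapy-redis installs as a dependency.

# check_redis.py - confirm the Redis server is reachable (illustrative)
import redis

client = redis.from_url('redis://localhost:6379')
print(client.ping())  # prints True if the server responds
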
Distributed Spider Configuration

# webscraper/settings.py - Redis Configuration
# Enable Scrapy-Redis
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
    'webscraper.pipelines.ValidationPipeline': 400,
}
# Redis connection
REDIS_URL = 'redis://localhost:6379'
# Or with detailed configuration
REDIS_PARAMS = {
    'host': 'localhost',
    'port': 6379,
    'db': 0,
    'password': None,
}
# Scheduler configuration
SCHEDULER_PERSIST = True # Keep scheduler data after spider closes
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
# Request serialization
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"

Distributed Spider

webscraper/spiders/distributed_spider.py
from scrapy import signals
from scrapy.loader import ItemLoader
from scrapy_redis.spiders import RedisSpider
from webscraper.items import ProductItem
import json


class DistributedEcommerceSpider(RedisSpider):
    """Distributed spider using Redis for coordination"""
    name = 'distributed_ecommerce'
    redis_key = 'distributed_ecommerce:start_urls'

    custom_settings = {
        'CONCURRENT_REQUESTS': 32,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 16,
        'DOWNLOAD_DELAY': 1,
        'SCHEDULER_IDLE_BEFORE_CLOSE': 10,  # Wait 10 seconds before closing
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.processed_count = 0
        self.error_count = 0

    def parse(self, response):
        """Parse category pages and product listings"""
        self.logger.info(f'Worker {self.name} processing: {response.url}')
        # Extract product links
        product_links = response.css('.product-item a::attr(href)').getall()
        for link in product_links:
            product_url = response.urljoin(link)
            yield response.follow(
                product_url,
                callback=self.parse_product,
                meta={'category_url': response.url}
            )
        # Follow pagination
        next_page = response.css('.pagination .next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)
        # Add more category pages to the Redis queue
        self.discover_more_urls(response)

    def parse_product(self, response):
        """Parse individual product pages"""
        try:
            loader = ItemLoader(item=ProductItem(), response=response)
            loader.add_css('name', 'h1.product-title::text')
            loader.add_css('price', '.price-current::text')
            loader.add_css('description', '.product-description::text')
            loader.add_css('brand', '.brand-name::text')
            loader.add_css('rating', '.rating-value::text')
            # Add metadata
            loader.add_value('url', response.url)
            loader.add_value('category_url', response.meta.get('category_url'))
            loader.add_value('worker_id', self.name)
            loader.add_value('scraped_at', self.get_current_time())
            item = loader.load_item()
            self.processed_count += 1
            if self.processed_count % 100 == 0:
                self.logger.info(f'Worker {self.name} processed {self.processed_count} products')
            yield item
        except Exception as e:
            self.error_count += 1
            self.logger.error(f'Error parsing product {response.url}: {e}')

    def discover_more_urls(self, response):
        """Discover and add more URLs to the Redis queue"""
        # Find category links
        category_links = response.css('.category-nav a::attr(href)').getall()
        for link in category_links:
            category_url = response.urljoin(link)
            # Add to a Redis list for other workers
            self.server.lpush(
                f'{self.redis_key}:discovered',
                json.dumps({'url': category_url, 'priority': 1})
            )

    def get_current_time(self):
        from datetime import datetime
        return datetime.now().isoformat()

    def closed(self, reason):
        """Spider closing callback"""
        self.logger.info(f'Worker {self.name} closing: {reason}')
        self.logger.info(f'Processed: {self.processed_count}, Errors: {self.error_count}')


# Master coordinator spider
class CoordinatorSpider(RedisSpider):
    """Coordinator spider that feeds URLs to workers"""
    name = 'coordinator'
    redis_key = 'distributed_ecommerce:start_urls'

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        # self.server only exists after scrapy-redis sets up the connection,
        # so seed once the spider has opened instead of in __init__
        crawler.signals.connect(spider.seed_urls, signal=signals.spider_opened)
        return spider

    def seed_urls(self):
        """Seed initial URLs into Redis"""
        initial_urls = [
            'https://example-store.com/categories/electronics',
            'https://example-store.com/categories/clothing',
            'https://example-store.com/categories/home',
            'https://example-store.com/categories/books',
        ]
        for url in initial_urls:
            self.server.lpush(self.redis_key, url)
        self.logger.info(f'Seeded {len(initial_urls)} URLs')

    def parse(self, response):
        """This spider doesn't parse, it just coordinates"""
        pass

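Because the queue lives in Redis, you can also seed it from any machine that can reach the server, without running the coordinator spider at all. A minimal sketch using the redis-py client and the REDIS_URL and redis_key values shown above:

# scripts/seed_queue.py - push start URLs into the shared queue (illustrative)
import redis

client = redis.from_url('redis://localhost:6379')
client.lpush(
    'distributed_ecommerce:start_urls',
    'https://example-store.com/categories/electronics',
    'https://example-store.com/categories/clothing',
)
print(client.llen('distributed_ecommerce:start_urls'), 'URLs waiting in the queue')

Every worker running scrapy crawl distributed_ecommerce against the same Redis instance pops from this list, and the shared dupefilter keeps two workers from fetching the same page.
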
Worker Management Script

scripts/worker_manager.py
import subprocess
import sys
import time
import signal
import json
import redis
from typing import List, Dict


class WorkerManager:
    """Manage distributed scraping workers"""

    def __init__(self, redis_url: str = 'redis://localhost:6379'):
        self.redis_client = redis.from_url(redis_url)
        self.workers: List[subprocess.Popen] = []
        self.running = True
        # Register signal handlers
        signal.signal(signal.SIGINT, self.shutdown_handler)
        signal.signal(signal.SIGTERM, self.shutdown_handler)

    def start_workers(self, spider_name: str, num_workers: int = 4):
        """Start multiple worker processes"""
        print(f"Starting {num_workers} workers for spider '{spider_name}'")
        for i in range(num_workers):
            worker_id = f"{spider_name}_worker_{i}"
            cmd = [
                'scrapy', 'crawl', spider_name,
                '-s', f'BOT_NAME={worker_id}',
                '-L', 'INFO'
            ]
            worker = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            self.workers.append(worker)
            print(f"Started worker {worker_id} (PID: {worker.pid})")

    def monitor_workers(self):
        """Monitor worker health and restart if needed"""
        while self.running:
            for i, worker in enumerate(self.workers):
                if worker.poll() is not None:  # Worker has terminated
                    print(f"Worker {i} terminated with code {worker.returncode}")
                    # Restart worker if it crashed
                    if worker.returncode != 0:
                        print(f"Restarting worker {i}")
                        # Implementation for restarting worker
                        pass
            # Monitor Redis queue sizes
            self.print_queue_stats()
            time.sleep(30)  # Check every 30 seconds

    def print_queue_stats(self):
        """Print Redis queue statistics"""
        try:
            queue_size = self.redis_client.llen('distributed_ecommerce:start_urls')
            processing = self.redis_client.scard('distributed_ecommerce:dupefilter')
            print(f"Queue size: {queue_size}, Processed: {processing}")
        except Exception as e:
            print(f"Error getting Redis stats: {e}")

    def shutdown_handler(self, signum, frame):
        """Handle shutdown signals"""
        print("\nShutting down workers...")
        self.running = False
        for i, worker in enumerate(self.workers):
            print(f"Terminating worker {i}")
            worker.terminate()
            # Wait for graceful shutdown
            try:
                worker.wait(timeout=10)
            except subprocess.TimeoutExpired:
                print(f"Force killing worker {i}")
                worker.kill()
        print("All workers shut down")
        sys.exit(0)

    def run(self, spider_name: str, num_workers: int = 4):
        """Main execution method"""
        self.start_workers(spider_name, num_workers)
        self.monitor_workers()


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python worker_manager.py <spider_name> [num_workers]")
        sys.exit(1)
    spider_name = sys.argv[1]
    num_workers = int(sys.argv[2]) if len(sys.argv) > 2 else 4
    manager = WorkerManager()
    manager.run(spider_name, num_workers)

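With the script in place, launching a local pool of workers is a single command; the worker count is optional and defaults to four.

Terminal window
python scripts/worker_manager.py distributed_ecommerce 4
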
Monitoring and Alerting

Comprehensive Monitoring System

webscraper/monitoring.py
import time
import json
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, List, Optional
import psutil
import redis
from dataclasses import dataclass, asdict


@dataclass
class ScrapingMetrics:
    timestamp: str
    spider_name: str
    pages_scraped: int
    items_extracted: int
    errors: int
    response_time_avg: float
    memory_usage_mb: float
    cpu_usage_percent: float
    active_requests: int
    queue_size: int
    success_rate: float


class PerformanceMonitor:
    """Monitor scraping performance and system resources"""

    def __init__(self, redis_url: str = 'redis://localhost:6379'):
        self.redis_client = redis.from_url(redis_url)
        self.metrics_history: List[ScrapingMetrics] = []
        self.alert_thresholds = {
            'error_rate': 0.1,     # 10% error rate
            'memory_usage': 2048,  # 2GB memory usage
            'cpu_usage': 80,       # 80% CPU usage
            'response_time': 10,   # 10 second average response time
            'queue_stall': 300,    # 5 minutes without queue progress
        }
        self.last_queue_size = 0
        self.queue_stall_start = None

    def collect_metrics(self, spider_name: str) -> ScrapingMetrics:
        """Collect current performance metrics"""
        # Get system metrics
        memory_usage = psutil.virtual_memory().used / 1024 / 1024  # MB
        cpu_usage = psutil.cpu_percent(interval=1)
        # Get Redis metrics
        try:
            queue_size = self.redis_client.llen(f'{spider_name}:start_urls')
            processed_count = self.redis_client.scard(f'{spider_name}:dupefilter')
            # Detect queue stall
            if queue_size == self.last_queue_size:
                if not self.queue_stall_start:
                    self.queue_stall_start = time.time()
            else:
                self.queue_stall_start = None
            self.last_queue_size = queue_size
        except Exception:
            queue_size = 0
            processed_count = 0
        # Create metrics object (simplified - in a real implementation,
        # you'd collect these from your spider's stats)
        metrics = ScrapingMetrics(
            timestamp=datetime.now().isoformat(),
            spider_name=spider_name,
            pages_scraped=processed_count,
            items_extracted=int(processed_count * 0.8),  # Estimate
            errors=int(processed_count * 0.05),          # Estimate
            response_time_avg=2.5,  # Would be collected from actual stats
            memory_usage_mb=memory_usage,
            cpu_usage_percent=cpu_usage,
            active_requests=queue_size,
            queue_size=queue_size,
            success_rate=0.95  # Would be calculated from actual stats
        )
        self.metrics_history.append(metrics)
        # Keep only the last 24 hours of metrics
        cutoff_time = datetime.now() - timedelta(hours=24)
        self.metrics_history = [
            m for m in self.metrics_history
            if datetime.fromisoformat(m.timestamp) > cutoff_time
        ]
        return metrics

    def check_alerts(self, metrics: ScrapingMetrics) -> List[Dict]:
        """Check metrics against alert thresholds"""
        alerts = []
        # Error rate alert
        error_rate = metrics.errors / max(1, metrics.pages_scraped)
        if error_rate > self.alert_thresholds['error_rate']:
            alerts.append({
                'type': 'error_rate',
                'severity': 'high',
                'message': f'High error rate: {error_rate:.2%}',
                'value': error_rate,
                'threshold': self.alert_thresholds['error_rate']
            })
        # Memory usage alert
        if metrics.memory_usage_mb > self.alert_thresholds['memory_usage']:
            alerts.append({
                'type': 'memory_usage',
                'severity': 'medium',
                'message': f'High memory usage: {metrics.memory_usage_mb:.1f}MB',
                'value': metrics.memory_usage_mb,
                'threshold': self.alert_thresholds['memory_usage']
            })
        # CPU usage alert
        if metrics.cpu_usage_percent > self.alert_thresholds['cpu_usage']:
            alerts.append({
                'type': 'cpu_usage',
                'severity': 'medium',
                'message': f'High CPU usage: {metrics.cpu_usage_percent:.1f}%',
                'value': metrics.cpu_usage_percent,
                'threshold': self.alert_thresholds['cpu_usage']
            })
        # Response time alert
        if metrics.response_time_avg > self.alert_thresholds['response_time']:
            alerts.append({
                'type': 'response_time',
                'severity': 'medium',
                'message': f'Slow response time: {metrics.response_time_avg:.1f}s',
                'value': metrics.response_time_avg,
                'threshold': self.alert_thresholds['response_time']
            })
        # Queue stall alert
        if self.queue_stall_start:
            stall_duration = time.time() - self.queue_stall_start
            if stall_duration > self.alert_thresholds['queue_stall']:
                alerts.append({
                    'type': 'queue_stall',
                    'severity': 'high',
                    'message': f'Queue stalled for {stall_duration/60:.1f} minutes',
                    'value': stall_duration,
                    'threshold': self.alert_thresholds['queue_stall']
                })
        return alerts

    def generate_report(self, hours: int = 24) -> Dict:
        """Generate performance report"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [
            m for m in self.metrics_history
            if datetime.fromisoformat(m.timestamp) > cutoff_time
        ]
        if not recent_metrics:
            return {'error': 'No metrics available'}
        # Calculate aggregated statistics
        total_pages = sum(m.pages_scraped for m in recent_metrics)
        total_items = sum(m.items_extracted for m in recent_metrics)
        total_errors = sum(m.errors for m in recent_metrics)
        avg_response_time = sum(m.response_time_avg for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m.memory_usage_mb for m in recent_metrics) / len(recent_metrics)
        avg_cpu = sum(m.cpu_usage_percent for m in recent_metrics) / len(recent_metrics)
        return {
            'period': f'Last {hours} hours',
            'total_pages_scraped': total_pages,
            'total_items_extracted': total_items,
            'total_errors': total_errors,
            'error_rate': total_errors / max(1, total_pages),
            'average_response_time': avg_response_time,
            'average_memory_usage_mb': avg_memory,
            'average_cpu_usage_percent': avg_cpu,
            'current_queue_size': recent_metrics[-1].queue_size if recent_metrics else 0,
            'metrics_count': len(recent_metrics)
        }


class AlertManager:
    """Manage and send alerts"""

    def __init__(self, config: Dict):
        self.config = config
        self.sent_alerts = {}       # Track sent alerts to avoid spam
        self.cooldown_period = 300  # 5 minutes cooldown between same alerts

    def send_alert(self, alert: Dict, metrics: ScrapingMetrics):
        """Send alert via configured channels"""
        alert_key = f"{alert['type']}_{alert.get('severity', 'medium')}"
        current_time = time.time()
        # Check cooldown
        if alert_key in self.sent_alerts:
            if current_time - self.sent_alerts[alert_key] < self.cooldown_period:
                return  # Skip sending due to cooldown
        # Send via email
        if self.config.get('email'):
            self._send_email_alert(alert, metrics)
        # Send via webhook
        if self.config.get('webhook'):
            self._send_webhook_alert(alert, metrics)
        # Log to file
        self._log_alert(alert, metrics)
        self.sent_alerts[alert_key] = current_time

    def _send_email_alert(self, alert: Dict, metrics: ScrapingMetrics):
        """Send alert via email"""
        try:
            smtp_config = self.config['email']
            msg = MIMEMultipart()
            msg['From'] = smtp_config['from']
            msg['To'] = smtp_config['to']
            msg['Subject'] = f"Scraping Alert: {alert['type']} - {alert['severity'].upper()}"
            body = f"""
            Alert: {alert['message']}
            Spider: {metrics.spider_name}
            Time: {metrics.timestamp}
            Current Value: {alert['value']}
            Threshold: {alert['threshold']}
            Current Metrics:
            - Pages Scraped: {metrics.pages_scraped}
            - Items Extracted: {metrics.items_extracted}
            - Error Rate: {metrics.errors / max(1, metrics.pages_scraped):.2%}
            - Memory Usage: {metrics.memory_usage_mb:.1f}MB
            - CPU Usage: {metrics.cpu_usage_percent:.1f}%
            - Queue Size: {metrics.queue_size}
            """
            msg.attach(MIMEText(body, 'plain'))
            with smtplib.SMTP(smtp_config['smtp_server'], smtp_config['smtp_port']) as server:
                if smtp_config.get('use_tls'):
                    server.starttls()
                if smtp_config.get('username'):
                    server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)
        except Exception as e:
            print(f"Failed to send email alert: {e}")

    def _send_webhook_alert(self, alert: Dict, metrics: ScrapingMetrics):
        """Send alert via webhook"""
        import requests
        try:
            webhook_config = self.config['webhook']
            payload = {
                'alert': alert,
                'metrics': asdict(metrics),
                'timestamp': metrics.timestamp
            }
            response = requests.post(
                webhook_config['url'],
                json=payload,
                headers=webhook_config.get('headers', {}),
                timeout=10
            )
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to send webhook alert: {e}")

    def _log_alert(self, alert: Dict, metrics: ScrapingMetrics):
        """Log alert to file"""
        import logging
        logging.basicConfig(
            filename='scraping_alerts.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        logging.info(
            f"ALERT: {alert['type']} - {alert['message']} "
            f"(Spider: {metrics.spider_name}, Value: {alert['value']})"
        )

Monitoring Dashboard Script

scripts/monitoring_dashboard.py
import time
import json
from webscraper.monitoring import PerformanceMonitor, AlertManager


def main():
    """Main monitoring loop"""
    # Configuration
    config = {
        'email': {
            'smtp_server': 'smtp.gmail.com',
            'smtp_port': 587,
            'use_tls': True,
            'username': '[email protected]',
            'password': 'your-app-password',
            'from': '[email protected]',
            'to': '[email protected]',  # Recipient used by _send_email_alert
        },
        'webhook': {
            'url': 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK',
            'headers': {'Content-Type': 'application/json'}
        }
    }
    monitor = PerformanceMonitor()
    alert_manager = AlertManager(config)
    spider_name = 'distributed_ecommerce'
    print("Starting monitoring dashboard...")
    print("Press Ctrl+C to stop")
    last_report = time.time()
    try:
        while True:
            # Collect metrics
            metrics = monitor.collect_metrics(spider_name)
            # Check for alerts
            alerts = monitor.check_alerts(metrics)
            # Send alerts
            for alert in alerts:
                alert_manager.send_alert(alert, metrics)
                print(f"ALERT: {alert['message']}")
            # Print current status
            print(f"\n[{metrics.timestamp}] Spider: {spider_name}")
            print(f"Pages: {metrics.pages_scraped}, Items: {metrics.items_extracted}")
            print(f"Errors: {metrics.errors}, Queue: {metrics.queue_size}")
            print(f"Memory: {metrics.memory_usage_mb:.1f}MB, CPU: {metrics.cpu_usage_percent:.1f}%")
            if alerts:
                print(f"Active alerts: {len(alerts)}")
            # Generate hourly report
            if time.time() - last_report >= 3600:  # Every hour
                report = monitor.generate_report(hours=1)
                print("\n--- Hourly Report ---")
                print(json.dumps(report, indent=2))
                last_report = time.time()
            time.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        print("\nMonitoring stopped")


if __name__ == "__main__":
    main()

Compliance Framework

webscraper/compliance.py
import time
import requests
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin, urlparse
from typing import Dict, List, Optional
import logging
from scrapy.exceptions import IgnoreRequest


class ComplianceManager:
    """Ensure legal and ethical scraping compliance"""

    def __init__(self):
        self.robots_cache = {}
        self.rate_limits = {}
        self.compliance_rules = {
            'respect_robots_txt': True,
            'rate_limit_default': 1.0,  # 1 second between requests
            'rate_limit_per_domain': {},
            'user_agent_required': True,
            'contact_info': '[email protected]',
            'personal_data_handling': 'exclude',
            'copyright_respect': True,
        }
        self.logger = logging.getLogger(__name__)

    def check_robots_txt(self, url: str, user_agent: str = '*') -> bool:
        """Check if URL is allowed by robots.txt"""
        domain = urlparse(url).netloc
        if domain not in self.robots_cache:
            robots_url = urljoin(f"https://{domain}", "/robots.txt")
            try:
                rp = RobotFileParser()
                rp.set_url(robots_url)
                rp.read()
                self.robots_cache[domain] = rp
                # Extract crawl delay if specified
                delay = rp.crawl_delay(user_agent)
                if delay:
                    self.rate_limits[domain] = float(delay)
            except Exception as e:
                self.logger.warning(f"Could not fetch robots.txt for {domain}: {e}")
                return True  # Allow if robots.txt can't be fetched
        robots_parser = self.robots_cache.get(domain)
        if robots_parser:
            return robots_parser.can_fetch(user_agent, url)
        return True

    def get_rate_limit(self, domain: str) -> float:
        """Get appropriate rate limit for domain"""
        # Check domain-specific rate limit
        if domain in self.compliance_rules['rate_limit_per_domain']:
            return self.compliance_rules['rate_limit_per_domain'][domain]
        # Check robots.txt specified delay
        if domain in self.rate_limits:
            return self.rate_limits[domain]
        # Use default
        return self.compliance_rules['rate_limit_default']

    def validate_request(self, request) -> bool:
        """Validate request against compliance rules"""
        url = request.url
        domain = urlparse(url).netloc
        # Scrapy header values are bytes, so decode before matching
        user_agent = request.headers.get('User-Agent', b'').decode('utf-8', 'ignore')
        # Check robots.txt
        if self.compliance_rules['respect_robots_txt']:
            if not self.check_robots_txt(url, user_agent):
                self.logger.warning(f"Blocked by robots.txt: {url}")
                return False
        # Check user agent requirement
        if self.compliance_rules['user_agent_required']:
            if not user_agent or 'bot' not in user_agent.lower():
                self.logger.warning(f"Invalid user agent for compliance: {user_agent}")
                return False
        return True

    def filter_personal_data(self, item: Dict) -> Dict:
        """Filter out personal data from scraped items"""
        if self.compliance_rules['personal_data_handling'] == 'exclude':
            sensitive_fields = [
                'email', 'phone', 'address', 'ssn', 'credit_card',
                'personal_id', 'passport', 'driver_license'
            ]
            filtered_item = {}
            for key, value in item.items():
                if key.lower() not in sensitive_fields:
                    # Additional check for values that look like personal data
                    if not self.looks_like_personal_data(str(value)):
                        filtered_item[key] = value
                    else:
                        self.logger.info(f"Filtered potential personal data: {key}")
            return filtered_item
        return item

    def looks_like_personal_data(self, text: str) -> bool:
        """Heuristic check for personal data patterns"""
        import re
        patterns = [
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',  # Email
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{3}-\d{3}-\d{4}\b',  # Phone
            r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',  # Credit card
        ]
        for pattern in patterns:
            if re.search(pattern, text):
                return True
        return False

    def generate_compliance_report(self) -> Dict:
        """Generate compliance status report"""
        return {
            'robots_txt_domains_checked': len(self.robots_cache),
            'custom_rate_limits': len(self.rate_limits),
            'compliance_rules': self.compliance_rules,
            'timestamp': time.time()
        }


# Compliance middleware
class ComplianceMiddleware:
    """Scrapy middleware for compliance enforcement"""

    def __init__(self):
        self.compliance_manager = ComplianceManager()
        self.last_request_time = {}

    def process_request(self, request, spider):
        # Validate compliance
        if not self.compliance_manager.validate_request(request):
            raise IgnoreRequest(f"Request blocked by compliance rules: {request.url}")
        # Enforce rate limiting
        domain = urlparse(request.url).netloc
        rate_limit = self.compliance_manager.get_rate_limit(domain)
        if domain in self.last_request_time:
            time_since_last = time.time() - self.last_request_time[domain]
            if time_since_last < rate_limit:
                sleep_time = rate_limit - time_since_last
                spider.logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s for {domain}")
                time.sleep(sleep_time)
        self.last_request_time[domain] = time.time()

    def process_item(self, item, spider):
        # Filter personal data (item pipeline hook)
        filtered_item = self.compliance_manager.filter_personal_data(dict(item))
        return filtered_item

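One way to wire the compliance layer in, sketched here as an assumption rather than taken from earlier settings, is to register ComplianceMiddleware twice: as a downloader middleware so process_request can vet and throttle outgoing requests, and as an item pipeline so process_item can strip personal data. Scrapy instantiates the two roles separately, which is fine here since they share no state.

# webscraper/settings.py - enabling the compliance layer (illustrative priorities)
DOWNLOADER_MIDDLEWARES = {
    'webscraper.compliance.ComplianceMiddleware': 350,
    # ... stealth and proxy middlewares from earlier sections ...
}

ITEM_PIPELINES = {
    'webscraper.compliance.ComplianceMiddleware': 200,
    # ... existing pipelines such as ValidationPipeline ...
}
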
Summary and Next Steps

In this advanced part, you’ve mastered:

  • Sophisticated anti-detection techniques and stealth configurations
  • Distributed scraping with Scrapy-Redis for massive scale
  • Advanced proxy management with health monitoring
  • Comprehensive monitoring and alerting systems
  • Legal compliance and ethical scraping frameworks
  • Performance optimization and scaling strategies

What’s Next?

In Part 4: Data Processing and Storage, we’ll cover:

  • Advanced data cleaning and validation
  • Multiple storage backends (MongoDB, PostgreSQL, Elasticsearch)
  • Real-time data pipelines
  • Data quality monitoring
  • ETL processes and data warehousing

Practice Exercise

Build an enterprise-grade distributed scraper that:

  1. Uses anti-detection techniques to scrape a major e-commerce site
  2. Scales across multiple workers with Redis coordination
  3. Implements comprehensive monitoring and alerting
  4. Ensures legal compliance and ethical scraping
  5. Handles millions of products with proper rate limiting

Happy scraping at scale! 🚀