Master Web Scraping with Scrapy: Anti-Detection and Scaling
Welcome to Part 3 of our comprehensive Scrapy series! In this advanced tutorial, we’ll explore sophisticated anti-detection techniques, distributed scraping architectures, and scaling strategies for enterprise-level web scraping operations.
What You’ll Learn in This Part
- Advanced anti-detection and stealth techniques
- Distributed scraping with Scrapy-Redis
- IP rotation and proxy management
- Browser fingerprinting avoidance
- Rate limiting and adaptive throttling
- Monitoring, alerting, and health checks
- Legal compliance and ethical scraping
- Performance optimization and scaling
Anti-Detection Techniques
Modern websites employ sophisticated detection mechanisms. Here’s how to stay under the radar:
Stealth Configuration
```python
# webscraper/settings.py - Advanced Stealth Settings

# Respect robots.txt but with custom delay
ROBOTSTXT_OBEY = True
ROBOTSTXT_USER_AGENT = '*'

# Realistic browser behavior
DOWNLOAD_DELAY = 3               # Base delay between requests
RANDOMIZE_DOWNLOAD_DELAY = True  # 0.5 * to 1.5 * DOWNLOAD_DELAY
DOWNLOAD_TIMEOUT = 30
DOWNLOAD_MAXSIZE = 1073741824    # 1GB
DOWNLOAD_WARNSIZE = 33554432     # 32MB

# Advanced AutoThrottle settings
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 60
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0  # Conservative for stealth
AUTOTHROTTLE_DEBUG = False

# Realistic request patterns
CONCURRENT_REQUESTS = 8
CONCURRENT_REQUESTS_PER_DOMAIN = 2

# Advanced cookie handling
COOKIES_ENABLED = True
COOKIES_DEBUG = False

# Memory and resource management
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_WARNING_MB = 1024

# DNS caching and timeout
DNSCACHE_ENABLED = True
DNSCACHE_SIZE = 10000
DNS_TIMEOUT = 60

# Telnet console for debugging (disable in production)
TELNETCONSOLE_ENABLED = False

# Custom middleware stack for stealth
DOWNLOADER_MIDDLEWARES = {
    'webscraper.middlewares.StealthUserAgentMiddleware': 400,
    'webscraper.middlewares.ProxyRotationMiddleware': 410,
    'webscraper.middlewares.HeaderSpoofingMiddleware': 420,
    'webscraper.middlewares.CookiePersistenceMiddleware': 430,
    'webscraper.middlewares.RequestTimingMiddleware': 440,
    'webscraper.middlewares.FingerprintResistanceMiddleware': 450,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 500,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # Disabled
}

# Anti-detection headers
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Accept-Language': 'en-US,en;q=0.9',
    'Accept-Encoding': 'gzip, deflate, br',
    'DNT': '1',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
    'Sec-Fetch-User': '?1',
    'Cache-Control': 'max-age=0',
}
```
Advanced Stealth Middleware
```python
# webscraper/middlewares.py - Stealth middlewares
import hashlib
import random
import time

from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware


class StealthUserAgentMiddleware(UserAgentMiddleware):
    """Advanced user agent rotation with browser consistency"""

    def __init__(self, user_agent='Scrapy'):
        # Accept the user_agent argument passed by UserAgentMiddleware.from_crawler
        super().__init__(user_agent)
        # Browser families with consistent headers
        self.browser_profiles = {
            'chrome': {
                'user_agents': [
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                ],
                'sec_ch_ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
                'sec_ch_ua_mobile': '?0',
                'sec_ch_ua_platform': '"Windows"',
            },
            'firefox': {
                'user_agents': [
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:121.0) Gecko/20100101 Firefox/121.0',
                    'Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0',
                ],
            },
            'safari': {
                'user_agents': [
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Safari/605.1.15',
                ],
            },
        }
        self.current_profile = None
        self.session_ua = None

    def process_request(self, request, spider):
        # Maintain a consistent user agent for the whole session
        if not self.session_ua:
            browser = random.choice(list(self.browser_profiles.keys()))
            self.current_profile = self.browser_profiles[browser]
            self.session_ua = random.choice(self.current_profile['user_agents'])

        request.headers['User-Agent'] = self.session_ua

        # Add browser-specific client-hint headers for Chrome
        if 'chrome' in self.session_ua.lower():
            profile = self.browser_profiles['chrome']
            request.headers['sec-ch-ua'] = profile['sec_ch_ua']
            request.headers['sec-ch-ua-mobile'] = profile['sec_ch_ua_mobile']
            request.headers['sec-ch-ua-platform'] = profile['sec_ch_ua_platform']


class HeaderSpoofingMiddleware:
    """Spoof headers to mimic real browser behavior"""

    def __init__(self):
        self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:16]
        self.request_count = 0

    def process_request(self, request, spider):
        self.request_count += 1

        # Add a per-session request identifier
        request.headers['X-Request-ID'] = f"{self.session_id}-{self.request_count}"

        # Headers that may be randomly omitted below
        headers_to_randomize = [
            'Accept-Encoding', 'Accept-Language', 'Cache-Control',
            'Connection', 'DNT', 'Upgrade-Insecure-Requests'
        ]

        # Occasionally omit some headers to appear more human
        if random.random() < 0.1:  # 10% chance
            header_to_remove = random.choice(headers_to_randomize)
            if header_to_remove in request.headers:
                del request.headers[header_to_remove]

        # Add referer for non-start URLs
        if not request.meta.get('is_start_url', False):
            if 'Referer' not in request.headers:
                # Use the domain as referer
                domain = request.url.split('/')[2]
                request.headers['Referer'] = f"https://{domain}/"


class RequestTimingMiddleware:
    """Implement realistic request timing patterns"""

    def __init__(self):
        self.last_request_time = {}
        self.human_patterns = [
            # Reading pattern: quick succession then pause
            [0.5, 0.3, 0.8, 5.0],
            # Browsing pattern: varied intervals
            [1.2, 2.1, 0.7, 3.5, 1.8],
            # Search pattern: quick then slow
            [0.4, 0.6, 8.0, 2.0],
        ]
        self.current_pattern = []
        self.pattern_index = 0

    def process_request(self, request, spider):
        domain = request.url.split('/')[2]
        current_time = time.time()

        # Get or initialize timing for this domain
        if domain not in self.last_request_time:
            self.last_request_time[domain] = current_time
            return

        # Calculate time since last request to this domain
        time_since_last = current_time - self.last_request_time[domain]

        # Choose a human-like delay pattern
        if not self.current_pattern:
            self.current_pattern = random.choice(self.human_patterns).copy()
            self.pattern_index = 0

        target_delay = self.current_pattern[self.pattern_index]
        self.pattern_index = (self.pattern_index + 1) % len(self.current_pattern)

        # Apply delay if needed (note: time.sleep blocks the reactor, so keep
        # delays short or prefer DOWNLOAD_DELAY/AutoThrottle for large crawls)
        if time_since_last < target_delay:
            sleep_time = target_delay - time_since_last
            spider.logger.debug(f'Applying human-like delay: {sleep_time:.2f}s for {domain}')
            time.sleep(sleep_time)

        self.last_request_time[domain] = time.time()


class FingerprintResistanceMiddleware:
    """Resist browser fingerprinting attempts"""

    def process_request(self, request, spider):
        # Randomize Accept header slightly
        accept_variations = [
            'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        ]

        if random.random() < 0.3:  # 30% chance to vary
            request.headers['Accept'] = random.choice(accept_variations)

        # Randomize Accept-Language
        lang_variations = [
            'en-US,en;q=0.9',
            'en-US,en;q=0.8',
            'en-US,en;q=0.9,es;q=0.8',
            'en-US,en;q=0.5',
        ]

        if random.random() < 0.2:  # 20% chance to vary
            request.headers['Accept-Language'] = random.choice(lang_variations)


class CookiePersistenceMiddleware:
    """Advanced cookie management for session consistency"""

    def __init__(self):
        self.session_cookies = {}

    def process_response(self, request, response, spider):
        # Store cookies for this domain
        domain = request.url.split('/')[2]
        cookies = response.headers.getlist('Set-Cookie')

        if cookies:
            if domain not in self.session_cookies:
                self.session_cookies[domain] = {}

            for cookie in cookies:
                cookie_str = cookie.decode('utf-8')
                if '=' in cookie_str:
                    name, value = cookie_str.split('=', 1)
                    # Store only the name=value part
                    value = value.split(';')[0]
                    self.session_cookies[domain][name] = value

        return response

    def process_request(self, request, spider):
        domain = request.url.split('/')[2]

        # Add stored cookies for this domain
        if domain in self.session_cookies:
            cookie_header = '; '.join([
                f"{name}={value}"
                for name, value in self.session_cookies[domain].items()
            ])
            if cookie_header:
                request.headers['Cookie'] = cookie_header
```
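Note that HeaderSpoofingMiddleware only adds a synthetic Referer when `request.meta['is_start_url']` is falsy, so your spider has to mark its seed requests itself. A minimal sketch of how a spider might set that flag (the spider name and URL below are placeholders, not part of the project above):

```python
import scrapy


class StealthExampleSpider(scrapy.Spider):
    """Hypothetical spider that tags seed requests for HeaderSpoofingMiddleware."""
    name = 'stealth_example'                 # placeholder name
    start_urls = ['https://example.com/']    # placeholder URL

    def start_requests(self):
        for url in self.start_urls:
            # Mark seed requests so the middleware skips the synthetic Referer
            yield scrapy.Request(url, meta={'is_start_url': True}, callback=self.parse)

    def parse(self, response):
        # Follow-up requests omit the flag, so a Referer will be added for them
        for href in response.css('a::attr(href)').getall():
            yield response.follow(href, callback=self.parse)
```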
Proxy Rotation System
```python
import random
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

import requests


class ProxyStatus(Enum):
    ACTIVE = "active"
    FAILED = "failed"
    RATE_LIMITED = "rate_limited"
    TESTING = "testing"


@dataclass
class ProxyInfo:
    host: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None
    protocol: str = "http"
    status: ProxyStatus = ProxyStatus.TESTING
    success_count: int = 0
    failure_count: int = 0
    last_used: Optional[float] = None
    response_time: Optional[float] = None
    rate_limit_until: Optional[float] = None

    @property
    def url(self) -> str:
        if self.username and self.password:
            return f"{self.protocol}://{self.username}:{self.password}@{self.host}:{self.port}"
        return f"{self.protocol}://{self.host}:{self.port}"

    @property
    def success_rate(self) -> float:
        total = self.success_count + self.failure_count
        return self.success_count / total if total > 0 else 0.0

    def is_available(self) -> bool:
        if self.status == ProxyStatus.FAILED:
            return False
        if self.rate_limit_until and time.time() < self.rate_limit_until:
            return False
        return True


class ProxyManager:
    """Advanced proxy rotation and health management"""

    def __init__(self, proxy_list: List[Dict], health_check_interval: int = 300):
        self.proxies = [ProxyInfo(**proxy) for proxy in proxy_list]
        self.health_check_interval = health_check_interval
        self.last_health_check = 0
        self.lock = threading.Lock()

        # Start health checking thread
        self.health_thread = threading.Thread(target=self._health_check_loop, daemon=True)
        self.health_thread.start()

    def get_proxy(self) -> Optional[ProxyInfo]:
        """Get the best available proxy"""
        with self.lock:
            available_proxies = [p for p in self.proxies if p.is_available()]

            if not available_proxies:
                return None

            # Sort by success rate, then by fastest (lowest) response time
            available_proxies.sort(
                key=lambda p: (p.success_rate, -(p.response_time or 0)),
                reverse=True
            )

            # Weighted selection favoring better proxies
            weights = [max(0.1, p.success_rate) for p in available_proxies]
            proxy = random.choices(available_proxies, weights=weights)[0]

            proxy.last_used = time.time()
            return proxy

    def report_success(self, proxy: ProxyInfo, response_time: float = None):
        """Report successful proxy usage"""
        with self.lock:
            proxy.success_count += 1
            proxy.status = ProxyStatus.ACTIVE
            if response_time:
                proxy.response_time = response_time

    def report_failure(self, proxy: ProxyInfo, is_rate_limit: bool = False):
        """Report proxy failure"""
        with self.lock:
            proxy.failure_count += 1

            if is_rate_limit:
                proxy.status = ProxyStatus.RATE_LIMITED
                proxy.rate_limit_until = time.time() + 300  # 5 minute cooldown
            elif proxy.failure_count > 5:
                proxy.status = ProxyStatus.FAILED

    def _health_check_loop(self):
        """Background thread for proxy health checking"""
        while True:
            time.sleep(self.health_check_interval)
            self._perform_health_checks()

    def _perform_health_checks(self):
        """Check health of all proxies"""
        test_url = "http://httpbin.org/ip"

        for proxy in self.proxies:
            if proxy.status == ProxyStatus.FAILED:
                continue

            try:
                start_time = time.time()
                response = requests.get(
                    test_url,
                    proxies={"http": proxy.url, "https": proxy.url},
                    timeout=10
                )
                response_time = time.time() - start_time

                if response.status_code == 200:
                    self.report_success(proxy, response_time)
                else:
                    self.report_failure(proxy)

            except Exception:
                self.report_failure(proxy)

    def get_stats(self) -> Dict:
        """Get proxy pool statistics"""
        with self.lock:
            total = len(self.proxies)
            active = sum(1 for p in self.proxies if p.status == ProxyStatus.ACTIVE)
            failed = sum(1 for p in self.proxies if p.status == ProxyStatus.FAILED)
            rate_limited = sum(1 for p in self.proxies if p.status == ProxyStatus.RATE_LIMITED)

            return {
                'total': total,
                'active': active,
                'failed': failed,
                'rate_limited': rate_limited,
                'success_rate': sum(p.success_rate for p in self.proxies) / total if total > 0 else 0
            }


# Proxy rotation middleware
class AdvancedProxyRotationMiddleware:
    """Advanced proxy rotation with health management"""

    def __init__(self, proxy_list):
        self.proxy_manager = ProxyManager(proxy_list)

    @classmethod
    def from_crawler(cls, crawler):
        proxy_list = crawler.settings.get('PROXY_LIST', [])
        return cls(proxy_list)

    def process_request(self, request, spider):
        proxy = self.proxy_manager.get_proxy()

        if proxy:
            request.meta['proxy'] = proxy.url
            request.meta['proxy_info'] = proxy
            spider.logger.debug(f'Using proxy: {proxy.host}:{proxy.port}')
        else:
            spider.logger.warning('No available proxies')

    def process_response(self, request, response, spider):
        proxy_info = request.meta.get('proxy_info')

        if proxy_info:
            if response.status == 200:
                self.proxy_manager.report_success(proxy_info)
            elif response.status == 429:  # Rate limited
                self.proxy_manager.report_failure(proxy_info, is_rate_limit=True)
            elif response.status >= 400:
                self.proxy_manager.report_failure(proxy_info)

        return response

    def process_exception(self, request, exception, spider):
        proxy_info = request.meta.get('proxy_info')

        if proxy_info:
            self.proxy_manager.report_failure(proxy_info)
```
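The middleware reads its pool from a `PROXY_LIST` setting, where each entry maps onto the `ProxyInfo` fields. A possible settings snippet, assuming the class lives in `webscraper/middlewares.py`; the hosts and credentials shown are placeholders:

```python
# webscraper/settings.py - hypothetical proxy pool configuration
PROXY_LIST = [
    {'host': '203.0.113.10', 'port': 8080},                        # placeholder host
    {'host': '203.0.113.11', 'port': 3128, 'protocol': 'http'},
    {'host': '203.0.113.12', 'port': 1080,
     'username': 'scraper', 'password': 'secret'},                 # placeholder credentials
]

DOWNLOADER_MIDDLEWARES = {
    # Replaces the ProxyRotationMiddleware entry from the stealth settings above
    'webscraper.middlewares.AdvancedProxyRotationMiddleware': 410,
}
```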
Distributed Scraping with Scrapy-Redis
Scale your scraping operations across multiple machines:
Scrapy-Redis Setup
```bash
# Install Scrapy-Redis
pip install scrapy-redis

# Start Redis server
redis-server

# Or using Docker
docker run -d -p 6379:6379 redis:alpine
```
Distributed Spider Configuration
```python
# webscraper/settings.py - Redis Configuration

# Enable Scrapy-Redis
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
    'webscraper.pipelines.ValidationPipeline': 400,
}

# Redis connection
REDIS_URL = 'redis://localhost:6379'

# Or with detailed configuration
REDIS_PARAMS = {
    'host': 'localhost',
    'port': 6379,
    'db': 0,
    'password': None,
}

# Scheduler configuration
SCHEDULER_PERSIST = True  # Keep scheduler data after spider closes
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'

# Request serialization
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"
```
Distributed Spider
```python
import json

from scrapy.loader import ItemLoader
from scrapy_redis.spiders import RedisSpider

from webscraper.items import ProductItem


class DistributedEcommerceSpider(RedisSpider):
    """Distributed spider using Redis for coordination"""

    name = 'distributed_ecommerce'
    redis_key = 'distributed_ecommerce:start_urls'

    custom_settings = {
        'CONCURRENT_REQUESTS': 32,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 16,
        'DOWNLOAD_DELAY': 1,
        'SCHEDULER_IDLE_BEFORE_CLOSE': 10,  # Wait 10 seconds before closing
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.processed_count = 0
        self.error_count = 0

    def parse(self, response):
        """Parse category pages and product listings"""
        self.logger.info(f'Worker {self.name} processing: {response.url}')

        # Extract product links
        product_links = response.css('.product-item a::attr(href)').getall()

        for link in product_links:
            product_url = response.urljoin(link)
            yield response.follow(
                product_url,
                callback=self.parse_product,
                meta={'category_url': response.url}
            )

        # Extract pagination
        next_page = response.css('.pagination .next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

        # Add more category pages to the Redis queue
        self.discover_more_urls(response)

    def parse_product(self, response):
        """Parse individual product pages"""
        try:
            loader = ItemLoader(item=ProductItem(), response=response)

            loader.add_css('name', 'h1.product-title::text')
            loader.add_css('price', '.price-current::text')
            loader.add_css('description', '.product-description::text')
            loader.add_css('brand', '.brand-name::text')
            loader.add_css('rating', '.rating-value::text')

            # Add metadata
            loader.add_value('url', response.url)
            loader.add_value('category_url', response.meta.get('category_url'))
            loader.add_value('worker_id', self.name)
            loader.add_value('scraped_at', self.get_current_time())

            item = loader.load_item()
            self.processed_count += 1

            if self.processed_count % 100 == 0:
                self.logger.info(f'Worker {self.name} processed {self.processed_count} products')

            yield item

        except Exception as e:
            self.error_count += 1
            self.logger.error(f'Error parsing product {response.url}: {e}')

    def discover_more_urls(self, response):
        """Discover and add more URLs to the Redis queue"""
        # Find category links
        category_links = response.css('.category-nav a::attr(href)').getall()

        for link in category_links:
            category_url = response.urljoin(link)
            # Add to Redis queue for other workers
            self.server.lpush(
                f'{self.redis_key}:discovered',
                json.dumps({'url': category_url, 'priority': 1})
            )

    def get_current_time(self):
        from datetime import datetime
        return datetime.now().isoformat()

    def closed(self, reason):
        """Spider closing callback"""
        self.logger.info(f'Worker {self.name} closing: {reason}')
        self.logger.info(f'Processed: {self.processed_count}, Errors: {self.error_count}')


# Master coordinator spider
class CoordinatorSpider(RedisSpider):
    """Coordinator spider that feeds URLs to workers"""

    name = 'coordinator'
    redis_key = 'distributed_ecommerce:start_urls'

    def setup_redis(self, crawler=None):
        # self.server is only available once scrapy-redis has set up the
        # connection, so seed the queue here rather than in __init__
        super().setup_redis(crawler)
        self.seed_urls()

    def seed_urls(self):
        """Seed initial URLs into Redis"""
        initial_urls = [
            'https://example-store.com/categories/electronics',
            'https://example-store.com/categories/clothing',
            'https://example-store.com/categories/home',
            'https://example-store.com/categories/books',
        ]

        for url in initial_urls:
            self.server.lpush(self.redis_key, url)

        self.logger.info(f'Seeded {len(initial_urls)} URLs')

    def parse(self, response):
        """This spider doesn't parse, it just coordinates"""
        pass
```
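With the scheduler pointed at Redis, any machine that can reach the Redis instance can run a worker, and the shared queue can be seeded either by the coordinator above or directly with redis-cli. A minimal sketch (the URL is a placeholder):

```bash
# Seed the shared queue by hand instead of running the coordinator spider
redis-cli lpush distributed_ecommerce:start_urls "https://example-store.com/categories/electronics"

# Start a worker on each machine; they all consume the same Redis queue
scrapy crawl distributed_ecommerce
```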
Worker Management Script
```python
# worker_manager.py - Manage distributed scraping workers
import signal
import subprocess
import sys
import time
from typing import List

import redis


class WorkerManager:
    """Manage distributed scraping workers"""

    def __init__(self, redis_url: str = 'redis://localhost:6379'):
        self.redis_client = redis.from_url(redis_url)
        self.workers: List[subprocess.Popen] = []
        self.running = True

        # Register signal handlers
        signal.signal(signal.SIGINT, self.shutdown_handler)
        signal.signal(signal.SIGTERM, self.shutdown_handler)

    def start_workers(self, spider_name: str, num_workers: int = 4):
        """Start multiple worker processes"""
        print(f"Starting {num_workers} workers for spider '{spider_name}'")

        for i in range(num_workers):
            worker_id = f"{spider_name}_worker_{i}"
            cmd = [
                'scrapy', 'crawl', spider_name,
                '-s', f'BOT_NAME={worker_id}',
                '-L', 'INFO'
            ]

            worker = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )

            self.workers.append(worker)
            print(f"Started worker {worker_id} (PID: {worker.pid})")

    def monitor_workers(self):
        """Monitor worker health and restart if needed"""
        while self.running:
            for i, worker in enumerate(self.workers):
                if worker.poll() is not None:
                    # Worker has terminated
                    print(f"Worker {i} terminated with code {worker.returncode}")

                    # Restart worker if it crashed
                    if worker.returncode != 0:
                        print(f"Restarting worker {i}")
                        # Implementation for restarting worker
                        pass

            # Monitor Redis queue sizes
            self.print_queue_stats()

            time.sleep(30)  # Check every 30 seconds

    def print_queue_stats(self):
        """Print Redis queue statistics"""
        try:
            queue_size = self.redis_client.llen('distributed_ecommerce:start_urls')
            processing = self.redis_client.scard('distributed_ecommerce:dupefilter')

            print(f"Queue size: {queue_size}, Processed: {processing}")

        except Exception as e:
            print(f"Error getting Redis stats: {e}")

    def shutdown_handler(self, signum, frame):
        """Handle shutdown signals"""
        print("\nShutting down workers...")
        self.running = False

        for i, worker in enumerate(self.workers):
            print(f"Terminating worker {i}")
            worker.terminate()

            # Wait for graceful shutdown
            try:
                worker.wait(timeout=10)
            except subprocess.TimeoutExpired:
                print(f"Force killing worker {i}")
                worker.kill()

        print("All workers shut down")
        sys.exit(0)

    def run(self, spider_name: str, num_workers: int = 4):
        """Main execution method"""
        self.start_workers(spider_name, num_workers)
        self.monitor_workers()


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python worker_manager.py <spider_name> [num_workers]")
        sys.exit(1)

    spider_name = sys.argv[1]
    num_workers = int(sys.argv[2]) if len(sys.argv) > 2 else 4

    manager = WorkerManager()
    manager.run(spider_name, num_workers)
```
Monitoring and Alerting
Comprehensive Monitoring System
```python
# webscraper/monitoring.py - Performance monitoring and alerting
import smtplib
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, List

import psutil
import redis


@dataclass
class ScrapingMetrics:
    timestamp: str
    spider_name: str
    pages_scraped: int
    items_extracted: int
    errors: int
    response_time_avg: float
    memory_usage_mb: float
    cpu_usage_percent: float
    active_requests: int
    queue_size: int
    success_rate: float


class PerformanceMonitor:
    """Monitor scraping performance and system resources"""

    def __init__(self, redis_url: str = 'redis://localhost:6379'):
        self.redis_client = redis.from_url(redis_url)
        self.metrics_history: List[ScrapingMetrics] = []
        self.alert_thresholds = {
            'error_rate': 0.1,     # 10% error rate
            'memory_usage': 2048,  # 2GB memory usage
            'cpu_usage': 80,       # 80% CPU usage
            'response_time': 10,   # 10 second average response time
            'queue_stall': 300,    # 5 minutes without queue progress
        }
        self.last_queue_size = 0
        self.queue_stall_start = None

    def collect_metrics(self, spider_name: str) -> ScrapingMetrics:
        """Collect current performance metrics"""
        # Get system metrics
        memory_usage = psutil.virtual_memory().used / 1024 / 1024  # MB
        cpu_usage = psutil.cpu_percent(interval=1)

        # Get Redis metrics
        try:
            queue_size = self.redis_client.llen(f'{spider_name}:start_urls')
            processed_count = self.redis_client.scard(f'{spider_name}:dupefilter')

            # Detect queue stall
            if queue_size == self.last_queue_size:
                if not self.queue_stall_start:
                    self.queue_stall_start = time.time()
            else:
                self.queue_stall_start = None

            self.last_queue_size = queue_size

        except Exception:
            queue_size = 0
            processed_count = 0

        # Create metrics object (simplified - in a real implementation
        # you'd collect these from your spider's stats)
        metrics = ScrapingMetrics(
            timestamp=datetime.now().isoformat(),
            spider_name=spider_name,
            pages_scraped=processed_count,
            items_extracted=int(processed_count * 0.8),  # Estimate
            errors=int(processed_count * 0.05),          # Estimate
            response_time_avg=2.5,  # Would be collected from actual stats
            memory_usage_mb=memory_usage,
            cpu_usage_percent=cpu_usage,
            active_requests=queue_size,
            queue_size=queue_size,
            success_rate=0.95       # Would be calculated from actual stats
        )

        self.metrics_history.append(metrics)

        # Keep only last 24 hours of metrics
        cutoff_time = datetime.now() - timedelta(hours=24)
        self.metrics_history = [
            m for m in self.metrics_history
            if datetime.fromisoformat(m.timestamp) > cutoff_time
        ]

        return metrics

    def check_alerts(self, metrics: ScrapingMetrics) -> List[Dict]:
        """Check metrics against alert thresholds"""
        alerts = []

        # Error rate alert
        error_rate = metrics.errors / max(1, metrics.pages_scraped)
        if error_rate > self.alert_thresholds['error_rate']:
            alerts.append({
                'type': 'error_rate',
                'severity': 'high',
                'message': f'High error rate: {error_rate:.2%}',
                'value': error_rate,
                'threshold': self.alert_thresholds['error_rate']
            })

        # Memory usage alert
        if metrics.memory_usage_mb > self.alert_thresholds['memory_usage']:
            alerts.append({
                'type': 'memory_usage',
                'severity': 'medium',
                'message': f'High memory usage: {metrics.memory_usage_mb:.1f}MB',
                'value': metrics.memory_usage_mb,
                'threshold': self.alert_thresholds['memory_usage']
            })

        # CPU usage alert
        if metrics.cpu_usage_percent > self.alert_thresholds['cpu_usage']:
            alerts.append({
                'type': 'cpu_usage',
                'severity': 'medium',
                'message': f'High CPU usage: {metrics.cpu_usage_percent:.1f}%',
                'value': metrics.cpu_usage_percent,
                'threshold': self.alert_thresholds['cpu_usage']
            })

        # Response time alert
        if metrics.response_time_avg > self.alert_thresholds['response_time']:
            alerts.append({
                'type': 'response_time',
                'severity': 'medium',
                'message': f'Slow response time: {metrics.response_time_avg:.1f}s',
                'value': metrics.response_time_avg,
                'threshold': self.alert_thresholds['response_time']
            })

        # Queue stall alert
        if self.queue_stall_start:
            stall_duration = time.time() - self.queue_stall_start
            if stall_duration > self.alert_thresholds['queue_stall']:
                alerts.append({
                    'type': 'queue_stall',
                    'severity': 'high',
                    'message': f'Queue stalled for {stall_duration/60:.1f} minutes',
                    'value': stall_duration,
                    'threshold': self.alert_thresholds['queue_stall']
                })

        return alerts

    def generate_report(self, hours: int = 24) -> Dict:
        """Generate performance report"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [
            m for m in self.metrics_history
            if datetime.fromisoformat(m.timestamp) > cutoff_time
        ]

        if not recent_metrics:
            return {'error': 'No metrics available'}

        # Calculate aggregated statistics
        total_pages = sum(m.pages_scraped for m in recent_metrics)
        total_items = sum(m.items_extracted for m in recent_metrics)
        total_errors = sum(m.errors for m in recent_metrics)
        avg_response_time = sum(m.response_time_avg for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m.memory_usage_mb for m in recent_metrics) / len(recent_metrics)
        avg_cpu = sum(m.cpu_usage_percent for m in recent_metrics) / len(recent_metrics)

        return {
            'period': f'Last {hours} hours',
            'total_pages_scraped': total_pages,
            'total_items_extracted': total_items,
            'total_errors': total_errors,
            'error_rate': total_errors / max(1, total_pages),
            'average_response_time': avg_response_time,
            'average_memory_usage_mb': avg_memory,
            'average_cpu_usage_percent': avg_cpu,
            'current_queue_size': recent_metrics[-1].queue_size if recent_metrics else 0,
            'metrics_count': len(recent_metrics)
        }


class AlertManager:
    """Manage and send alerts"""

    def __init__(self, config: Dict):
        self.config = config
        self.sent_alerts = {}       # Track sent alerts to avoid spam
        self.cooldown_period = 300  # 5 minutes cooldown between same alerts

    def send_alert(self, alert: Dict, metrics: ScrapingMetrics):
        """Send alert via configured channels"""
        alert_key = f"{alert['type']}_{alert.get('severity', 'medium')}"
        current_time = time.time()

        # Check cooldown
        if alert_key in self.sent_alerts:
            if current_time - self.sent_alerts[alert_key] < self.cooldown_period:
                return  # Skip sending due to cooldown

        # Send via email
        if self.config.get('email'):
            self._send_email_alert(alert, metrics)

        # Send via webhook
        if self.config.get('webhook'):
            self._send_webhook_alert(alert, metrics)

        # Log to file
        self._log_alert(alert, metrics)

        self.sent_alerts[alert_key] = current_time

    def _send_email_alert(self, alert: Dict, metrics: ScrapingMetrics):
        """Send alert via email"""
        try:
            smtp_config = self.config['email']

            msg = MIMEMultipart()
            msg['From'] = smtp_config['from']
            msg['To'] = smtp_config['to']
            msg['Subject'] = f"Scraping Alert: {alert['type']} - {alert['severity'].upper()}"

            body = f"""
            Alert: {alert['message']}

            Spider: {metrics.spider_name}
            Time: {metrics.timestamp}
            Current Value: {alert['value']}
            Threshold: {alert['threshold']}

            Current Metrics:
            - Pages Scraped: {metrics.pages_scraped}
            - Items Extracted: {metrics.items_extracted}
            - Error Rate: {metrics.errors / max(1, metrics.pages_scraped):.2%}
            - Memory Usage: {metrics.memory_usage_mb:.1f}MB
            - CPU Usage: {metrics.cpu_usage_percent:.1f}%
            - Queue Size: {metrics.queue_size}
            """

            msg.attach(MIMEText(body, 'plain'))

            with smtplib.SMTP(smtp_config['smtp_server'], smtp_config['smtp_port']) as server:
                if smtp_config.get('use_tls'):
                    server.starttls()
                if smtp_config.get('username'):
                    server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)

        except Exception as e:
            print(f"Failed to send email alert: {e}")

    def _send_webhook_alert(self, alert: Dict, metrics: ScrapingMetrics):
        """Send alert via webhook"""
        import requests

        try:
            webhook_config = self.config['webhook']

            payload = {
                'alert': alert,
                'metrics': asdict(metrics),
                'timestamp': metrics.timestamp
            }

            response = requests.post(
                webhook_config['url'],
                json=payload,
                headers=webhook_config.get('headers', {}),
                timeout=10
            )
            response.raise_for_status()

        except Exception as e:
            print(f"Failed to send webhook alert: {e}")

    def _log_alert(self, alert: Dict, metrics: ScrapingMetrics):
        """Log alert to file"""
        import logging

        logging.basicConfig(
            filename='scraping_alerts.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

        logging.info(
            f"ALERT: {alert['type']} - {alert['message']} "
            f"(Spider: {metrics.spider_name}, Value: {alert['value']})"
        )
```
Monitoring Dashboard Script
```python
import json
import time

from webscraper.monitoring import PerformanceMonitor, AlertManager


def main():
    """Main monitoring loop"""

    # Configuration (the addresses and credentials below are placeholders)
    config = {
        'email': {
            'smtp_server': 'smtp.gmail.com',
            'smtp_port': 587,
            'use_tls': True,
            'username': 'alerts@example.com',  # placeholder
            'password': 'your-app-password',
            'from': 'alerts@example.com',      # placeholder
            'to': 'you@example.com',           # placeholder
        },
        'webhook': {
            'url': 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK',
            'headers': {'Content-Type': 'application/json'}
        }
    }

    monitor = PerformanceMonitor()
    alert_manager = AlertManager(config)

    spider_name = 'distributed_ecommerce'

    print("Starting monitoring dashboard...")
    print("Press Ctrl+C to stop")

    try:
        while True:
            # Collect metrics
            metrics = monitor.collect_metrics(spider_name)

            # Check for alerts
            alerts = monitor.check_alerts(metrics)

            # Send alerts
            for alert in alerts:
                alert_manager.send_alert(alert, metrics)
                print(f"ALERT: {alert['message']}")

            # Print current status
            print(f"\n[{metrics.timestamp}] Spider: {spider_name}")
            print(f"Pages: {metrics.pages_scraped}, Items: {metrics.items_extracted}")
            print(f"Errors: {metrics.errors}, Queue: {metrics.queue_size}")
            print(f"Memory: {metrics.memory_usage_mb:.1f}MB, CPU: {metrics.cpu_usage_percent:.1f}%")

            if alerts:
                print(f"Active alerts: {len(alerts)}")

            # Generate hourly report (the loop runs once a minute, so check
            # whether we're inside the first minute of each hour)
            if int(time.time()) % 3600 < 60:
                report = monitor.generate_report(hours=1)
                print("\n--- Hourly Report ---")
                print(json.dumps(report, indent=2))

            time.sleep(60)  # Check every minute

    except KeyboardInterrupt:
        print("\nMonitoring stopped")


if __name__ == "__main__":
    main()
```
Legal Compliance and Ethics
Compliance Framework
```python
import logging
import time
from typing import Dict
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser

from scrapy.exceptions import IgnoreRequest


class ComplianceManager:
    """Ensure legal and ethical scraping compliance"""

    def __init__(self):
        self.robots_cache = {}
        self.rate_limits = {}
        self.compliance_rules = {
            'respect_robots_txt': True,
            'rate_limit_default': 1.0,  # 1 second between requests
            'rate_limit_per_domain': {},
            'user_agent_required': True,
            'personal_data_handling': 'exclude',
            'copyright_respect': True,
        }

        self.logger = logging.getLogger(__name__)

    def check_robots_txt(self, url: str, user_agent: str = '*') -> bool:
        """Check if URL is allowed by robots.txt"""
        domain = urlparse(url).netloc

        if domain not in self.robots_cache:
            robots_url = urljoin(f"https://{domain}", "/robots.txt")

            try:
                rp = RobotFileParser()
                rp.set_url(robots_url)
                rp.read()
                self.robots_cache[domain] = rp

                # Extract crawl delay if specified
                delay = rp.crawl_delay(user_agent)
                if delay:
                    self.rate_limits[domain] = float(delay)

            except Exception as e:
                self.logger.warning(f"Could not fetch robots.txt for {domain}: {e}")
                return True  # Allow if robots.txt can't be fetched

        robots_parser = self.robots_cache.get(domain)
        if robots_parser:
            return robots_parser.can_fetch(user_agent, url)

        return True

    def get_rate_limit(self, domain: str) -> float:
        """Get appropriate rate limit for domain"""
        # Check domain-specific rate limit
        if domain in self.compliance_rules['rate_limit_per_domain']:
            return self.compliance_rules['rate_limit_per_domain'][domain]

        # Check robots.txt specified delay
        if domain in self.rate_limits:
            return self.rate_limits[domain]

        # Use default
        return self.compliance_rules['rate_limit_default']

    def validate_request(self, request) -> bool:
        """Validate request against compliance rules"""
        url = request.url
        domain = urlparse(url).netloc
        # Scrapy stores header values as bytes, so decode before comparing
        user_agent = (request.headers.get('User-Agent') or b'').decode('utf-8', 'ignore')

        # Check robots.txt
        if self.compliance_rules['respect_robots_txt']:
            if not self.check_robots_txt(url, user_agent):
                self.logger.warning(f"Blocked by robots.txt: {url}")
                return False

        # Require a user agent that identifies the crawler as a bot
        if self.compliance_rules['user_agent_required']:
            if not user_agent or 'bot' not in user_agent.lower():
                self.logger.warning(f"Invalid user agent for compliance: {user_agent}")
                return False

        return True

    def filter_personal_data(self, item: Dict) -> Dict:
        """Filter out personal data from scraped items"""
        if self.compliance_rules['personal_data_handling'] == 'exclude':
            sensitive_fields = [
                'email', 'phone', 'address', 'ssn', 'credit_card',
                'personal_id', 'passport', 'driver_license'
            ]

            filtered_item = {}
            for key, value in item.items():
                if key.lower() not in sensitive_fields:
                    # Additional check for values that look like personal data
                    if not self.looks_like_personal_data(str(value)):
                        filtered_item[key] = value
                    else:
                        self.logger.info(f"Filtered potential personal data: {key}")

            return filtered_item

        return item

    def looks_like_personal_data(self, text: str) -> bool:
        """Heuristic check for personal data patterns"""
        import re

        patterns = [
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',  # Email
            r'\b\d{3}-\d{2}-\d{4}\b',                                # SSN
            r'\b\d{3}-\d{3}-\d{4}\b',                                # Phone
            r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',              # Credit card
        ]

        for pattern in patterns:
            if re.search(pattern, text):
                return True

        return False

    def generate_compliance_report(self) -> Dict:
        """Generate compliance status report"""
        return {
            'robots_txt_domains_checked': len(self.robots_cache),
            'custom_rate_limits': len(self.rate_limits),
            'compliance_rules': self.compliance_rules,
            'timestamp': time.time()
        }


# Compliance middleware
class ComplianceMiddleware:
    """Scrapy middleware for compliance enforcement"""

    def __init__(self):
        self.compliance_manager = ComplianceManager()
        self.last_request_time = {}

    def process_request(self, request, spider):
        # Validate compliance
        if not self.compliance_manager.validate_request(request):
            raise IgnoreRequest(f"Request blocked by compliance rules: {request.url}")

        # Enforce rate limiting
        domain = urlparse(request.url).netloc
        rate_limit = self.compliance_manager.get_rate_limit(domain)

        if domain in self.last_request_time:
            time_since_last = time.time() - self.last_request_time[domain]
            if time_since_last < rate_limit:
                sleep_time = rate_limit - time_since_last
                spider.logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s for {domain}")
                time.sleep(sleep_time)

        self.last_request_time[domain] = time.time()

    def process_item(self, item, spider):
        # Item-pipeline hook: filter personal data before storage
        filtered_item = self.compliance_manager.filter_personal_data(dict(item))
        return filtered_item
```
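One way to wire this in, assuming the classes above live in `webscraper/compliance.py`: register the class as a downloader middleware so the request checks run, and also as an item pipeline so `process_item` is actually called. A sketch; the priorities are arbitrary and the pipeline entry should be merged with the pipelines configured earlier:

```python
# webscraper/settings.py - hypothetical compliance wiring
DOWNLOADER_MIDDLEWARES = {
    # Run compliance checks before the stealth and proxy middlewares
    'webscraper.compliance.ComplianceMiddleware': 350,
}

ITEM_PIPELINES = {
    # Reuse the same class so filter_personal_data runs on every item
    # (add this alongside the RedisPipeline/ValidationPipeline entries)
    'webscraper.compliance.ComplianceMiddleware': 200,
}
```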
Summary and Next Steps
In this advanced part, you’ve mastered:
✅ Sophisticated anti-detection techniques and stealth configurations
✅ Distributed scraping with Scrapy-Redis for massive scale
✅ Advanced proxy management with health monitoring
✅ Comprehensive monitoring and alerting systems
✅ Legal compliance and ethical scraping frameworks
✅ Performance optimization and scaling strategies
What’s Next?
In Part 4: Data Processing and Storage, we’ll cover:
- Advanced data cleaning and validation
- Multiple storage backends (MongoDB, PostgreSQL, Elasticsearch)
- Real-time data pipelines
- Data quality monitoring
- ETL processes and data warehousing
Practice Exercise
Build an enterprise-grade distributed scraper that:
- Uses anti-detection techniques to scrape a major e-commerce site
- Scales across multiple workers with Redis coordination
- Implements comprehensive monitoring and alerting
- Ensures legal compliance and ethical scraping
- Handles millions of products with proper rate limiting
Happy scraping at scale! 🚀