Master Web Scraping with Scrapy: Data Processing and Storage
Welcome to Part 4 of our comprehensive Scrapy series! In this tutorial, we’ll explore advanced data processing, validation, and storage techniques that transform raw scraped data into clean, structured, and actionable information.
What You’ll Learn in This Part
- Advanced data cleaning and validation pipelines
- Multiple storage backends (MongoDB, PostgreSQL, Elasticsearch)
- Real-time data processing and streaming
- Data quality monitoring and alerting
- ETL processes and data warehousing
- Data deduplication and conflict resolution
- Performance optimization for data pipelines
- Data backup and recovery strategies
Advanced Data Cleaning and Validation
Comprehensive Data Validation Pipeline
```python
import re
import hashlib
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class AdvancedValidationPipeline:
    """Comprehensive data validation and cleaning pipeline"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.validation_stats = {
            'total_items': 0,
            'valid_items': 0,
            'dropped_items': 0,
            'field_fixes': 0,
            'validation_errors': {}
        }

        # Validation rules configuration
        self.validation_rules = {
            'required_fields': ['name', 'url'],
            'field_types': {
                'price': (int, float),
                'rating': (int, float),
                'review_count': int,
                'in_stock': bool
            },
            'field_ranges': {
                'price': (0, 1000000),
                'rating': (0, 5),
                'review_count': (0, float('inf'))
            },
            'string_patterns': {
                'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
                'url': r'^https?:\/\/[^\s/$.?#].[^\s]*$',
                'phone': r'^[\+]?[1-9][\d]{0,15}$'
            },
            'max_lengths': {
                'name': 200,
                'description': 5000,
                'brand': 100
            }
        }

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        self.validation_stats['total_items'] += 1

        try:
            # Clean and validate all fields
            self._clean_item_fields(adapter)
            self._validate_required_fields(adapter)
            self._validate_field_types(adapter)
            self._validate_field_ranges(adapter)
            self._validate_string_patterns(adapter)
            self._validate_field_lengths(adapter)

            # Additional business logic validation
            self._validate_business_rules(adapter)

            # Generate quality score
            adapter['data_quality_score'] = self._calculate_quality_score(adapter)

            # Add validation metadata
            adapter['validation_timestamp'] = datetime.now().isoformat()
            adapter['validation_passed'] = True

            self.validation_stats['valid_items'] += 1
            return item

        except ValidationError as e:
            self.validation_stats['dropped_items'] += 1
            error_type = type(e).__name__
            self.validation_stats['validation_errors'][error_type] = \
                self.validation_stats['validation_errors'].get(error_type, 0) + 1

            self.logger.warning(f"Validation failed for item: {e}")
            raise DropItem(f"Validation error: {e}")

    def _clean_item_fields(self, adapter):
        """Clean and normalize field values"""
        # Clean text fields
        text_fields = ['name', 'description', 'brand', 'category']
        for field in text_fields:
            if adapter.get(field):
                cleaned = self._clean_text(adapter[field])
                if cleaned != adapter[field]:
                    adapter[field] = cleaned
                    self.validation_stats['field_fixes'] += 1

        # Clean price field
        if adapter.get('price'):
            cleaned_price = self._clean_price(adapter['price'])
            if cleaned_price != adapter['price']:
                adapter['price'] = cleaned_price
                self.validation_stats['field_fixes'] += 1

        # Clean URL fields
        url_fields = ['url', 'image_url']
        for field in url_fields:
            if adapter.get(field):
                cleaned_url = self._clean_url(adapter[field])
                if cleaned_url != adapter[field]:
                    adapter[field] = cleaned_url
                    self.validation_stats['field_fixes'] += 1

        # Normalize boolean fields
        bool_fields = ['in_stock', 'featured', 'on_sale']
        for field in bool_fields:
            if field in adapter:
                adapter[field] = self._normalize_boolean(adapter[field])

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text content"""
        if not isinstance(text, str):
            text = str(text)

        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text.strip())

        # Remove HTML tags
        text = re.sub(r'<[^>]+>', '', text)

        # Remove special characters that might cause issues
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', text)

        # Normalize curly quotes to straight quotes
        text = text.replace('\u201c', '"').replace('\u201d', '"')
        text = text.replace('\u2018', "'").replace('\u2019', "'")

        return text

    def _clean_price(self, price: Any) -> Optional[float]:
        """Clean and convert price to float"""
        if price is None:
            return None

        if isinstance(price, (int, float)):
            return float(price)

        if isinstance(price, str):
            # Remove currency symbols and whitespace
            price_clean = re.sub(r'[^\d.,]', '', price)

            # Handle different decimal separators
            if ',' in price_clean and '.' in price_clean:
                # Assume comma is thousands separator
                price_clean = price_clean.replace(',', '')
            elif ',' in price_clean:
                # Could be decimal separator or thousands separator
                if price_clean.count(',') == 1 and len(price_clean.split(',')[1]) <= 2:
                    price_clean = price_clean.replace(',', '.')
                else:
                    price_clean = price_clean.replace(',', '')

            try:
                return float(price_clean)
            except ValueError:
                return None

        return None

    def _clean_url(self, url: str) -> str:
        """Clean and normalize URLs"""
        if not isinstance(url, str):
            return str(url)

        url = url.strip()

        # Add protocol if missing
        if url.startswith('//'):
            url = 'https:' + url
        elif not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        return url

    def _normalize_boolean(self, value: Any) -> bool:
        """Normalize various boolean representations"""
        if isinstance(value, bool):
            return value

        if isinstance(value, str):
            value = value.lower().strip()
            return value in ('true', 'yes', '1', 'on', 'available', 'in stock')

        if isinstance(value, (int, float)):
            return bool(value)

        return False

    def _validate_required_fields(self, adapter):
        """Validate that required fields are present"""
        for field in self.validation_rules['required_fields']:
            if not adapter.get(field):
                raise ValidationError(f"Required field missing: {field}")

    def _validate_field_types(self, adapter):
        """Validate field data types"""
        for field, expected_types in self.validation_rules['field_types'].items():
            if field in adapter and adapter[field] is not None:
                if not isinstance(adapter[field], expected_types):
                    raise ValidationError(
                        f"Field {field} has invalid type: "
                        f"expected {expected_types}, got {type(adapter[field])}"
                    )

    def _validate_field_ranges(self, adapter):
        """Validate numeric field ranges"""
        for field, (min_val, max_val) in self.validation_rules['field_ranges'].items():
            if field in adapter and adapter[field] is not None:
                value = adapter[field]
                if not (min_val <= value <= max_val):
                    raise ValidationError(
                        f"Field {field} value {value} outside valid range [{min_val}, {max_val}]"
                    )

    def _validate_string_patterns(self, adapter):
        """Validate string fields against regex patterns"""
        for field, pattern in self.validation_rules['string_patterns'].items():
            if field in adapter and adapter[field] is not None:
                if not re.match(pattern, str(adapter[field])):
                    raise ValidationError(f"Field {field} doesn't match required pattern")

    def _validate_field_lengths(self, adapter):
        """Validate string field lengths"""
        for field, max_length in self.validation_rules['max_lengths'].items():
            if field in adapter and adapter[field] is not None:
                if len(str(adapter[field])) > max_length:
                    raise ValidationError(
                        f"Field {field} exceeds maximum length of {max_length}"
                    )

    def _validate_business_rules(self, adapter):
        """Validate business-specific rules"""
        # Price must be positive for in-stock items
        if adapter.get('in_stock') and adapter.get('price', 0) <= 0:
            raise ValidationError("In-stock items must have positive price")

        # Rating must be present if review_count > 0
        if adapter.get('review_count', 0) > 0 and not adapter.get('rating'):
            raise ValidationError("Items with reviews must have a rating")

        # Discount validation
        original_price = adapter.get('original_price')
        current_price = adapter.get('price')
        if original_price and current_price:
            if current_price > original_price:
                raise ValidationError("Current price cannot exceed original price")

    def _calculate_quality_score(self, adapter) -> float:
        """Calculate data quality score (0-1)"""
        score = 0.0
        max_score = 0.0

        # Completeness score (40% of total)
        important_fields = ['name', 'price', 'description', 'brand', 'category']
        present_fields = sum(1 for field in important_fields if adapter.get(field))
        completeness = present_fields / len(important_fields)
        score += completeness * 0.4
        max_score += 0.4

        # Accuracy score (30% of total)
        accuracy = 1.0  # Assume high accuracy if validation passed
        score += accuracy * 0.3
        max_score += 0.3

        # Consistency score (20% of total)
        consistency = self._check_consistency(adapter)
        score += consistency * 0.2
        max_score += 0.2

        # Freshness score (10% of total)
        freshness = 1.0  # Assume fresh data
        score += freshness * 0.1
        max_score += 0.1

        return score / max_score if max_score > 0 else 0.0

    def _check_consistency(self, adapter) -> float:
        """Check internal consistency of data"""
        consistency_score = 1.0

        # Check if price and in_stock are consistent
        if adapter.get('price', 0) <= 0 and adapter.get('in_stock', False):
            consistency_score -= 0.3

        # Check if rating and review_count are consistent
        rating = adapter.get('rating')
        review_count = adapter.get('review_count', 0)
        if rating and rating > 0 and review_count == 0:
            consistency_score -= 0.2

        return max(0.0, consistency_score)

    def close_spider(self, spider):
        """Log validation statistics when spider closes"""
        stats = self.validation_stats
        total = stats['total_items'] or 1  # guard against division by zero when nothing was scraped

        self.logger.info("=== Validation Pipeline Statistics ===")
        self.logger.info(f"Total items processed: {stats['total_items']}")
        self.logger.info(f"Valid items: {stats['valid_items']} ({stats['valid_items']/total*100:.1f}%)")
        self.logger.info(f"Dropped items: {stats['dropped_items']} ({stats['dropped_items']/total*100:.1f}%)")
        self.logger.info(f"Field fixes applied: {stats['field_fixes']}")

        if stats['validation_errors']:
            self.logger.info("Validation errors by type:")
            for error_type, count in stats['validation_errors'].items():
                self.logger.info(f"  {error_type}: {count}")


class ValidationError(Exception):
    """Custom exception for validation errors"""
    pass
```
```python
class DataEnrichmentPipeline:
    """Enrich items with additional computed fields"""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Add computed fields
        self._add_price_analysis(adapter)
        self._add_text_analysis(adapter)
        self._add_category_classification(adapter)
        self._add_uniqueness_hash(adapter)

        return item

    def _add_price_analysis(self, adapter):
        """Add price-related computed fields"""
        current_price = adapter.get('price')
        original_price = adapter.get('original_price')

        if current_price and original_price:
            discount = original_price - current_price
            discount_percent = (discount / original_price) * 100

            adapter['discount_amount'] = discount
            adapter['discount_percentage'] = round(discount_percent, 2)
            adapter['is_on_sale'] = discount > 0

        # Price tier classification
        if current_price:
            if current_price < 50:
                adapter['price_tier'] = 'budget'
            elif current_price < 200:
                adapter['price_tier'] = 'mid-range'
            elif current_price < 500:
                adapter['price_tier'] = 'premium'
            else:
                adapter['price_tier'] = 'luxury'

    def _add_text_analysis(self, adapter):
        """Add text analysis metrics"""
        description = adapter.get('description', '')

        if description:
            words = description.split()
            adapter['description_word_count'] = len(words)
            adapter['description_char_count'] = len(description)

            # Simple sentiment analysis (in production, use proper NLP)
            positive_words = ['excellent', 'great', 'amazing', 'perfect', 'wonderful']
            negative_words = ['bad', 'terrible', 'awful', 'poor', 'disappointing']

            pos_count = sum(1 for word in words if word.lower() in positive_words)
            neg_count = sum(1 for word in words if word.lower() in negative_words)

            if pos_count > neg_count:
                adapter['sentiment'] = 'positive'
            elif neg_count > pos_count:
                adapter['sentiment'] = 'negative'
            else:
                adapter['sentiment'] = 'neutral'

    def _add_category_classification(self, adapter):
        """Add automatic category classification"""
        name = adapter.get('name', '').lower()
        description = adapter.get('description', '').lower()
        text = f"{name} {description}"

        # Simple keyword-based classification
        categories = {
            'electronics': ['phone', 'laptop', 'computer', 'tablet', 'tv', 'camera'],
            'clothing': ['shirt', 'pants', 'dress', 'shoes', 'jacket', 'jeans'],
            'home': ['furniture', 'kitchen', 'bedroom', 'bathroom', 'decor'],
            'books': ['book', 'novel', 'textbook', 'manual', 'guide'],
            'sports': ['fitness', 'exercise', 'sports', 'outdoor', 'gym']
        }

        scores = {}
        for category, keywords in categories.items():
            score = sum(1 for keyword in keywords if keyword in text)
            if score > 0:
                scores[category] = score

        if scores:
            adapter['auto_category'] = max(scores, key=scores.get)
            adapter['category_confidence'] = max(scores.values()) / len(categories[adapter['auto_category']])

    def _add_uniqueness_hash(self, adapter):
        """Add uniqueness hash for deduplication"""
        # Create hash from key identifying fields
        key_fields = ['name', 'brand', 'sku']
        hash_input = ''.join(str(adapter.get(field, '')) for field in key_fields)

        if hash_input.strip():
            adapter['uniqueness_hash'] = hashlib.md5(
                hash_input.lower().encode('utf-8')
            ).hexdigest()
```
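To activate these two pipelines, register them in your project's settings.py. A minimal sketch, assuming the classes live in a module called myproject/pipelines.py (adjust the dotted path and priorities to your project):

```python
# settings.py -- the "myproject.pipelines" module path is an assumption
ITEM_PIPELINES = {
    "myproject.pipelines.AdvancedValidationPipeline": 300,  # validate and clean first
    "myproject.pipelines.DataEnrichmentPipeline": 400,      # then add computed fields
    # storage pipelines (MongoDB / PostgreSQL below) should run last, e.g. at 800+
}
```

Lower numbers run earlier, so validation always sees raw items and the storage backends only ever receive items that passed validation and enrichment.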
Multiple Storage Backends
MongoDB Storage Pipeline
```python
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import pymongo
from pymongo import MongoClient, UpdateOne
from itemadapter import ItemAdapter


class MongoDBPipeline:
    """Advanced MongoDB storage pipeline with indexing and aggregation"""

    def __init__(self, mongo_uri, mongo_db, collection_name):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.collection_name = collection_name
        self.client = None
        self.db = None
        self.collection = None
        self.logger = logging.getLogger(__name__)

        # Batch processing
        self.batch_size = 100
        self.batch_items = []

        # Statistics
        self.stats = {
            'inserted': 0,
            'updated': 0,
            'duplicates': 0,
            'errors': 0
        }

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI", "mongodb://localhost:27017"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "scrapy"),
            collection_name=crawler.settings.get("MONGO_COLLECTION", "items")
        )

    def open_spider(self, spider):
        """Initialize MongoDB connection and setup indexes"""
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
        self.collection = self.db[self.collection_name]

        # Create indexes for performance
        self._create_indexes()

        self.logger.info(f"Connected to MongoDB: {self.mongo_uri}/{self.mongo_db}")

    def _create_indexes(self):
        """Create database indexes for optimal performance"""
        indexes = [
            # Index for deduplication lookups
            [("uniqueness_hash", pymongo.ASCENDING)],

            # Search and filter indexes
            [("brand", pymongo.ASCENDING)],
            [("category", pymongo.ASCENDING)],
            [("price", pymongo.ASCENDING)],
            [("rating", pymongo.DESCENDING)],
            [("in_stock", pymongo.ASCENDING)],

            # Time-based indexes
            [("_scraped_at", pymongo.DESCENDING)],
            [("_updated_at", pymongo.DESCENDING)],

            # Compound indexes
            [("category", pymongo.ASCENDING), ("price", pymongo.ASCENDING)],
            [("brand", pymongo.ASCENDING), ("rating", pymongo.DESCENDING)],
        ]

        for keys in indexes:
            try:
                self.collection.create_index(keys)
            except Exception as e:
                self.logger.warning(f"Failed to create index {keys}: {e}")

        # Create a single combined text index for search
        # (MongoDB allows only one text index per collection)
        try:
            self.collection.create_index(
                [
                    ("name", pymongo.TEXT),
                    ("description", pymongo.TEXT),
                    ("brand", pymongo.TEXT)
                ],
                name="text_search"
            )
        except Exception as e:
            self.logger.warning(f"Failed to create text index: {e}")

    def process_item(self, item, spider):
        """Process item and add to batch"""
        adapter = ItemAdapter(item)

        # Add MongoDB-specific fields
        adapter['_scraped_at'] = datetime.utcnow()
        adapter['_spider_name'] = spider.name

        # Add to batch
        self.batch_items.append(dict(adapter))

        # Process batch if full
        if len(self.batch_items) >= self.batch_size:
            self._process_batch()

        return item

    def _process_batch(self):
        """Process batch of items with upsert operations"""
        if not self.batch_items:
            return

        try:
            operations = []

            for item in self.batch_items:
                uniqueness_hash = item.get('uniqueness_hash')

                if uniqueness_hash:
                    # Upsert based on uniqueness hash
                    operations.append(UpdateOne(
                        {'uniqueness_hash': uniqueness_hash},
                        {
                            '$set': item,
                            '$setOnInsert': {'_created_at': datetime.utcnow()},
                            '$currentDate': {'_updated_at': True}
                        },
                        upsert=True
                    ))
                else:
                    # Insert without duplicate check
                    self.collection.insert_one(item)
                    self.stats['inserted'] += 1

            if operations:
                result = self.collection.bulk_write(operations, ordered=False)
                self.stats['inserted'] += result.upserted_count
                self.stats['updated'] += result.modified_count
                self.stats['duplicates'] += len(operations) - result.upserted_count - result.modified_count

        except Exception as e:
            self.stats['errors'] += len(self.batch_items)
            self.logger.error(f"Error processing batch: {e}")

        finally:
            self.batch_items.clear()

    def close_spider(self, spider):
        """Process remaining items and close connection"""
        # Process remaining batch
        self._process_batch()

        # Log statistics
        self.logger.info("=== MongoDB Pipeline Statistics ===")
        for stat, count in self.stats.items():
            self.logger.info(f"{stat.capitalize()}: {count}")

        # Create aggregation views for analytics
        self._create_analytics_views()

        # Close connection
        if self.client:
            self.client.close()

    def _create_analytics_views(self):
        """Create MongoDB views for analytics"""
        try:
            # Category statistics view
            self.db.create_collection(
                "category_stats",
                viewOn=self.collection_name,
                pipeline=[
                    {"$group": {
                        "_id": "$category",
                        "total_products": {"$sum": 1},
                        "avg_price": {"$avg": "$price"},
                        "avg_rating": {"$avg": "$rating"},
                        "in_stock_count": {"$sum": {"$cond": ["$in_stock", 1, 0]}}
                    }},
                    {"$sort": {"total_products": -1}}
                ]
            )

            # Brand statistics view
            self.db.create_collection(
                "brand_stats",
                viewOn=self.collection_name,
                pipeline=[
                    {"$group": {
                        "_id": "$brand",
                        "total_products": {"$sum": 1},
                        "avg_price": {"$avg": "$price"},
                        "min_price": {"$min": "$price"},
                        "max_price": {"$max": "$price"}
                    }},
                    {"$sort": {"total_products": -1}}
                ]
            )

            # Daily scraping statistics view
            self.db.create_collection(
                "daily_stats",
                viewOn=self.collection_name,
                pipeline=[
                    {"$group": {
                        "_id": {
                            "$dateToString": {
                                "format": "%Y-%m-%d",
                                "date": "$_scraped_at"
                            }
                        },
                        "items_scraped": {"$sum": 1},
                        "unique_brands": {"$addToSet": "$brand"},
                        "unique_categories": {"$addToSet": "$category"}
                    }},
                    {"$addFields": {
                        "unique_brand_count": {"$size": "$unique_brands"},
                        "unique_category_count": {"$size": "$unique_categories"}
                    }},
                    {"$sort": {"_id": -1}}
                ]
            )

        except Exception as e:
            self.logger.warning(f"Failed to create analytics views: {e}")


class MongoDBAnalytics:
    """Advanced analytics queries for MongoDB"""

    def __init__(self, mongo_uri, mongo_db, collection_name):
        self.client = MongoClient(mongo_uri)
        self.db = self.client[mongo_db]
        self.collection = self.db[collection_name]

    def get_price_trends(self, days: int = 30) -> List[Dict]:
        """Get price trends over time"""
        pipeline = [
            {
                "$match": {
                    "_scraped_at": {
                        "$gte": datetime.utcnow() - timedelta(days=days)
                    }
                }
            },
            {
                "$group": {
                    "_id": {
                        "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$_scraped_at"}},
                        "category": "$category"
                    },
                    "avg_price": {"$avg": "$price"},
                    "min_price": {"$min": "$price"},
                    "max_price": {"$max": "$price"},
                    "product_count": {"$sum": 1}
                }
            },
            {"$sort": {"_id.date": 1}}
        ]

        return list(self.collection.aggregate(pipeline))

    def get_top_products(self, category: Optional[str] = None, limit: int = 10) -> List[Dict]:
        """Get top-rated products"""
        match_stage = {"rating": {"$exists": True, "$gte": 4.0}}
        if category:
            match_stage["category"] = category

        pipeline = [
            {"$match": match_stage},
            {"$sort": {"rating": -1, "review_count": -1}},
            {"$limit": limit},
            {
                "$project": {
                    "name": 1,
                    "brand": 1,
                    "price": 1,
                    "rating": 1,
                    "review_count": 1,
                    "url": 1
                }
            }
        ]

        return list(self.collection.aggregate(pipeline))

    def get_inventory_alerts(self) -> List[Dict]:
        """Get products that went out of stock recently"""
        pipeline = [
            {
                "$match": {
                    "in_stock": False,
                    "_scraped_at": {"$gte": datetime.utcnow() - timedelta(days=1)}
                }
            },
            {
                "$project": {
                    "name": 1,
                    "brand": 1,
                    "price": 1,
                    "category": 1,
                    "_scraped_at": 1
                }
            },
            {"$sort": {"_scraped_at": -1}}
        ]

        return list(self.collection.aggregate(pipeline))
```
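The pipeline reads its connection details from the settings names used in from_crawler above. A minimal sketch (the values are examples, and the myproject.pipelines path is an assumption):

```python
# settings.py -- example values; adjust URI, names, and module path for your project
MONGO_URI = "mongodb://localhost:27017"
MONGO_DATABASE = "scrapy"
MONGO_COLLECTION = "items"

# Register the storage pipeline after validation/enrichment, e.g.:
# ITEM_PIPELINES["myproject.pipelines.MongoDBPipeline"] = 800
```

After a crawl finishes, MongoDBAnalytics can be instantiated with the same three values in a separate reporting script to run the trend, top-product, and inventory queries shown above.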
PostgreSQL Storage Pipeline
```python
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional

import psycopg2
import psycopg2.extras
from itemadapter import ItemAdapter


class PostgreSQLPipeline:
    """Advanced PostgreSQL storage with JSONB and full-text search"""

    def __init__(self, postgres_settings):
        self.postgres_settings = postgres_settings
        self.connection = None
        self.cursor = None
        self.logger = logging.getLogger(__name__)

        # Batch processing
        self.batch_size = 50
        self.batch_items = []

        # Statistics
        self.stats = {
            'inserted': 0,
            'updated': 0,
            'errors': 0
        }

    @classmethod
    def from_crawler(cls, crawler):
        postgres_settings = crawler.settings.getdict("POSTGRES_SETTINGS")
        return cls(postgres_settings)

    def open_spider(self, spider):
        """Initialize PostgreSQL connection and setup tables"""
        try:
            self.connection = psycopg2.connect(**self.postgres_settings)
            self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

            # Create tables and indexes
            self._create_tables()
            self._create_indexes()

            self.logger.info("Connected to PostgreSQL successfully")

        except Exception as e:
            self.logger.error(f"Error connecting to PostgreSQL: {e}")
            raise

    def _create_tables(self):
        """Create database tables"""
        create_tables_sql = """
        -- Main products table
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            uniqueness_hash VARCHAR(32) UNIQUE,
            name VARCHAR(500) NOT NULL,
            brand VARCHAR(100),
            category VARCHAR(100),
            price DECIMAL(10,2),
            original_price DECIMAL(10,2),
            currency VARCHAR(3) DEFAULT 'USD',
            rating DECIMAL(3,2),
            review_count INTEGER,
            in_stock BOOLEAN DEFAULT true,
            description TEXT,
            specifications JSONB,
            images JSONB,
            url VARCHAR(1000),
            source_spider VARCHAR(50),
            data_quality_score DECIMAL(3,2),
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            raw_data JSONB
        );

        -- Price history table
        CREATE TABLE IF NOT EXISTS price_history (
            id SERIAL PRIMARY KEY,
            product_id INTEGER REFERENCES products(id),
            price DECIMAL(10,2) NOT NULL,
            original_price DECIMAL(10,2),
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        -- Categories table
        CREATE TABLE IF NOT EXISTS categories (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) UNIQUE NOT NULL,
            parent_category VARCHAR(100),
            description TEXT
        );

        -- Brands table
        CREATE TABLE IF NOT EXISTS brands (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) UNIQUE NOT NULL,
            website VARCHAR(255),
            description TEXT
        );

        -- Search statistics table
        CREATE TABLE IF NOT EXISTS search_stats (
            id SERIAL PRIMARY KEY,
            search_term VARCHAR(255),
            results_count INTEGER,
            avg_price DECIMAL(10,2),
            searched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """

        try:
            self.cursor.execute(create_tables_sql)
            self.connection.commit()
            self.logger.info("Database tables created successfully")
        except Exception as e:
            self.logger.error(f"Error creating tables: {e}")
            self.connection.rollback()

    def _create_indexes(self):
        """Create database indexes"""
        indexes_sql = [
            "CREATE INDEX IF NOT EXISTS idx_products_name ON products USING gin(to_tsvector('english', name));",
            "CREATE INDEX IF NOT EXISTS idx_products_brand ON products(brand);",
            "CREATE INDEX IF NOT EXISTS idx_products_category ON products(category);",
            "CREATE INDEX IF NOT EXISTS idx_products_price ON products(price);",
            "CREATE INDEX IF NOT EXISTS idx_products_rating ON products(rating DESC);",
            "CREATE INDEX IF NOT EXISTS idx_products_scraped_at ON products(scraped_at DESC);",
            "CREATE INDEX IF NOT EXISTS idx_products_in_stock ON products(in_stock);",
            "CREATE INDEX IF NOT EXISTS idx_products_data_quality ON products(data_quality_score DESC);",
            "CREATE INDEX IF NOT EXISTS idx_price_history_product_id ON price_history(product_id);",
            "CREATE INDEX IF NOT EXISTS idx_price_history_scraped_at ON price_history(scraped_at DESC);",

            # JSONB indexes
            "CREATE INDEX IF NOT EXISTS idx_products_specs ON products USING gin(specifications);",
            "CREATE INDEX IF NOT EXISTS idx_products_raw_data ON products USING gin(raw_data);",

            # Full-text search index
            "CREATE INDEX IF NOT EXISTS idx_products_fulltext ON products USING gin(to_tsvector('english', name || ' ' || COALESCE(description, '')));"
        ]

        for index_sql in indexes_sql:
            try:
                self.cursor.execute(index_sql)
                self.connection.commit()
            except Exception as e:
                self.logger.warning(f"Error creating index: {e}")
                self.connection.rollback()

    def process_item(self, item, spider):
        """Process item and add to batch"""
        adapter = ItemAdapter(item)

        # Prepare item for PostgreSQL
        postgres_item = self._prepare_item(adapter, spider)
        self.batch_items.append(postgres_item)

        # Process batch if full
        if len(self.batch_items) >= self.batch_size:
            self._process_batch()

        return item

    def _prepare_item(self, adapter: ItemAdapter, spider) -> Dict:
        """Prepare item for PostgreSQL storage"""
        return {
            'uniqueness_hash': adapter.get('uniqueness_hash'),
            'name': adapter.get('name'),
            'brand': adapter.get('brand'),
            'category': adapter.get('category'),
            'price': adapter.get('price'),
            'original_price': adapter.get('original_price'),
            'currency': adapter.get('currency', 'USD'),
            'rating': adapter.get('rating'),
            'review_count': adapter.get('review_count'),
            'in_stock': adapter.get('in_stock', True),
            'description': adapter.get('description'),
            'specifications': json.dumps(adapter.get('specifications', {})),
            'images': json.dumps(adapter.get('images', [])),
            'url': adapter.get('url'),
            'source_spider': spider.name,
            'data_quality_score': adapter.get('data_quality_score'),
            'raw_data': json.dumps(dict(adapter))
        }

    def _process_batch(self):
        """Process batch of items with upsert"""
        if not self.batch_items:
            return

        upsert_sql = """
        INSERT INTO products (
            uniqueness_hash, name, brand, category, price, original_price,
            currency, rating, review_count, in_stock, description,
            specifications, images, url, source_spider, data_quality_score, raw_data
        ) VALUES (
            %(uniqueness_hash)s, %(name)s, %(brand)s, %(category)s, %(price)s, %(original_price)s,
            %(currency)s, %(rating)s, %(review_count)s, %(in_stock)s, %(description)s,
            %(specifications)s, %(images)s, %(url)s, %(source_spider)s, %(data_quality_score)s, %(raw_data)s
        )
        ON CONFLICT (uniqueness_hash) DO UPDATE SET
            name = EXCLUDED.name,
            brand = EXCLUDED.brand,
            category = EXCLUDED.category,
            price = EXCLUDED.price,
            original_price = EXCLUDED.original_price,
            rating = EXCLUDED.rating,
            review_count = EXCLUDED.review_count,
            in_stock = EXCLUDED.in_stock,
            description = EXCLUDED.description,
            specifications = EXCLUDED.specifications,
            images = EXCLUDED.images,
            updated_at = CURRENT_TIMESTAMP,
            raw_data = EXCLUDED.raw_data
        RETURNING id, (xmax = 0) AS inserted;
        """

        try:
            # Execute upserts one by one so each RETURNING row can be captured
            results = []
            for item in self.batch_items:
                self.cursor.execute(upsert_sql, item)
                results.append(self.cursor.fetchone())

            # Insert price history
            self._insert_price_history(results)

            self.connection.commit()

            # Update statistics
            for result in results:
                if result['inserted']:
                    self.stats['inserted'] += 1
                else:
                    self.stats['updated'] += 1

        except Exception as e:
            self.stats['errors'] += len(self.batch_items)
            self.logger.error(f"Error processing batch: {e}")
            self.connection.rollback()

        finally:
            self.batch_items.clear()

    def _insert_price_history(self, results: List[Dict]):
        """Insert price history records"""
        price_history_sql = """
        INSERT INTO price_history (product_id, price, original_price)
        VALUES (%s, %s, %s)
        """

        price_history_data = []
        for i, result in enumerate(results):
            item = self.batch_items[i]
            if item.get('price'):
                price_history_data.append((
                    result['id'],
                    item['price'],
                    item.get('original_price')
                ))

        if price_history_data:
            self.cursor.executemany(price_history_sql, price_history_data)

    def close_spider(self, spider):
        """Process remaining items and close connection"""
        # Process remaining batch
        self._process_batch()

        # Update categories and brands tables
        self._update_reference_tables()

        # Log statistics
        self.logger.info("=== PostgreSQL Pipeline Statistics ===")
        for stat, count in self.stats.items():
            self.logger.info(f"{stat.capitalize()}: {count}")

        # Close connection
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()

    def _update_reference_tables(self):
        """Update categories and brands reference tables"""
        try:
            # Update categories
            self.cursor.execute("""
                INSERT INTO categories (name)
                SELECT DISTINCT category FROM products
                WHERE category IS NOT NULL
                ON CONFLICT (name) DO NOTHING;
            """)

            # Update brands
            self.cursor.execute("""
                INSERT INTO brands (name)
                SELECT DISTINCT brand FROM products
                WHERE brand IS NOT NULL
                ON CONFLICT (name) DO NOTHING;
            """)

            self.connection.commit()

        except Exception as e:
            self.logger.error(f"Error updating reference tables: {e}")
            self.connection.rollback()


class PostgreSQLAnalytics:
    """Advanced analytics queries for PostgreSQL"""

    def __init__(self, postgres_settings):
        self.connection = psycopg2.connect(**postgres_settings)
        self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    def search_products(self, query: str, limit: int = 50) -> List[Dict]:
        """Full-text search for products"""
        search_sql = """
        SELECT
            id, name, brand, category, price, rating, review_count, url,
            ts_rank(
                to_tsvector('english', name || ' ' || COALESCE(description, '')),
                plainto_tsquery('english', %s)
            ) AS rank
        FROM products
        WHERE to_tsvector('english', name || ' ' || COALESCE(description, ''))
              @@ plainto_tsquery('english', %s)
        ORDER BY rank DESC, rating DESC NULLS LAST
        LIMIT %s;
        """

        self.cursor.execute(search_sql, (query, query, limit))
        return self.cursor.fetchall()

    def get_price_trends(self, product_id: int, days: int = 30) -> List[Dict]:
        """Get price trends for a specific product"""
        trends_sql = """
        SELECT
            DATE(scraped_at) AS date,
            AVG(price) AS avg_price,
            MIN(price) AS min_price,
            MAX(price) AS max_price,
            COUNT(*) AS data_points
        FROM price_history
        WHERE product_id = %s
          AND scraped_at >= CURRENT_DATE - INTERVAL '1 day' * %s
        GROUP BY DATE(scraped_at)
        ORDER BY date;
        """

        self.cursor.execute(trends_sql, (product_id, days))
        return self.cursor.fetchall()

    def get_category_insights(self) -> List[Dict]:
        """Get insights by category"""
        insights_sql = """
        SELECT
            category,
            COUNT(*) AS product_count,
            AVG(price) AS avg_price,
            MIN(price) AS min_price,
            MAX(price) AS max_price,
            AVG(rating) AS avg_rating,
            COUNT(CASE WHEN in_stock THEN 1 END) AS in_stock_count,
            AVG(data_quality_score) AS avg_quality_score
        FROM products
        WHERE category IS NOT NULL
        GROUP BY category
        ORDER BY product_count DESC;
        """

        self.cursor.execute(insights_sql)
        return self.cursor.fetchall()
```
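PostgreSQLPipeline expects POSTGRES_SETTINGS to be a dict of keyword arguments for psycopg2.connect(). A minimal sketch with placeholder credentials (the module path is again an assumption):

```python
# settings.py -- placeholder credentials; adjust for your environment
POSTGRES_SETTINGS = {
    "host": "localhost",
    "port": 5432,
    "dbname": "scrapy_data",
    "user": "scrapy",
    "password": "change-me",
}

# Register alongside (or instead of) the MongoDB pipeline, e.g.:
# ITEM_PIPELINES["myproject.pipelines.PostgreSQLPipeline"] = 810
```

You can run both storage pipelines in the same crawl; each maintains its own connection and batch, so dual-writing to MongoDB and PostgreSQL only costs the extra round trips.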
Summary and Next Steps
In this comprehensive part, you’ve mastered:
✅ Advanced data validation and cleaning pipelines
✅ Multiple storage backends (MongoDB, PostgreSQL)
✅ Data enrichment and computed fields
✅ Batch processing and performance optimization
✅ Analytics and reporting capabilities
✅ Data quality monitoring and scoring
What’s Next?
In Part 5: Production Deployment, we’ll cover:
- Docker containerization and orchestration
- Cloud deployment (AWS, GCP, Azure)
- CI/CD pipelines for scrapers
- Production monitoring and logging
- Scaling strategies and auto-scaling
- Security and compliance in production
Practice Exercise
Build a complete data processing pipeline that:
- Validates and cleans e-commerce data with 95%+ quality scores
- Stores data in both MongoDB and PostgreSQL
- Provides real-time analytics and insights
- Implements automated data quality monitoring
- Handles millions of products with optimal performance
Happy data processing! 📊