Master Web Scraping - Part 4: Data Processing and Storage

Surendra Tamang

55 min read · Advanced

Prerequisites

  • Completed Parts 1-3 of this series
  • Knowledge of SQL and NoSQL databases
  • Understanding of data validation concepts
  • Basic knowledge of ETL processes

Welcome to Part 4 of our comprehensive Scrapy series! In this tutorial, we’ll explore advanced data processing, validation, and storage techniques that transform raw scraped data into clean, structured, and actionable information.

What You’ll Learn in This Part

  • Advanced data cleaning and validation pipelines
  • Multiple storage backends (MongoDB, PostgreSQL, Elasticsearch)
  • Real-time data processing and streaming
  • Data quality monitoring and alerting
  • ETL processes and data warehousing
  • Data deduplication and conflict resolution
  • Performance optimization for data pipelines
  • Data backup and recovery strategies

Advanced Data Cleaning and Validation

Comprehensive Data Validation Pipeline

webscraper/pipelines.py
import re
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
import logging


class AdvancedValidationPipeline:
    """Comprehensive data validation and cleaning pipeline"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.validation_stats = {
            'total_items': 0,
            'valid_items': 0,
            'dropped_items': 0,
            'field_fixes': 0,
            'validation_errors': {}
        }

        # Validation rules configuration
        self.validation_rules = {
            'required_fields': ['name', 'url'],
            'field_types': {
                'price': (int, float),
                'rating': (int, float),
                'review_count': int,
                'in_stock': bool
            },
            'field_ranges': {
                'price': (0, 1000000),
                'rating': (0, 5),
                'review_count': (0, float('inf'))
            },
            'string_patterns': {
                'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
                'url': r'^https?:\/\/[^\s/$.?#].[^\s]*$',
                'phone': r'^[\+]?[1-9][\d]{0,15}$'
            },
            'max_lengths': {
                'name': 200,
                'description': 5000,
                'brand': 100
            }
        }

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        self.validation_stats['total_items'] += 1

        try:
            # Clean and validate all fields
            self._clean_item_fields(adapter)
            self._validate_required_fields(adapter)
            self._validate_field_types(adapter)
            self._validate_field_ranges(adapter)
            self._validate_string_patterns(adapter)
            self._validate_field_lengths(adapter)

            # Additional business logic validation
            self._validate_business_rules(adapter)

            # Generate quality score
            adapter['data_quality_score'] = self._calculate_quality_score(adapter)

            # Add validation metadata
            adapter['validation_timestamp'] = datetime.now().isoformat()
            adapter['validation_passed'] = True

            self.validation_stats['valid_items'] += 1
            return item

        except ValidationError as e:
            self.validation_stats['dropped_items'] += 1
            error_type = type(e).__name__
            self.validation_stats['validation_errors'][error_type] = \
                self.validation_stats['validation_errors'].get(error_type, 0) + 1
            self.logger.warning(f"Validation failed for item: {e}")
            raise DropItem(f"Validation error: {e}")

    def _clean_item_fields(self, adapter):
        """Clean and normalize field values"""
        # Clean text fields
        text_fields = ['name', 'description', 'brand', 'category']
        for field in text_fields:
            if adapter.get(field):
                cleaned = self._clean_text(adapter[field])
                if cleaned != adapter[field]:
                    adapter[field] = cleaned
                    self.validation_stats['field_fixes'] += 1

        # Clean price field
        if adapter.get('price'):
            cleaned_price = self._clean_price(adapter['price'])
            if cleaned_price != adapter['price']:
                adapter['price'] = cleaned_price
                self.validation_stats['field_fixes'] += 1

        # Clean URL fields
        url_fields = ['url', 'image_url']
        for field in url_fields:
            if adapter.get(field):
                cleaned_url = self._clean_url(adapter[field])
                if cleaned_url != adapter[field]:
                    adapter[field] = cleaned_url
                    self.validation_stats['field_fixes'] += 1

        # Normalize boolean fields
        bool_fields = ['in_stock', 'featured', 'on_sale']
        for field in bool_fields:
            if field in adapter:
                adapter[field] = self._normalize_boolean(adapter[field])

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text content"""
        if not isinstance(text, str):
            text = str(text)

        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text.strip())

        # Remove HTML tags
        text = re.sub(r'<[^>]+>', '', text)

        # Remove control characters that might cause issues
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)

        # Normalize curly quotes to plain ASCII quotes
        text = text.replace('\u201c', '"').replace('\u201d', '"')
        text = text.replace('\u2018', "'").replace('\u2019', "'")

        return text

    def _clean_price(self, price: Any) -> Optional[float]:
        """Clean and convert price to float"""
        if price is None:
            return None

        if isinstance(price, (int, float)):
            return float(price)

        if isinstance(price, str):
            # Remove currency symbols and whitespace
            price_clean = re.sub(r'[^\d.,]', '', price)

            # Handle different decimal separators
            if ',' in price_clean and '.' in price_clean:
                # Assume comma is thousands separator
                price_clean = price_clean.replace(',', '')
            elif ',' in price_clean:
                # Could be decimal separator or thousands
                if price_clean.count(',') == 1 and len(price_clean.split(',')[1]) <= 2:
                    price_clean = price_clean.replace(',', '.')
                else:
                    price_clean = price_clean.replace(',', '')

            try:
                return float(price_clean)
            except ValueError:
                return None

        return None

    def _clean_url(self, url: str) -> str:
        """Clean and normalize URLs"""
        if not isinstance(url, str):
            return str(url)

        url = url.strip()

        # Add protocol if missing
        if url.startswith('//'):
            url = 'https:' + url
        elif not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        return url

    def _normalize_boolean(self, value: Any) -> bool:
        """Normalize various boolean representations"""
        if isinstance(value, bool):
            return value

        if isinstance(value, str):
            value = value.lower().strip()
            return value in ('true', 'yes', '1', 'on', 'available', 'in stock')

        if isinstance(value, (int, float)):
            return bool(value)

        return False

    def _validate_required_fields(self, adapter):
        """Validate that required fields are present"""
        for field in self.validation_rules['required_fields']:
            if not adapter.get(field):
                raise ValidationError(f"Required field missing: {field}")

    def _validate_field_types(self, adapter):
        """Validate field data types"""
        for field, expected_types in self.validation_rules['field_types'].items():
            if field in adapter and adapter[field] is not None:
                if not isinstance(adapter[field], expected_types):
                    raise ValidationError(
                        f"Field {field} has invalid type: "
                        f"expected {expected_types}, got {type(adapter[field])}"
                    )

    def _validate_field_ranges(self, adapter):
        """Validate numeric field ranges"""
        for field, (min_val, max_val) in self.validation_rules['field_ranges'].items():
            if field in adapter and adapter[field] is not None:
                value = adapter[field]
                if not (min_val <= value <= max_val):
                    raise ValidationError(
                        f"Field {field} value {value} outside valid range [{min_val}, {max_val}]"
                    )

    def _validate_string_patterns(self, adapter):
        """Validate string fields against regex patterns"""
        for field, pattern in self.validation_rules['string_patterns'].items():
            if field in adapter and adapter[field] is not None:
                if not re.match(pattern, str(adapter[field])):
                    raise ValidationError(f"Field {field} doesn't match required pattern")

    def _validate_field_lengths(self, adapter):
        """Validate string field lengths"""
        for field, max_length in self.validation_rules['max_lengths'].items():
            if field in adapter and adapter[field] is not None:
                if len(str(adapter[field])) > max_length:
                    raise ValidationError(
                        f"Field {field} exceeds maximum length of {max_length}"
                    )

    def _validate_business_rules(self, adapter):
        """Validate business-specific rules"""
        # Price must be positive for in-stock items
        if adapter.get('in_stock') and (adapter.get('price') or 0) <= 0:
            raise ValidationError("In-stock items must have positive price")

        # Rating must be present if review_count > 0
        if (adapter.get('review_count') or 0) > 0 and not adapter.get('rating'):
            raise ValidationError("Items with reviews must have rating")

        # Discount validation
        original_price = adapter.get('original_price')
        current_price = adapter.get('price')
        if original_price and current_price:
            if current_price > original_price:
                raise ValidationError("Current price cannot exceed original price")

    def _calculate_quality_score(self, adapter) -> float:
        """Calculate data quality score (0-1)"""
        score = 0.0
        max_score = 0.0

        # Completeness score (40% of total)
        important_fields = ['name', 'price', 'description', 'brand', 'category']
        present_fields = sum(1 for field in important_fields if adapter.get(field))
        completeness = present_fields / len(important_fields)
        score += completeness * 0.4
        max_score += 0.4

        # Accuracy score (30% of total)
        accuracy = 1.0  # Assume high accuracy if validation passed
        score += accuracy * 0.3
        max_score += 0.3

        # Consistency score (20% of total)
        consistency = self._check_consistency(adapter)
        score += consistency * 0.2
        max_score += 0.2

        # Freshness score (10% of total)
        freshness = 1.0  # Assume fresh data
        score += freshness * 0.1
        max_score += 0.1

        return score / max_score if max_score > 0 else 0.0

    def _check_consistency(self, adapter) -> float:
        """Check internal consistency of data"""
        consistency_score = 1.0

        # Check if price and in_stock are consistent
        if (adapter.get('price') or 0) <= 0 and adapter.get('in_stock', False):
            consistency_score -= 0.3

        # Check if rating and review_count are consistent
        rating = adapter.get('rating')
        review_count = adapter.get('review_count', 0)
        if rating and rating > 0 and review_count == 0:
            consistency_score -= 0.2

        return max(0.0, consistency_score)

    def close_spider(self, spider):
        """Log validation statistics when spider closes"""
        stats = self.validation_stats
        total = stats['total_items']

        self.logger.info("=== Validation Pipeline Statistics ===")
        self.logger.info(f"Total items processed: {total}")
        if total:
            self.logger.info(f"Valid items: {stats['valid_items']} ({stats['valid_items']/total*100:.1f}%)")
            self.logger.info(f"Dropped items: {stats['dropped_items']} ({stats['dropped_items']/total*100:.1f}%)")
        self.logger.info(f"Field fixes applied: {stats['field_fixes']}")

        if stats['validation_errors']:
            self.logger.info("Validation errors by type:")
            for error_type, count in stats['validation_errors'].items():
                self.logger.info(f"  {error_type}: {count}")


class ValidationError(Exception):
    """Custom exception for validation errors"""
    pass


class DataEnrichmentPipeline:
    """Enrich items with additional computed fields"""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Add computed fields
        self._add_price_analysis(adapter)
        self._add_text_analysis(adapter)
        self._add_category_classification(adapter)
        self._add_uniqueness_hash(adapter)

        return item

    def _add_price_analysis(self, adapter):
        """Add price-related computed fields"""
        current_price = adapter.get('price')
        original_price = adapter.get('original_price')

        if current_price and original_price:
            discount = original_price - current_price
            discount_percent = (discount / original_price) * 100
            adapter['discount_amount'] = discount
            adapter['discount_percentage'] = round(discount_percent, 2)
            adapter['is_on_sale'] = discount > 0

        # Price tier classification
        if current_price:
            if current_price < 50:
                adapter['price_tier'] = 'budget'
            elif current_price < 200:
                adapter['price_tier'] = 'mid-range'
            elif current_price < 500:
                adapter['price_tier'] = 'premium'
            else:
                adapter['price_tier'] = 'luxury'

    def _add_text_analysis(self, adapter):
        """Add text analysis metrics"""
        description = adapter.get('description', '')

        if description:
            words = description.split()
            adapter['description_word_count'] = len(words)
            adapter['description_char_count'] = len(description)

            # Simple sentiment analysis (in production, use proper NLP)
            positive_words = ['excellent', 'great', 'amazing', 'perfect', 'wonderful']
            negative_words = ['bad', 'terrible', 'awful', 'poor', 'disappointing']

            pos_count = sum(1 for word in words if word.lower() in positive_words)
            neg_count = sum(1 for word in words if word.lower() in negative_words)

            if pos_count > neg_count:
                adapter['sentiment'] = 'positive'
            elif neg_count > pos_count:
                adapter['sentiment'] = 'negative'
            else:
                adapter['sentiment'] = 'neutral'

    def _add_category_classification(self, adapter):
        """Add automatic category classification"""
        name = adapter.get('name', '').lower()
        description = adapter.get('description', '').lower()
        text = f"{name} {description}"

        # Simple keyword-based classification
        categories = {
            'electronics': ['phone', 'laptop', 'computer', 'tablet', 'tv', 'camera'],
            'clothing': ['shirt', 'pants', 'dress', 'shoes', 'jacket', 'jeans'],
            'home': ['furniture', 'kitchen', 'bedroom', 'bathroom', 'decor'],
            'books': ['book', 'novel', 'textbook', 'manual', 'guide'],
            'sports': ['fitness', 'exercise', 'sports', 'outdoor', 'gym']
        }

        scores = {}
        for category, keywords in categories.items():
            score = sum(1 for keyword in keywords if keyword in text)
            if score > 0:
                scores[category] = score

        if scores:
            adapter['auto_category'] = max(scores, key=scores.get)
            adapter['category_confidence'] = max(scores.values()) / len(categories[adapter['auto_category']])

    def _add_uniqueness_hash(self, adapter):
        """Add uniqueness hash for deduplication"""
        # Create hash from key identifying fields
        key_fields = ['name', 'brand', 'sku']
        hash_input = ''.join(str(adapter.get(field, '')) for field in key_fields)

        if hash_input.strip():
            adapter['uniqueness_hash'] = hashlib.md5(
                hash_input.lower().encode('utf-8')
            ).hexdigest()
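
To wire these components into a crawl, register them in ITEM_PIPELINES. The snippet below is a minimal sketch assuming the webscraper project layout used throughout this series; the priority numbers are illustrative, but DataEnrichmentPipeline should run after validation and before any storage pipeline, because it produces the uniqueness_hash that the storage backends use for deduplication.

webscraper/settings.py (excerpt)
ITEM_PIPELINES = {
    # Clean and validate first, then enrich; storage pipelines are added later in this part
    "webscraper.pipelines.AdvancedValidationPipeline": 300,
    "webscraper.pipelines.DataEnrichmentPipeline": 400,
}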

Multiple Storage Backends

MongoDB Storage Pipeline

webscraper/storage/mongodb_pipeline.py
import pymongo
from pymongo import MongoClient, UpdateOne
from itemadapter import ItemAdapter
from datetime import datetime, timedelta
import logging
from typing import Dict, List


class MongoDBPipeline:
    """Advanced MongoDB storage pipeline with indexing and aggregation"""

    def __init__(self, mongo_uri, mongo_db, collection_name):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.collection_name = collection_name
        self.client = None
        self.db = None
        self.collection = None
        self.logger = logging.getLogger(__name__)

        # Batch processing
        self.batch_size = 100
        self.batch_items = []

        # Statistics
        self.stats = {
            'inserted': 0,
            'updated': 0,
            'duplicates': 0,
            'errors': 0
        }

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI", "mongodb://localhost:27017"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "scrapy"),
            collection_name=crawler.settings.get("MONGO_COLLECTION", "items")
        )

    def open_spider(self, spider):
        """Initialize MongoDB connection and setup indexes"""
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
        self.collection = self.db[self.collection_name]

        # Create indexes for performance
        self._create_indexes()

        self.logger.info(f"Connected to MongoDB: {self.mongo_uri}/{self.mongo_db}")

    def _create_indexes(self):
        """Create database indexes for optimal performance"""
        indexes = [
            # Index used for deduplication lookups
            ("uniqueness_hash", pymongo.ASCENDING),
            # Filter indexes
            ("brand", pymongo.ASCENDING),
            ("category", pymongo.ASCENDING),
            ("price", pymongo.ASCENDING),
            ("rating", pymongo.DESCENDING),
            ("in_stock", pymongo.ASCENDING),
            # Time-based indexes
            ("_scraped_at", pymongo.DESCENDING),
            ("_updated_at", pymongo.DESCENDING),
            # Compound indexes
            (("category", pymongo.ASCENDING), ("price", pymongo.ASCENDING)),
            (("brand", pymongo.ASCENDING), ("rating", pymongo.DESCENDING)),
        ]

        for index in indexes:
            try:
                if isinstance(index[0], tuple):
                    # Compound index: a tuple of (field, direction) pairs
                    self.collection.create_index(list(index))
                else:
                    # Single-field index
                    self.collection.create_index([index])
            except Exception as e:
                self.logger.warning(f"Failed to create index {index}: {e}")

        # Text index for search (MongoDB allows only one text index per collection,
        # so name, description and brand are combined into a single index)
        try:
            self.collection.create_index([
                ("name", pymongo.TEXT),
                ("description", pymongo.TEXT),
                ("brand", pymongo.TEXT)
            ], name="text_search")
        except Exception as e:
            self.logger.warning(f"Failed to create text index: {e}")

    def process_item(self, item, spider):
        """Process item and add to batch"""
        adapter = ItemAdapter(item)

        # Add MongoDB-specific fields to the stored document (not the item itself,
        # so this works even when the item class does not declare these fields)
        doc = dict(adapter)
        doc['_scraped_at'] = datetime.utcnow()
        doc['_spider_name'] = spider.name

        # Add to batch
        self.batch_items.append(doc)

        # Process batch if full
        if len(self.batch_items) >= self.batch_size:
            self._process_batch()

        return item

    def _process_batch(self):
        """Process batch of items with upsert operations"""
        if not self.batch_items:
            return

        try:
            operations = []

            for item in self.batch_items:
                uniqueness_hash = item.get('uniqueness_hash')

                if uniqueness_hash:
                    # Upsert based on uniqueness hash
                    operation = UpdateOne(
                        {'uniqueness_hash': uniqueness_hash},
                        {
                            '$set': item,
                            '$setOnInsert': {'_created_at': datetime.utcnow()},
                            '$currentDate': {'_updated_at': True}
                        },
                        upsert=True
                    )
                    operations.append(operation)
                else:
                    # Insert without duplicate check
                    self.collection.insert_one(item)
                    self.stats['inserted'] += 1

            if operations:
                result = self.collection.bulk_write(operations, ordered=False)
                self.stats['inserted'] += result.upserted_count
                self.stats['updated'] += result.modified_count
                self.stats['duplicates'] += len(operations) - result.upserted_count - result.modified_count

        except Exception as e:
            self.stats['errors'] += len(self.batch_items)
            self.logger.error(f"Error processing batch: {e}")
        finally:
            self.batch_items.clear()

    def close_spider(self, spider):
        """Process remaining items and close connection"""
        # Process remaining batch
        self._process_batch()

        # Log statistics
        self.logger.info("=== MongoDB Pipeline Statistics ===")
        for stat, count in self.stats.items():
            self.logger.info(f"{stat.capitalize()}: {count}")

        # Create aggregation pipelines for analytics
        self._create_analytics_views()

        # Close connection
        if self.client:
            self.client.close()

    def _create_analytics_views(self):
        """Create MongoDB views for analytics"""
        try:
            # Category statistics view
            self.db.create_collection("category_stats", viewOn=self.collection_name, pipeline=[
                {"$group": {
                    "_id": "$category",
                    "total_products": {"$sum": 1},
                    "avg_price": {"$avg": "$price"},
                    "avg_rating": {"$avg": "$rating"},
                    "in_stock_count": {"$sum": {"$cond": ["$in_stock", 1, 0]}}
                }},
                {"$sort": {"total_products": -1}}
            ])

            # Brand statistics view
            self.db.create_collection("brand_stats", viewOn=self.collection_name, pipeline=[
                {"$group": {
                    "_id": "$brand",
                    "total_products": {"$sum": 1},
                    "avg_price": {"$avg": "$price"},
                    "min_price": {"$min": "$price"},
                    "max_price": {"$max": "$price"}
                }},
                {"$sort": {"total_products": -1}}
            ])

            # Daily scraping statistics
            self.db.create_collection("daily_stats", viewOn=self.collection_name, pipeline=[
                {"$group": {
                    "_id": {
                        "$dateToString": {
                            "format": "%Y-%m-%d",
                            "date": "$_scraped_at"
                        }
                    },
                    "items_scraped": {"$sum": 1},
                    "unique_brands": {"$addToSet": "$brand"},
                    "unique_categories": {"$addToSet": "$category"}
                }},
                {"$addFields": {
                    "unique_brand_count": {"$size": "$unique_brands"},
                    "unique_category_count": {"$size": "$unique_categories"}
                }},
                {"$sort": {"_id": -1}}
            ])

        except Exception as e:
            self.logger.warning(f"Failed to create analytics views: {e}")


class MongoDBAnalytics:
    """Advanced analytics queries for MongoDB"""

    def __init__(self, mongo_uri, mongo_db, collection_name):
        self.client = MongoClient(mongo_uri)
        self.db = self.client[mongo_db]
        self.collection = self.db[collection_name]

    def get_price_trends(self, days: int = 30) -> List[Dict]:
        """Get price trends over time"""
        pipeline = [
            {
                "$match": {
                    "_scraped_at": {
                        "$gte": datetime.utcnow() - timedelta(days=days)
                    }
                }
            },
            {
                "$group": {
                    "_id": {
                        "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$_scraped_at"}},
                        "category": "$category"
                    },
                    "avg_price": {"$avg": "$price"},
                    "min_price": {"$min": "$price"},
                    "max_price": {"$max": "$price"},
                    "product_count": {"$sum": 1}
                }
            },
            {"$sort": {"_id.date": 1}}
        ]

        return list(self.collection.aggregate(pipeline))

    def get_top_products(self, category: str = None, limit: int = 10) -> List[Dict]:
        """Get top-rated products"""
        match_stage = {"rating": {"$exists": True, "$gte": 4.0}}
        if category:
            match_stage["category"] = category

        pipeline = [
            {"$match": match_stage},
            {"$sort": {"rating": -1, "review_count": -1}},
            {"$limit": limit},
            {
                "$project": {
                    "name": 1,
                    "brand": 1,
                    "price": 1,
                    "rating": 1,
                    "review_count": 1,
                    "url": 1
                }
            }
        ]

        return list(self.collection.aggregate(pipeline))

    def get_inventory_alerts(self) -> List[Dict]:
        """Get products that went out of stock recently"""
        pipeline = [
            {
                "$match": {
                    "in_stock": False,
                    "_scraped_at": {"$gte": datetime.utcnow() - timedelta(days=1)}
                }
            },
            {
                "$project": {
                    "name": 1,
                    "brand": 1,
                    "price": 1,
                    "category": 1,
                    "_scraped_at": 1
                }
            },
            {"$sort": {"_scraped_at": -1}}
        ]

        return list(self.collection.aggregate(pipeline))
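
To enable the MongoDB pipeline, point it at your deployment through the settings read by from_crawler (MONGO_URI, MONGO_DATABASE, MONGO_COLLECTION). A minimal sketch with placeholder values for a local instance:

webscraper/settings.py (excerpt)
MONGO_URI = "mongodb://localhost:27017"  # placeholder; point at your own deployment
MONGO_DATABASE = "scrapy"
MONGO_COLLECTION = "items"

ITEM_PIPELINES = {
    "webscraper.pipelines.AdvancedValidationPipeline": 300,
    "webscraper.pipelines.DataEnrichmentPipeline": 400,
    "webscraper.storage.mongodb_pipeline.MongoDBPipeline": 500,
}

Once data has been collected, the analytics helper can be used from any standalone script, for example:

analytics_example.py (usage sketch)
from webscraper.storage.mongodb_pipeline import MongoDBAnalytics

analytics = MongoDBAnalytics("mongodb://localhost:27017", "scrapy", "items")
for row in analytics.get_price_trends(days=7):
    # Each row is one (date, category) group produced by the aggregation
    print(row["_id"], row["avg_price"], row["product_count"])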

PostgreSQL Storage Pipeline

webscraper/storage/postgresql_pipeline.py
import psycopg2
import psycopg2.extras
from itemadapter import ItemAdapter
from datetime import datetime
import logging
import json
from typing import Dict, List, Optional


class PostgreSQLPipeline:
    """Advanced PostgreSQL storage with JSONB and full-text search"""

    def __init__(self, postgres_settings):
        self.postgres_settings = postgres_settings
        self.connection = None
        self.cursor = None
        self.logger = logging.getLogger(__name__)

        # Batch processing
        self.batch_size = 50
        self.batch_items = []

        # Statistics
        self.stats = {
            'inserted': 0,
            'updated': 0,
            'errors': 0
        }

    @classmethod
    def from_crawler(cls, crawler):
        postgres_settings = crawler.settings.getdict("POSTGRES_SETTINGS")
        return cls(postgres_settings)

    def open_spider(self, spider):
        """Initialize PostgreSQL connection and setup tables"""
        try:
            self.connection = psycopg2.connect(**self.postgres_settings)
            self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

            # Create tables and indexes
            self._create_tables()
            self._create_indexes()

            self.logger.info("Connected to PostgreSQL successfully")
        except Exception as e:
            self.logger.error(f"Error connecting to PostgreSQL: {e}")
            raise

    def _create_tables(self):
        """Create database tables"""
        create_tables_sql = """
        -- Main products table
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            uniqueness_hash VARCHAR(32) UNIQUE,
            name VARCHAR(500) NOT NULL,
            brand VARCHAR(100),
            category VARCHAR(100),
            price DECIMAL(10,2),
            original_price DECIMAL(10,2),
            currency VARCHAR(3) DEFAULT 'USD',
            rating DECIMAL(3,2),
            review_count INTEGER,
            in_stock BOOLEAN DEFAULT true,
            description TEXT,
            specifications JSONB,
            images JSONB,
            url VARCHAR(1000),
            source_spider VARCHAR(50),
            data_quality_score DECIMAL(3,2),
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            raw_data JSONB
        );

        -- Price history table
        CREATE TABLE IF NOT EXISTS price_history (
            id SERIAL PRIMARY KEY,
            product_id INTEGER REFERENCES products(id),
            price DECIMAL(10,2) NOT NULL,
            original_price DECIMAL(10,2),
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        -- Categories table
        CREATE TABLE IF NOT EXISTS categories (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) UNIQUE NOT NULL,
            parent_category VARCHAR(100),
            description TEXT
        );

        -- Brands table
        CREATE TABLE IF NOT EXISTS brands (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) UNIQUE NOT NULL,
            website VARCHAR(255),
            description TEXT
        );

        -- Search statistics table
        CREATE TABLE IF NOT EXISTS search_stats (
            id SERIAL PRIMARY KEY,
            search_term VARCHAR(255),
            results_count INTEGER,
            avg_price DECIMAL(10,2),
            searched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """

        try:
            self.cursor.execute(create_tables_sql)
            self.connection.commit()
            self.logger.info("Database tables created successfully")
        except Exception as e:
            self.logger.error(f"Error creating tables: {e}")
            self.connection.rollback()

    def _create_indexes(self):
        """Create database indexes"""
        indexes_sql = [
            "CREATE INDEX IF NOT EXISTS idx_products_name ON products USING gin(to_tsvector('english', name));",
            "CREATE INDEX IF NOT EXISTS idx_products_brand ON products(brand);",
            "CREATE INDEX IF NOT EXISTS idx_products_category ON products(category);",
            "CREATE INDEX IF NOT EXISTS idx_products_price ON products(price);",
            "CREATE INDEX IF NOT EXISTS idx_products_rating ON products(rating DESC);",
            "CREATE INDEX IF NOT EXISTS idx_products_scraped_at ON products(scraped_at DESC);",
            "CREATE INDEX IF NOT EXISTS idx_products_in_stock ON products(in_stock);",
            "CREATE INDEX IF NOT EXISTS idx_products_data_quality ON products(data_quality_score DESC);",
            "CREATE INDEX IF NOT EXISTS idx_price_history_product_id ON price_history(product_id);",
            "CREATE INDEX IF NOT EXISTS idx_price_history_scraped_at ON price_history(scraped_at DESC);",
            # JSONB indexes
            "CREATE INDEX IF NOT EXISTS idx_products_specs ON products USING gin(specifications);",
            "CREATE INDEX IF NOT EXISTS idx_products_raw_data ON products USING gin(raw_data);",
            # Full-text search index
            "CREATE INDEX IF NOT EXISTS idx_products_fulltext ON products USING gin(to_tsvector('english', name || ' ' || COALESCE(description, '')));"
        ]

        for index_sql in indexes_sql:
            try:
                self.cursor.execute(index_sql)
                self.connection.commit()
            except Exception as e:
                self.logger.warning(f"Error creating index: {e}")
                self.connection.rollback()

    def process_item(self, item, spider):
        """Process item and add to batch"""
        adapter = ItemAdapter(item)

        # Prepare item for PostgreSQL
        postgres_item = self._prepare_item(adapter, spider)
        self.batch_items.append(postgres_item)

        # Process batch if full
        if len(self.batch_items) >= self.batch_size:
            self._process_batch()

        return item

    def _prepare_item(self, adapter: ItemAdapter, spider) -> Dict:
        """Prepare item for PostgreSQL storage"""
        return {
            'uniqueness_hash': adapter.get('uniqueness_hash'),
            'name': adapter.get('name'),
            'brand': adapter.get('brand'),
            'category': adapter.get('category'),
            'price': adapter.get('price'),
            'original_price': adapter.get('original_price'),
            'currency': adapter.get('currency', 'USD'),
            'rating': adapter.get('rating'),
            'review_count': adapter.get('review_count'),
            'in_stock': adapter.get('in_stock', True),
            'description': adapter.get('description'),
            'specifications': json.dumps(adapter.get('specifications', {})),
            'images': json.dumps(adapter.get('images', [])),
            'url': adapter.get('url'),
            'source_spider': spider.name,
            'data_quality_score': adapter.get('data_quality_score'),
            # default=str keeps non-JSON types (e.g. datetimes) from breaking serialization
            'raw_data': json.dumps(dict(adapter), default=str)
        }

    def _process_batch(self):
        """Process batch of items with upsert"""
        if not self.batch_items:
            return

        upsert_sql = """
        INSERT INTO products (
            uniqueness_hash, name, brand, category, price, original_price,
            currency, rating, review_count, in_stock, description,
            specifications, images, url, source_spider, data_quality_score, raw_data
        ) VALUES (
            %(uniqueness_hash)s, %(name)s, %(brand)s, %(category)s, %(price)s, %(original_price)s,
            %(currency)s, %(rating)s, %(review_count)s, %(in_stock)s, %(description)s,
            %(specifications)s, %(images)s, %(url)s, %(source_spider)s, %(data_quality_score)s, %(raw_data)s
        )
        ON CONFLICT (uniqueness_hash) DO UPDATE SET
            name = EXCLUDED.name,
            brand = EXCLUDED.brand,
            category = EXCLUDED.category,
            price = EXCLUDED.price,
            original_price = EXCLUDED.original_price,
            rating = EXCLUDED.rating,
            review_count = EXCLUDED.review_count,
            in_stock = EXCLUDED.in_stock,
            description = EXCLUDED.description,
            specifications = EXCLUDED.specifications,
            images = EXCLUDED.images,
            updated_at = CURRENT_TIMESTAMP,
            raw_data = EXCLUDED.raw_data
        RETURNING id, (xmax = 0) AS inserted;
        """

        try:
            # Execute the upserts row by row so we can capture the RETURNING values
            results = []
            for item in self.batch_items:
                self.cursor.execute(upsert_sql, item)
                result = self.cursor.fetchone()
                results.append(result)

            # Insert price history
            self._insert_price_history(results)

            self.connection.commit()

            # Update statistics
            for result in results:
                if result['inserted']:
                    self.stats['inserted'] += 1
                else:
                    self.stats['updated'] += 1

        except Exception as e:
            self.stats['errors'] += len(self.batch_items)
            self.logger.error(f"Error processing batch: {e}")
            self.connection.rollback()
        finally:
            self.batch_items.clear()

    def _insert_price_history(self, results: List[Dict]):
        """Insert price history records"""
        price_history_sql = """
        INSERT INTO price_history (product_id, price, original_price)
        VALUES (%s, %s, %s)
        """

        price_history_data = []
        for i, result in enumerate(results):
            item = self.batch_items[i]
            if item.get('price'):
                price_history_data.append((
                    result['id'],
                    item['price'],
                    item.get('original_price')
                ))

        if price_history_data:
            self.cursor.executemany(price_history_sql, price_history_data)

    def close_spider(self, spider):
        """Process remaining items and close connection"""
        # Process remaining batch
        self._process_batch()

        # Update categories and brands tables
        self._update_reference_tables()

        # Log statistics
        self.logger.info("=== PostgreSQL Pipeline Statistics ===")
        for stat, count in self.stats.items():
            self.logger.info(f"{stat.capitalize()}: {count}")

        # Close connection
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()

    def _update_reference_tables(self):
        """Update categories and brands reference tables"""
        try:
            # Update categories
            self.cursor.execute("""
                INSERT INTO categories (name)
                SELECT DISTINCT category
                FROM products
                WHERE category IS NOT NULL
                ON CONFLICT (name) DO NOTHING;
            """)

            # Update brands
            self.cursor.execute("""
                INSERT INTO brands (name)
                SELECT DISTINCT brand
                FROM products
                WHERE brand IS NOT NULL
                ON CONFLICT (name) DO NOTHING;
            """)

            self.connection.commit()
        except Exception as e:
            self.logger.error(f"Error updating reference tables: {e}")
            self.connection.rollback()


class PostgreSQLAnalytics:
    """Advanced analytics queries for PostgreSQL"""

    def __init__(self, postgres_settings):
        self.connection = psycopg2.connect(**postgres_settings)
        self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    def search_products(self, query: str, limit: int = 50) -> List[Dict]:
        """Full-text search for products"""
        search_sql = """
        SELECT id, name, brand, category, price, rating, review_count, url,
               ts_rank(to_tsvector('english', name || ' ' || COALESCE(description, '')),
                       plainto_tsquery('english', %s)) AS rank
        FROM products
        WHERE to_tsvector('english', name || ' ' || COALESCE(description, ''))
              @@ plainto_tsquery('english', %s)
        ORDER BY rank DESC, rating DESC NULLS LAST
        LIMIT %s;
        """

        self.cursor.execute(search_sql, (query, query, limit))
        return self.cursor.fetchall()

    def get_price_trends(self, product_id: int, days: int = 30) -> List[Dict]:
        """Get price trends for a specific product"""
        trends_sql = """
        SELECT DATE(scraped_at) as date,
               AVG(price) as avg_price,
               MIN(price) as min_price,
               MAX(price) as max_price,
               COUNT(*) as data_points
        FROM price_history
        WHERE product_id = %s
          -- date minus an integer subtracts whole days
          AND scraped_at >= CURRENT_DATE - %s
        GROUP BY DATE(scraped_at)
        ORDER BY date;
        """

        self.cursor.execute(trends_sql, (product_id, days))
        return self.cursor.fetchall()

    def get_category_insights(self) -> List[Dict]:
        """Get insights by category"""
        insights_sql = """
        SELECT category,
               COUNT(*) as product_count,
               AVG(price) as avg_price,
               MIN(price) as min_price,
               MAX(price) as max_price,
               AVG(rating) as avg_rating,
               COUNT(CASE WHEN in_stock THEN 1 END) as in_stock_count,
               AVG(data_quality_score) as avg_quality_score
        FROM products
        WHERE category IS NOT NULL
        GROUP BY category
        ORDER BY product_count DESC;
        """

        self.cursor.execute(insights_sql)
        return self.cursor.fetchall()
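
The PostgreSQL pipeline reads its connection parameters from a POSTGRES_SETTINGS dict, which it passes straight to psycopg2.connect. A minimal sketch with placeholder credentials you would replace with your own:

webscraper/settings.py (excerpt)
POSTGRES_SETTINGS = {
    "host": "localhost",
    "port": 5432,
    "dbname": "scrapy",       # placeholder database name
    "user": "scrapy_user",    # placeholder credentials
    "password": "change-me",
}

ITEM_PIPELINES = {
    "webscraper.pipelines.AdvancedValidationPipeline": 300,
    "webscraper.pipelines.DataEnrichmentPipeline": 400,
    "webscraper.storage.postgresql_pipeline.PostgreSQLPipeline": 600,
}

Both storage pipelines can be enabled at the same time; each receives every item independently, so a single crawl can feed MongoDB and PostgreSQL in parallel.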

Summary and Next Steps

In this comprehensive part, you’ve mastered:

  • Advanced data validation and cleaning pipelines
  • Multiple storage backends (MongoDB, PostgreSQL)
  • Data enrichment and computed fields
  • Batch processing and performance optimization
  • Analytics and reporting capabilities
  • Data quality monitoring and scoring

What’s Next?

In Part 5: Production Deployment, we’ll cover:

  • Docker containerization and orchestration
  • Cloud deployment (AWS, GCP, Azure)
  • CI/CD pipelines for scrapers
  • Production monitoring and logging
  • Scaling strategies and auto-scaling
  • Security and compliance in production

Practice Exercise

Build a complete data processing pipeline that (see the sketch after this list for a starting point):

  1. Validates and cleans e-commerce data with 95%+ quality scores
  2. Stores data in both MongoDB and PostgreSQL
  3. Provides real-time analytics and insights
  4. Implements automated data quality monitoring
  5. Handles millions of products with optimal performance
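
As a starting point for items 1 and 4, the sketch below polls the products table created by PostgreSQLPipeline and flags any spider whose average data_quality_score falls below a threshold. The file name and threshold are only suggestions; in production you would send the alert to your monitoring system rather than print it.

quality_check.py (sketch)
import psycopg2

def check_quality(postgres_settings, threshold=0.95):
    """Warn when a spider's average quality score drops below the threshold."""
    conn = psycopg2.connect(**postgres_settings)
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT source_spider, AVG(data_quality_score), COUNT(*)
                FROM products
                WHERE scraped_at >= CURRENT_DATE - 1  -- items from the last day
                GROUP BY source_spider;
            """)
            for spider, avg_score, item_count in cur.fetchall():
                if avg_score is not None and avg_score < threshold:
                    print(f"ALERT: {spider} averaged {avg_score:.2f} over {item_count} items")
    finally:
        conn.close()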

Happy data processing! 📊