Master Web Scraping with Scrapy: Advanced Techniques
Welcome to Part 2 of our comprehensive Scrapy series! In this tutorial, we’ll explore advanced techniques for handling complex, modern websites that use JavaScript, AJAX, and dynamic content loading.
What You’ll Learn in This Part
- JavaScript rendering with Scrapy-Splash integration
- Handling forms, logins, and session management
- Extracting data from AJAX requests and APIs
- Advanced selector techniques and data extraction
- Custom middleware development
- Handling cookies, headers, and authentication
- Working with infinite scroll and pagination
JavaScript-Heavy Websites with Scrapy-Splash
Modern websites heavily rely on JavaScript for content rendering. Scrapy alone cannot execute JavaScript, so we need Scrapy-Splash for these scenarios.
Setting Up Splash
# Install Docker (required for Splash)
# On macOS with Homebrew (installs Docker Desktop):
brew install --cask docker

# Start the Docker service and run Splash
docker run -p 8050:8050 scrapinghub/splash

# Install Scrapy-Splash
pip install scrapy-splash
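With the container running, it is worth a quick smoke test before touching Scrapy. Splash exposes a render.html endpoint over HTTP; the request below (example.org is just a placeholder URL) should return rendered HTML if everything is working:

# Should print the rendered HTML of the page
curl 'http://localhost:8050/render.html?url=https://example.org&wait=1'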
Configuring Scrapy for Splash
# Splash settings
SPLASH_URL = 'http://localhost:8050'

# Enable Splash middleware
DOWNLOADER_MIDDLEWARES = {
    'scrapy_splash.SplashCookiesMiddleware': 723,
    'scrapy_splash.SplashMiddleware': 725,
    'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 810,
}

# Enable Splash spider middleware
SPIDER_MIDDLEWARES = {
    'scrapy_splash.SplashDeduplicateArgsMiddleware': 100,
}

# Splash duplicate filter
DUPEFILTER_CLASS = 'scrapy_splash.SplashAwareDupeFilter'

# Splash HTTP cache storage backend
HTTPCACHE_STORAGE = 'scrapy_splash.SplashAwareFSCacheStorage'
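With those settings in place, the simplest usage is a plain SplashRequest with a wait argument; no Lua scripting required. Here is a minimal sketch (the URL is a placeholder) before the more involved example in the next section:

import scrapy
from scrapy_splash import SplashRequest


class MinimalJsSpider(scrapy.Spider):
    name = 'minimal_js'

    def start_requests(self):
        # render.html is the default endpoint; 'wait' gives the page time to render
        yield SplashRequest(
            'https://example.com',
            callback=self.parse,
            args={'wait': 2},
        )

    def parse(self, response):
        # response.body now contains the JavaScript-rendered HTML
        yield {'title': response.css('title::text').get()}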
JavaScript-Enabled Spider
import scrapy
import json
import re

from scrapy_splash import SplashRequest
from scrapy.loader import ItemLoader

from webscraper.items import ProductItem
class SPASpider(scrapy.Spider):
    name = 'spa_scraper'
    allowed_domains = ['example-spa.com']

    custom_settings = {
        'DOWNLOAD_DELAY': 2,
        'SPLASH_URL': 'http://localhost:8050',
    }

    def start_requests(self):
        urls = ['https://example-spa.com/products']
        # Lua script for complex interactions
        lua_script = """
        function main(splash, args)
            splash.private_mode_enabled = false
            splash:go(args.url)
            splash:wait(3)

            -- Wait for products to load
            splash:wait_for_resume([[
                function main(splash) {
                    var products = document.querySelectorAll('.product-item');
                    if (products.length > 0) {
                        splash.resume('Products loaded');
                    } else {
                        setTimeout(function() {
                            splash.resume('Timeout');
                        }, 10000);
                    }
                }
            ]], 15)

            -- Scroll to load more content
            splash:runjs([[
                window.scrollTo(0, document.body.scrollHeight);
            ]])
            splash:wait(2)

            -- Click the "Load More" button if present
            local load_more = splash:select('.load-more-btn')
            if load_more then
                load_more:mouse_click()
                splash:wait(3)
            end

            return {
                html = splash:html(),
                png = splash:png(),
                har = splash:har(),
                url = splash:url()
            }
        end
        """
        for url in urls:
            yield SplashRequest(
                url=url,
                callback=self.parse,
                endpoint='execute',  # required when running a Lua script
                args={
                    'lua_source': lua_script,
                    'timeout': 30,
                    'resource_timeout': 10,
                    'wait': 5,
                }
            )
    def parse(self, response):
        """Parse SPA product listings"""
        self.logger.info(f'Parsing SPA page: {response.url}')

        # Extract products from JavaScript-rendered content
        products = response.css('.product-item')

        for product in products:
            product_url = product.css('a::attr(href)').get()
            if product_url:
                # Use SplashRequest for product pages too
                yield SplashRequest(
                    url=response.urljoin(product_url),
                    callback=self.parse_product,
                    args={'wait': 3}
                )

        # Handle pagination in the SPA
        next_page_data = response.css('script[type="application/json"]::text').get()
        if next_page_data:
            try:
                data = json.loads(next_page_data)
                if data.get('nextPage'):
                    yield SplashRequest(
                        url=data['nextPage'],
                        callback=self.parse,
                        args={'wait': 3}
                    )
            except json.JSONDecodeError:
                pass

    def parse_product(self, response):
        """Parse individual product from SPA"""
        loader = ItemLoader(item=ProductItem(), response=response)

        # Extract data that might be loaded via JavaScript
        loader.add_css('name', 'h1.product-title::text')
        loader.add_css('price', '.price-display::text')
        loader.add_css('description', '.product-description::text')

        # Extract from JavaScript variables
        js_data = self.extract_js_data(response)
        if js_data:
            loader.add_value('name', js_data.get('productName'))
            loader.add_value('price', js_data.get('price'))
            loader.add_value('sku', js_data.get('sku'))

        loader.add_value('url', response.url)
        loader.add_value('source', 'spa')

        yield loader.load_item()
    def extract_js_data(self, response):
        """Extract data from JavaScript variables"""
        # Look for common patterns
        js_patterns = [
            r'window\.productData\s*=\s*({[^}]+})',
            r'var\s+product\s*=\s*({[^}]+})',
            r'__INITIAL_STATE__\s*=\s*({.+?});'
        ]

        for pattern in js_patterns:
            match = re.search(pattern, response.text)
            if match:
                try:
                    return json.loads(match.group(1))
                except json.JSONDecodeError:
                    continue

        return None
Form Handling and Authentication
Many websites require login or form submission. Here’s how to handle these scenarios:
Login Spider
import scrapy
from scrapy import FormRequest
from scrapy.loader import ItemLoader

from webscraper.items import ProductItem
class LoginSpider(scrapy.Spider):
    name = 'login_scraper'
    allowed_domains = ['secure-store.com']
    start_urls = ['https://secure-store.com/login']

    def parse(self, response):
        """Handle login form"""
        # Check if already logged in
        if self.is_logged_in(response):
            return self.after_login(response)

        # Extract form data and CSRF tokens
        csrf_token = response.css('input[name="csrf_token"]::attr(value)').get()

        # Submit login form
        return FormRequest.from_response(
            response,
            formdata={
                'username': 'your_username',
                'password': 'your_password',
                'csrf_token': csrf_token,
                'remember_me': '1'
            },
            callback=self.after_login,
            dont_filter=True
        )

    def is_logged_in(self, response):
        """Check if successfully logged in"""
        return bool(response.css('.user-dashboard'))

    def after_login(self, response):
        """Handle post-login logic"""
        if not self.is_logged_in(response):
            self.logger.error('Login failed')
            return

        self.logger.info('Successfully logged in')

        # Navigate to protected areas
        protected_urls = [
            'https://secure-store.com/members/products',
            'https://secure-store.com/premium/catalog'
        ]

        for url in protected_urls:
            yield response.follow(url, callback=self.parse_protected_content)

    def parse_protected_content(self, response):
        """Parse content that requires authentication"""
        products = response.css('.premium-product')

        for product in products:
            loader = ItemLoader(item=ProductItem(), selector=product)
            loader.add_css('name', '.product-name::text')
            loader.add_css('price', '.member-price::text')
            loader.add_css('description', '.product-desc::text')
            loader.add_value('source', 'premium')

            yield loader.load_item()
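One caveat about the example above: the credentials are hardcoded for clarity. In practice you would normally inject them at runtime, for instance via Scrapy spider arguments (-a) with an environment-variable fallback. A small sketch of that pattern (the environment variable names are made up for illustration):

import os
import scrapy


# Run with: scrapy crawl login_scraper -a username=alice -a password=s3cret
class LoginSpider(scrapy.Spider):
    name = 'login_scraper'

    def __init__(self, username=None, password=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Prefer spider arguments, fall back to environment variables
        self.username = username or os.environ.get('SCRAPER_USERNAME')
        self.password = password or os.environ.get('SCRAPER_PASSWORD')

You would then reference self.username and self.password in the formdata dict instead of literal strings.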
Complex Form Handling
import scrapy
from scrapy import FormRequest
import json


class FormSpider(scrapy.Spider):
    name = 'form_handler'

    def start_requests(self):
        # Start with a search form
        yield scrapy.Request(
            'https://example.com/search',
            callback=self.parse_search_form
        )

    def parse_search_form(self, response):
        """Handle complex search forms"""
        # Extract all form fields and hidden values
        form_data = {}

        # Get all input fields
        for input_field in response.css('form input'):
            name = input_field.css('::attr(name)').get()
            value = input_field.css('::attr(value)').get()
            input_type = input_field.css('::attr(type)').get()

            if name:
                if input_type == 'checkbox' and not input_field.css('::attr(checked)').get():
                    continue  # Skip unchecked checkboxes
                form_data[name] = value or ''

        # Get select fields
        for select in response.css('form select'):
            name = select.css('::attr(name)').get()
            selected = select.css('option[selected]::attr(value)').get()
            if name:
                form_data[name] = selected or ''

        # Add our search parameters
        search_params = {
            'query': 'laptops',
            'category': 'electronics',
            'price_min': '500',
            'price_max': '2000',
            'sort': 'price_desc'
        }
        form_data.update(search_params)

        # Submit the form
        yield FormRequest.from_response(
            response,
            formdata=form_data,
            callback=self.parse_search_results
        )

    def parse_search_results(self, response):
        """Parse search results"""
        products = response.css('.search-result-item')
        for product in products:
            product_url = product.css('a::attr(href)').get()

            # Extract product details
            product_data = {
                'name': product.css('.product-title::text').get(),
                'price': product.css('.price::text').get(),
                'rating': product.css('.rating::attr(data-rating)').get(),
                'url': response.urljoin(product_url) if product_url else None
            }

            if product_data['url']:
                yield response.follow(
                    product_data['url'],
                    callback=self.parse_product,
                    meta={'product_data': product_data}
                )
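Note that parse_search_results hands off to a parse_product callback that isn't shown above. A minimal version might simply merge the listing data carried in meta with a few detail-page fields; the selectors below are placeholders:

    def parse_product(self, response):
        """Combine listing data from meta with detail-page fields"""
        product_data = response.meta['product_data']

        # Enrich with detail-page fields (placeholder selectors)
        product_data['description'] = response.css('.product-description::text').get()
        product_data['availability'] = response.css('.stock-status::text').get()

        yield product_data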
AJAX Requests and API Integration
Modern websites often load data via AJAX. Here’s how to intercept and work with these requests:
AJAX Spider
import scrapy
import json
import re
from urllib.parse import urlencode
class AjaxSpider(scrapy.Spider):
    name = 'ajax_scraper'
    allowed_domains = ['api-example.com']

    def start_requests(self):
        # Start with the main page to get initial data
        yield scrapy.Request(
            'https://api-example.com/products',
            callback=self.parse_initial_page
        )

    def parse_initial_page(self, response):
        """Extract API endpoints and initial data"""
        # Look for API endpoints in JavaScript
        api_endpoints = self.extract_api_endpoints(response)
        self.logger.debug(f'Discovered API endpoints: {api_endpoints}')

        # Extract pagination info
        total_pages = response.css('.pagination::attr(data-total-pages)').get()
        if total_pages:
            total_pages = int(total_pages)
        else:
            total_pages = 10  # Default fallback

        # Generate API requests for all pages
        for page in range(1, total_pages + 1):
            api_url = 'https://api-example.com/api/products'
            params = {
                'page': page,
                'limit': 20,
                'format': 'json'
            }

            url = f"{api_url}?{urlencode(params)}"
            yield scrapy.Request(
                url=url,
                callback=self.parse_api_response,
                headers={
                    'Accept': 'application/json',
                    'X-Requested-With': 'XMLHttpRequest',
                    'Referer': response.url
                }
            )
    def extract_api_endpoints(self, response):
        """Extract API endpoints from JavaScript"""
        endpoints = []

        # Common patterns for API endpoints
        patterns = [
            r'api[\'"]:\s*[\'"]([^\'"]+)',
            r'endpoint[\'"]:\s*[\'"]([^\'"]+)',
            r'fetch\([\'"]([^\'"]+)',
            r'axios\.get\([\'"]([^\'"]+)'
        ]

        for pattern in patterns:
            matches = re.findall(pattern, response.text)
            endpoints.extend(matches)

        return list(set(endpoints))  # Remove duplicates
    def parse_api_response(self, response):
        """Parse JSON API responses"""
        try:
            data = json.loads(response.text)
        except json.JSONDecodeError:
            self.logger.error(f'Invalid JSON response from {response.url}')
            return

        # Handle different API response structures
        products = data.get('products', data.get('data', data.get('items', [])))

        for product in products:
            # Create an item from the API data
            product_item = {
                'id': product.get('id'),
                'name': product.get('name', product.get('title')),
                'price': product.get('price'),
                'description': product.get('description'),
                'category': product.get('category'),
                'images': product.get('images', []),
                'url': product.get('url'),
                'api_source': response.url
            }

            yield product_item

        # Handle API pagination
        pagination = data.get('pagination', {})
        if pagination.get('hasNextPage'):
            next_page = pagination.get('nextPage')
            if next_page:
                yield response.follow(
                    next_page,
                    callback=self.parse_api_response
                )
Real-time Data Spider
import scrapy
import json
import time
from datetime import datetime


class RealtimeSpider(scrapy.Spider):
    name = 'realtime_scraper'

    custom_settings = {
        'DOWNLOAD_DELAY': 5,       # Respectful delay for real-time data
        'CONCURRENT_REQUESTS': 1,  # Sequential requests for real-time monitoring
    }

    def start_requests(self):
        # Monitor real-time endpoints
        endpoints = [
            'https://api.example.com/live/stock-prices',
            'https://api.example.com/live/crypto-prices',
            'https://api.example.com/live/forex-rates'
        ]

        for endpoint in endpoints:
            yield scrapy.Request(
                endpoint,
                callback=self.parse_realtime_data,
                meta={
                    'endpoint_type': self.get_endpoint_type(endpoint),
                    'start_time': time.time()
                }
            )

    def get_endpoint_type(self, endpoint):
        """Determine endpoint type from URL"""
        if 'stock' in endpoint:
            return 'stocks'
        elif 'crypto' in endpoint:
            return 'cryptocurrency'
        elif 'forex' in endpoint:
            return 'forex'
        return 'unknown'

    def parse_realtime_data(self, response):
        """Parse real-time financial data"""
        try:
            data = json.loads(response.text)
        except json.JSONDecodeError:
            return

        endpoint_type = response.meta['endpoint_type']
        timestamp = datetime.now().isoformat()

        # Process based on endpoint type
        if endpoint_type == 'stocks':
            yield from self.process_stock_data(data, timestamp)
        elif endpoint_type == 'cryptocurrency':
            yield from self.process_crypto_data(data, timestamp)
        elif endpoint_type == 'forex':
            yield from self.process_forex_data(data, timestamp)

        # Schedule the next request for continuous monitoring
        yield scrapy.Request(
            response.url,
            callback=self.parse_realtime_data,
            meta=response.meta,
            dont_filter=True  # Allow duplicate requests
        )

    def process_stock_data(self, data, timestamp):
        """Process stock price data"""
        stocks = data.get('stocks', [])

        for stock in stocks:
            yield {
                'type': 'stock',
                'symbol': stock.get('symbol'),
                'price': stock.get('price'),
                'change': stock.get('change'),
                'change_percent': stock.get('changePercent'),
                'volume': stock.get('volume'),
                'timestamp': timestamp,
                'market_cap': stock.get('marketCap')
            }

    def process_crypto_data(self, data, timestamp):
        """Process cryptocurrency data"""
        currencies = data.get('data', [])

        for currency in currencies:
            yield {
                'type': 'cryptocurrency',
                'symbol': currency.get('symbol'),
                'name': currency.get('name'),
                'price_usd': currency.get('price_usd'),
                'price_btc': currency.get('price_btc'),
                'volume_24h': currency.get('24h_volume_usd'),
                'market_cap': currency.get('market_cap_usd'),
                'change_24h': currency.get('percent_change_24h'),
                'timestamp': timestamp
            }

    def process_forex_data(self, data, timestamp):
        """Process forex rates data"""
        rates = data.get('rates', {})
        base_currency = data.get('base', 'USD')

        for currency, rate in rates.items():
            yield {
                'type': 'forex',
                'base_currency': base_currency,
                'target_currency': currency,
                'rate': rate,
                'timestamp': timestamp
            }
Advanced Selector Techniques
Complex XPath and CSS Selectors
import json
import re


# Advanced selector utilities
class AdvancedSelectors:
    @staticmethod
    def extract_with_fallbacks(response, selectors):
        """Try multiple selectors until one works"""
        for selector in selectors:
            if selector.startswith('//'):
                # XPath selector
                result = response.xpath(selector).get()
            else:
                # CSS selector
                result = response.css(selector).get()

            if result:
                return result.strip()
        return None
    @staticmethod
    def extract_text_near_element(response, anchor_text, search_area='following'):
        """Extract text near a specific element"""
        if search_area == 'following':
            xpath = f"//text()[contains(., '{anchor_text}')]/following::text()[1]"
        elif search_area == 'preceding':
            xpath = f"//text()[contains(., '{anchor_text}')]/preceding::text()[1]"
        elif search_area == 'parent':
            xpath = f"//text()[contains(., '{anchor_text}')]/parent::*/text()"
        else:
            raise ValueError(f'Unknown search_area: {search_area}')

        return response.xpath(xpath).get()
    @staticmethod
    def extract_table_data(response, table_selector):
        """Extract structured data from tables"""
        table = response.css(table_selector)
        if not table:
            return []

        headers = table.css('thead tr th::text').getall()
        if not headers:
            headers = table.css('tr:first-child td::text').getall()

        rows = []
        # Prefer explicit body rows; otherwise skip the header row
        data_rows = table.css('tbody tr') or table.css('tr')[1:]
        for row in data_rows:
            cells = row.css('td::text').getall()
            if len(cells) == len(headers):
                row_data = dict(zip(headers, cells))
                rows.append(row_data)

        return rows
    @staticmethod
    def extract_nested_json(response, script_selector):
        """Extract JSON data from script tags"""
        scripts = response.css(script_selector)

        for script in scripts:
            content = script.get()

            # Try to find JSON objects
            json_patterns = [
                r'var\s+\w+\s*=\s*({.+?});',
                r'window\.\w+\s*=\s*({.+?});',
                r'data:\s*({.+?})',
            ]

            for pattern in json_patterns:
                matches = re.findall(pattern, content, re.DOTALL)
                for match in matches:
                    try:
                        return json.loads(match)
                    except json.JSONDecodeError:
                        continue

        return None
# Usage in a spider
def parse_complex_page(self, response):
    """Example using advanced selectors"""
    selectors = AdvancedSelectors()

    # Try multiple price selectors
    price = selectors.extract_with_fallbacks(response, [
        '.price-current::text',
        '.price::text',
        '//span[@class="price"]//text()',
        '.product-price .value::text'
    ])

    # Extract text near the "Price:" label
    price_alt = selectors.extract_text_near_element(response, 'Price:', 'following')

    # Extract table data
    specs = selectors.extract_table_data(response, '.specifications-table')

    # Extract JSON configuration
    config = selectors.extract_nested_json(response, 'script[type="application/json"]')

    yield {
        'price': price or price_alt,
        'specifications': specs,
        'config': config
    }
Custom Middleware Development
Rotation and Retry Middleware
import random
import time

from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
from scrapy.exceptions import NotConfigured


class RotatingUserAgentMiddleware(UserAgentMiddleware):
    """Rotate user agents to avoid detection"""

    def __init__(self, user_agent=''):
        self.user_agent = user_agent

        # List of realistic user agents
        self.user_agent_list = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        ]

    def process_request(self, request, spider):
        ua = random.choice(self.user_agent_list)
        request.headers['User-Agent'] = ua
        return None


class ProxyRotationMiddleware:
    """Rotate proxies to distribute requests"""

    def __init__(self, proxy_list=None):
        if not proxy_list:
            raise NotConfigured('No proxy list provided')

        self.proxy_list = proxy_list
        self.proxy_index = 0

    @classmethod
    def from_crawler(cls, crawler):
        proxy_list = crawler.settings.getlist('PROXY_LIST')
        return cls(proxy_list)

    def process_request(self, request, spider):
        proxy = self.proxy_list[self.proxy_index]
        self.proxy_index = (self.proxy_index + 1) % len(self.proxy_list)

        request.meta['proxy'] = proxy
        spider.logger.debug(f'Using proxy: {proxy}')


class RetryWithBackoffMiddleware:
    """Implement exponential backoff for retries"""

    def __init__(self, max_retry_times=3, initial_delay=1):
        self.max_retry_times = max_retry_times
        self.initial_delay = initial_delay

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            max_retry_times=crawler.settings.getint('RETRY_TIMES', 3),
            initial_delay=crawler.settings.getfloat('RETRY_INITIAL_DELAY', 1)
        )

    def process_response(self, request, response, spider):
        if response.status in [429, 503, 502, 504]:  # Rate limited or server errors
            retry_times = request.meta.get('retry_times', 0)

            if retry_times < self.max_retry_times:
                # Calculate the exponential backoff delay
                delay = self.initial_delay * (2 ** retry_times)
                spider.logger.info(f'Retrying {request.url} after {delay}s (attempt {retry_times + 1})')
                # Add delay (note: time.sleep blocks the Twisted reactor,
                # which is acceptable for low-concurrency crawls but not at scale)
                time.sleep(delay)
                # Create the retry request
                retry_request = request.copy()
                retry_request.meta['retry_times'] = retry_times + 1
                retry_request.dont_filter = True

                return retry_request

        return response


class HeaderRotationMiddleware:
    """Rotate request headers to appear more natural"""

    def __init__(self):
        self.header_sets = [
            {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            },
            {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
            }
        ]

    def process_request(self, request, spider):
        headers = random.choice(self.header_sets)
        for key, value in headers.items():
            request.headers[key] = value

        return None
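These middlewares only take effect once they are registered in your project settings. Assuming they live in webscraper/middlewares.py (adjust the dotted path to your own project), a reasonable sketch of the settings looks like this; the priority numbers are sensible defaults rather than required values:

# settings.py
DOWNLOADER_MIDDLEWARES = {
    # Disable Scrapy's built-in user agent middleware so ours takes over
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    'webscraper.middlewares.RotatingUserAgentMiddleware': 400,
    'webscraper.middlewares.HeaderRotationMiddleware': 410,
    'webscraper.middlewares.ProxyRotationMiddleware': 420,
    'webscraper.middlewares.RetryWithBackoffMiddleware': 550,
}

# Required by ProxyRotationMiddleware (example values)
PROXY_LIST = [
    'http://proxy1.example.com:8000',
    'http://proxy2.example.com:8000',
]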
Infinite Scroll and Dynamic Loading
import scrapy
from scrapy_splash import SplashRequest
import json
import re
class InfiniteScrollSpider(scrapy.Spider):
    name = 'infinite_scroll'
    allowed_domains = ['infinite-example.com']

    def start_requests(self):
        lua_script = """
        function main(splash, args)
            splash:go(args.url)
            splash:wait(2)

            -- Function to scroll and wait for content
            local function scroll_and_wait(times)
                for i = 1, times do
                    splash:runjs([[
                        window.scrollTo(0, document.body.scrollHeight);
                    ]])
                    splash:wait(2)

                    -- Check if a "Load More" button exists and click it
                    local load_more = splash:select('.load-more')
                    if load_more then
                        load_more:mouse_click()
                        splash:wait(3)
                    end

                    -- Stop if we have reached the end of the content
                    local end_marker = splash:select('.end-of-content')
                    if end_marker then
                        break
                    end
                end
            end

            -- Scroll multiple times to load content
            scroll_and_wait(5)

            return {
                html = splash:html(),
                url = splash:url()
            }
        end
        """
        yield SplashRequest(
            url='https://infinite-example.com/products',
            callback=self.parse,
            endpoint='execute',  # required when running a Lua script
            args={
                'lua_source': lua_script,
                'timeout': 60
            }
        )
    def parse(self, response):
        """Parse infinite scroll content"""
        products = response.css('.product-item')

        for product in products:
            yield {
                'name': product.css('.product-name::text').get(),
                'price': product.css('.price::text').get(),
                'url': response.urljoin(product.css('a::attr(href)').get())
            }

        # Look for AJAX endpoints to continue pagination
        yield from self.extract_ajax_pagination(response)
    def extract_ajax_pagination(self, response):
        """Extract AJAX pagination endpoints"""
        # Look for pagination API endpoints in JavaScript
        ajax_patterns = [
            r'loadMore[\'"]\s*:\s*[\'"](.*?)[\'"]',
            r'pagination[\'"]\s*:\s*[\'"](.*?)[\'"]',
            r'nextPage[\'"]\s*:\s*[\'"](.*?)[\'"]'
        ]

        for pattern in ajax_patterns:
            matches = re.findall(pattern, response.text)
            for match in matches:
                if match.startswith('http') or match.startswith('/'):
                    yield scrapy.Request(
                        url=response.urljoin(match),
                        callback=self.parse_ajax_page,
                        headers={'X-Requested-With': 'XMLHttpRequest'}
                    )
    def parse_ajax_page(self, response):
        """Parse AJAX-loaded content"""
        try:
            data = json.loads(response.text)

            # Extract HTML content from the AJAX response
            html_content = data.get('html', '')
            if html_content:
                from scrapy import Selector
                selector = Selector(text=html_content)

                products = selector.css('.product-item')
                for product in products:
                    yield {
                        'name': product.css('.product-name::text').get(),
                        'price': product.css('.price::text').get(),
                        'ajax_source': True
                    }

            # Continue pagination if available
            next_page = data.get('nextPage')
            if next_page:
                yield response.follow(
                    next_page,
                    callback=self.parse_ajax_page
                )

        except json.JSONDecodeError:
            self.logger.error(f'Invalid JSON from {response.url}')
Summary and Next Steps
In this part, you’ve mastered advanced Scrapy techniques including:
✅ JavaScript rendering with Scrapy-Splash
✅ Form handling and authentication for protected content
✅ AJAX requests and API integration for dynamic data
✅ Advanced selector techniques for complex extraction
✅ Custom middleware development for rotation and retry logic
✅ Infinite scroll and dynamic loading handling
What’s Next?
In Part 3: Anti-Detection and Scaling, we’ll cover:
- Advanced anti-detection techniques
- Distributed scraping with Scrapy-Redis
- Monitoring and alerting systems
- Performance optimization strategies
- Legal compliance and ethical scraping
Practice Exercise
Build a spider that can handle a modern e-commerce site with all of the following (a starter skeleton is sketched after the list):
- JavaScript-rendered product listings
- User authentication for member prices
- AJAX-loaded reviews and ratings
- Infinite scroll pagination
- Form-based search functionality
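If you want a scaffold to start from, a bare-bones sketch might look like this (all URLs and selectors are hypothetical):

import scrapy
from scrapy_splash import SplashRequest


class EcommercePracticeSpider(scrapy.Spider):
    name = 'ecommerce_practice'

    def start_requests(self):
        # TODO: log in first (see LoginSpider), then render the
        # JavaScript-heavy listing page through Splash
        yield SplashRequest(
            'https://shop.example.com/products',
            callback=self.parse_listing,
            args={'wait': 2},
        )

    def parse_listing(self, response):
        # TODO: extract products, follow AJAX review endpoints,
        # and handle infinite-scroll pagination
        pass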
Happy scraping! 🕸️