How can you build a high-performance, fault-tolerant, and scalable web scraping framework in Python using
aiohttp, selenium, asyncio, and redis to handle dynamic content, bypass anti-bot measures, and distribute crawling tasks across multiple workers? Provide a concise code example demonstrating advanced features such as rotating proxies, request rate limiting, error recovery, and distributed task queue management.

import asyncio
import aiohttp
import random
import redis.asyncio as redis  # async Redis client so queue calls can be awaited
from typing import Optional
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Configuration
REDIS_URL = "redis://localhost:6379/0"
# aiohttp natively supports plain HTTP proxies; replace with your own proxy endpoints
PROXIES = ["http://proxy1:8080", "http://proxy2:8080"]
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}

class AsyncWebScraper:
    def __init__(self, redis_url: str):
        self.redis_client = redis.from_url(redis_url)
        self.session: Optional[aiohttp.ClientSession] = None
        self.proxy: Optional[str] = None

    async def setup_session(self):
        """Create the shared aiohttp session (proxies are chosen per request)."""
        self.session = aiohttp.ClientSession()
    async def get_with_proxy(self, url: str) -> Optional[str]:
        """Fetch a URL through a randomly rotated proxy."""
        self.proxy = random.choice(PROXIES)
        headers = HEADERS.copy()
        headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
        try:
            async with self.session.get(url, headers=headers, proxy=self.proxy) as response:
                return await response.text()
        except Exception as e:
            print(f"Request failed: {e}")
            return None
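
    # Hedged addition (not in the original post): a sketch of error recovery that
    # retries get_with_proxy with exponential backoff, picking a fresh proxy on
    # each attempt before the caller falls back to Selenium.
    async def get_with_retry(self, url: str, retries: int = 3) -> Optional[str]:
        for attempt in range(retries):
            html = await self.get_with_proxy(url)
            if html:
                return html
            # Back off 1s, 2s, 4s, ... so a struggling target or proxy can recover
            await asyncio.sleep(2 ** attempt)
        return None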
    async def scrape_with_selenium(self, url: str) -> str:
        """Scrape dynamic content with Selenium, run in a thread so the event loop stays responsive."""
        return await asyncio.to_thread(self._selenium_fetch, url)

    def _selenium_fetch(self, url: str) -> str:
        """Blocking Selenium fetch for JavaScript-rendered pages."""
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        driver = webdriver.Chrome(options=options)
        try:
            driver.get(url)
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            return driver.page_source
        finally:
            driver.quit()
    async def process_task(self, task_id: str, url: str):
        """Process an individual scraping task."""
        # Simple rate limiting: jittered delay between requests
        await asyncio.sleep(random.uniform(1, 3))
        # Try plain HTTP first, fall back to Selenium for dynamic pages
        html = await self.get_with_proxy(url)
        if not html:
            html = await self.scrape_with_selenium(url)
        # Store result
        if html:
            await self.redis_client.set(f"result:{task_id}", html)
    async def worker_loop(self):
        """Worker that consumes tasks from the shared Redis queue."""
        while True:
            task = await self.redis_client.brpop("scraping_queue", timeout=5)
            if task:
                # Split only on the first ":" so the URL keeps its own colons intact
                task_id, url = task[1].decode().split(":", 1)
                await self.process_task(task_id, url)
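
    # Hedged addition (not in the original post): a sketch of cluster-wide rate
    # limiting. Every worker increments the same Redis counter, so the whole
    # fleet shares a single request budget per fixed window.
    async def acquire_rate_slot(self, limit: int = 10, window: int = 60):
        while True:
            count = await self.redis_client.incr("rate:window")
            if count == 1:
                # First request in this window starts the expiry clock
                await self.redis_client.expire("rate:window", window)
            if count <= limit:
                return
            await asyncio.sleep(1)  # Budget exhausted; wait for the window to reset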
# Example usage
async def main():
    scraper = AsyncWebScraper(REDIS_URL)
    await scraper.setup_session()
    # Add tasks to the queue
    for i in range(5):
        await scraper.redis_client.lpush("scraping_queue", f"{i}:https://example.com")
    # Start a worker (runs until interrupted)
    await scraper.worker_loop()

if __name__ == "__main__":
    asyncio.run(main())
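
Because the queue lives in Redis, scaling out is mostly a matter of starting more consumers. Below is a minimal sketch (not part of the snippet above) that launches several worker coroutines in one process with asyncio.gather; the same entry point can run on multiple machines pointed at the same Redis instance, since brpop hands each task to exactly one worker. The run_workers name and the worker count are illustrative assumptions.

async def run_workers(num_workers: int = 4):
    """Start several independent workers that all consume the same Redis queue."""
    scrapers = [AsyncWebScraper(REDIS_URL) for _ in range(num_workers)]
    for scraper in scrapers:
        await scraper.setup_session()
    # Each worker_loop blocks on brpop, so tasks are distributed without duplication
    await asyncio.gather(*(scraper.worker_loop() for scraper in scrapers))

# Run with: asyncio.run(run_workers()) instead of main()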
#Python #WebScraping #AsyncIO #Selenium #Redis #ProxyRotation #FaultTolerance #DistributedSystems #DynamicContent #RateLimiting #Scalability
By: @DataScienceQ