Python Data Science Jobs & Interviews
Your go-to hub for Python and Data Science—featuring questions, answers, quizzes, and interview tips to sharpen your skills and boost your career in the data-driven world.

Admin: @Hussein_Sheikho
How can you build a high-performance, fault-tolerant, and scalable web scraping framework in Python using aiohttp, selenium, asyncio, and redis to handle dynamic content, bypass anti-bot measures, and distribute crawling tasks across multiple workers? Provide a concise code example demonstrating advanced features such as rotating proxies, request rate limiting, error recovery, and distributed task queue management.

import asyncio
import random
from typing import Optional

import aiohttp
import redis.asyncio as redis
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Configuration
REDIS_URL = "redis://localhost:6379/0"
PROXIES = ["http://proxy1:8080", "http://proxy2:8080"]  # placeholder proxy pool
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}


class AsyncWebScraper:
    def __init__(self, redis_url: str):
        # Async Redis client shared by the task queue and the result store
        self.redis_client = redis.from_url(redis_url)
        self.session: Optional[aiohttp.ClientSession] = None

    async def setup_session(self):
        """Create the shared aiohttp session."""
        self.session = aiohttp.ClientSession()

    async def close(self):
        """Release the HTTP session and the Redis connection."""
        if self.session:
            await self.session.close()
        await self.redis_client.close()

    async def get_with_proxy(self, url: str) -> Optional[str]:
        """Fetch a URL through a randomly rotated proxy."""
        proxy = random.choice(PROXIES)
        headers = HEADERS.copy()
        headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"

        try:
            async with self.session.get(
                url, headers=headers, proxy=proxy,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                response.raise_for_status()
                return await response.text()
        except Exception as e:
            print(f"Request failed: {e}")
            return None

    def _selenium_fetch(self, url: str) -> str:
        """Blocking Selenium fetch for JavaScript-rendered pages."""
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")

        driver = webdriver.Chrome(options=options)
        try:
            driver.get(url)
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            return driver.page_source
        finally:
            driver.quit()

    async def scrape_with_selenium(self, url: str) -> str:
        """Run the blocking Selenium fetch in a thread so it does not stall the event loop."""
        return await asyncio.to_thread(self._selenium_fetch, url)

    async def process_task(self, task_id: str, url: str):
        """Process an individual scraping task."""
        # Basic rate limiting: randomized delay between requests
        await asyncio.sleep(random.uniform(1, 3))

        # Try plain HTTP first, fall back to Selenium for dynamic content
        html = await self.get_with_proxy(url)
        if not html:
            html = await self.scrape_with_selenium(url)

        # Store the result
        if html:
            await self.redis_client.set(f"result:{task_id}", html)

    async def worker_loop(self):
        """Worker that processes tasks from the Redis queue."""
        while True:
            task = await self.redis_client.brpop("scraping_queue", timeout=5)
            if task:
                # Split only on the first colon so the URL keeps its scheme
                task_id, url = task[1].decode().split(":", 1)
                await self.process_task(task_id, url)
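
# Error-recovery sketch: retry a fetch with exponential backoff before giving up.
# The helper name and retry parameters here are illustrative, not a fixed API.
async def fetch_with_retries(scraper: AsyncWebScraper, url: str, attempts: int = 3) -> Optional[str]:
    for attempt in range(attempts):
        html = await scraper.get_with_proxy(url)
        if html:
            return html
        # Back off 1s, 2s, 4s, ... plus jitter between attempts
        await asyncio.sleep(2 ** attempt + random.random())
    return None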

# Example usage
async def main():
    scraper = AsyncWebScraper(REDIS_URL)
    await scraper.setup_session()

    try:
        # Add tasks to the queue (task_id:url)
        for i in range(5):
            await scraper.redis_client.lpush("scraping_queue", f"{i}:https://example.com")

        # Start a worker (runs until cancelled)
        await scraper.worker_loop()
    finally:
        await scraper.close()

asyncio.run(main())
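
To distribute the crawl, several worker coroutines can share the same Redis queue, either inside one process via asyncio.gather or as separate processes and machines pointed at the same Redis instance. A minimal sketch, assuming the AsyncWebScraper class above; the function name and worker count are arbitrary:

async def run_workers(num_workers: int = 4):
    # Every worker pops from the same Redis-backed queue, so adding workers
    # (or extra machines running this script) scales the crawl horizontally.
    scrapers = [AsyncWebScraper(REDIS_URL) for _ in range(num_workers)]
    for s in scrapers:
        await s.setup_session()
    try:
        await asyncio.gather(*(s.worker_loop() for s in scrapers))
    finally:
        for s in scrapers:
            await s.close()

# asyncio.run(run_workers())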


#Python #WebScraping #AsyncIO #Selenium #Redis #ProxyRotation #FaultTolerance #DistributedSystems #DynamicContent #RateLimiting #Scalability

By: @DataScienceQ 🚀