
Building a Distributed Scraping Pipeline with Rotating Proxies (Architecture Guide)

14 min read

By Hex Proxies Engineering Team

A single-machine scraper with a for-loop and rotating proxies works until it does not. The moment you need to scrape millions of pages across hundreds of sites with varying anti-bot protection, you need a distributed pipeline with proper job orchestration, failure handling, and proxy management.

This guide presents a production architecture for distributed scraping systems. It is based on patterns used by Hex Proxies customers running large-scale data collection operations, distilled into a reference architecture you can adapt to your workload.

Architecture Overview

                            ┌──────────────┐
                            │  Scheduler   │
                            │  (cron/API)  │
                            └──────┬───────┘
                                   │
                                   ▼
                          ┌────────────────┐
                          │  Job Queue     │
                          │  (Redis/SQS)   │
                          └────────┬───────┘
                                   │
                    ┌──────────────┼──────────────┐
                    │              │              │
                    ▼              ▼              ▼
              ┌──────────┐  ┌──────────┐  ┌──────────┐
              │ Worker 1 │  │ Worker 2 │  │ Worker N │
              │          │  │          │  │          │
              │  Proxy   │  │  Proxy   │  │  Proxy   │
              │  Manager │  │  Manager │  │  Manager │
              └────┬─────┘  └────┬─────┘  └────┬─────┘
                   │              │              │
                   ▼              ▼              ▼
               ┌──────────────────────────────────────┐
               │         Proxy Gateway                │
               │    (gate.hexproxies.com)             │
               │                                      │
               │  Session routing, IP rotation,       │
               │  geo-targeting, protocol handling    │
               └──────────────────────────────────────┘
                                   │
                                   ▼
                          ┌────────────────┐
                          │ Target Sites   │
                          └────────────────┘
                                   │
                    ┌──────────────┼──────────────┐
                    │              │              │
                    ▼              ▼              ▼
              ┌──────────┐  ┌──────────┐  ┌──────────┐
              │  Parser  │  │  Parser  │  │  Parser  │
              │  Workers │  │  Workers │  │  Workers │
              └────┬─────┘  └────┬─────┘  └────┬─────┘
                   │              │              │
                   ▼              ▼              ▼
               ┌──────────────────────────────────────┐
               │         Storage Layer                │
               │   (S3/Postgres/Elasticsearch)        │
               └──────────────────────────────────────┘

The architecture separates concerns into five layers: scheduling, queuing, fetching, parsing, and storage. Each layer scales independently.

Layer 1: Job Queue and Scheduling

URL Frontier Design

The URL frontier is the core data structure -- it manages which URLs to scrape, when to scrape them, and in what priority order.

import hashlib
import json
import time
from dataclasses import dataclass, field
from typing import Optional
import redis


@dataclass(frozen=True)
class ScrapeJob:
    """Immutable scrape job definition."""
    url: str
    domain: str
    priority: int  # 1 (highest) to 10 (lowest)
    retry_count: int = 0
    max_retries: int = 3
    created_at: float = field(default_factory=time.time)
    proxy_type: str = "residential"  # "residential" or "isp"
    geo_target: Optional[str] = None  # e.g., "us", "gb", "de"
    requires_browser: bool = False
    
    @property
    def job_id(self):
        """Deterministic job ID from URL for deduplication."""
        return hashlib.sha256(self.url.encode()).hexdigest()[:16]
    
    def with_retry(self):
        """Return a new job with incremented retry count."""
        return ScrapeJob(
            url=self.url,
            domain=self.domain,
            priority=self.priority,
            retry_count=self.retry_count + 1,
            max_retries=self.max_retries,
            created_at=self.created_at,
            proxy_type=self.proxy_type,
            geo_target=self.geo_target,
            requires_browser=self.requires_browser,
        )
    
    def to_json(self):
        return json.dumps({
            "url": self.url,
            "domain": self.domain,
            "priority": self.priority,
            "retry_count": self.retry_count,
            "max_retries": self.max_retries,
            "created_at": self.created_at,
            "proxy_type": self.proxy_type,
            "geo_target": self.geo_target,
            "requires_browser": self.requires_browser,
        })
    
    @staticmethod
    def from_json(data):
        parsed = json.loads(data) if isinstance(data, str) else data
        return ScrapeJob(**parsed)


class URLFrontier:
    """Priority-based URL frontier with per-domain rate limiting.
    
    Uses Redis sorted sets for priority queuing and per-domain
    rate limiting to prevent overwhelming individual targets.
    """
    
    def __init__(self, redis_client, domain_delay_seconds=2.0):
        self.redis = redis_client
        self.domain_delay = domain_delay_seconds
        self.queue_key = "scrape:queue"
        self.seen_key = "scrape:seen"
        self.domain_lock_prefix = "scrape:domain_lock:"
    
    def enqueue(self, job):
        """Add a job to the frontier if not already seen."""
        if self.redis.sismember(self.seen_key, job.job_id):
            return False  # Already scraped or in queue
        
        # Add to priority queue (lower score = higher priority)
        score = job.priority * 1e10 + job.created_at
        self.redis.zadd(self.queue_key, {job.to_json(): score})
        self.redis.sadd(self.seen_key, job.job_id)
        return True
    
    def dequeue(self):
        """Get the highest-priority job whose domain is not rate-limited.
        
        Returns None if no eligible job is available.
        """
        # Get candidates from the priority queue
        candidates = self.redis.zrange(self.queue_key, 0, 49)
        
        for candidate_data in candidates:
            job = ScrapeJob.from_json(candidate_data)
            lock_key = f"{self.domain_lock_prefix}{job.domain}"
            
            # Check if domain is rate-limited
            if self.redis.exists(lock_key):
                continue  # Domain is cooling down
            
            # Claim this job (atomic remove)
            removed = self.redis.zrem(self.queue_key, candidate_data)
            if removed == 0:
                continue  # Another worker claimed it
            
            # Set domain cooldown
            self.redis.setex(lock_key, int(self.domain_delay), "1")
            
            return job
        
        return None  # No eligible jobs right now
    
    def requeue(self, job):
        """Requeue a failed job with incremented retry count."""
        retried = job.with_retry()
        if retried.retry_count > retried.max_retries:
            self._send_to_dead_letter(retried)
            return False
        
        # Requeue with degraded priority
        score = (retried.priority + retried.retry_count) * 1e10 + time.time()
        self.redis.zadd(self.queue_key, {retried.to_json(): score})
        return True
    
    def _send_to_dead_letter(self, job):
        """Move permanently failed jobs to dead letter queue."""
        self.redis.rpush("scrape:dead_letter", job.to_json())
    
    def stats(self):
        """Return current frontier statistics."""
        return {
            "queue_size": self.redis.zcard(self.queue_key),
            "seen_count": self.redis.scard(self.seen_key),
            "dead_letter_count": self.redis.llen("scrape:dead_letter"),
        }

Per-Domain Rate Limiting

The domain_delay parameter in the frontier is critical. Without per-domain rate limiting, a distributed system with 50 workers will hammer a single target with 50 concurrent requests -- guaranteed to trigger rate limiting and IP blocks.

Rule of thumb: set the domain delay to at least the inverse of your target's tolerance. If a site blocks above 10 requests per second, the inverse is 0.1 seconds; setting domain_delay to 0.2 seconds leaves a safety margin. For well-protected sites, 2-5 seconds between requests per domain is safer.

Per-domain rate limiting is independent of per-IP rate limiting. Even with rotating proxies providing fresh IPs, the target site still sees rapid requests from any IP if your aggregate rate is too high.
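The same idea can be sketched in-process without Redis. This `DomainRateLimiter` is illustrative and covers a single worker only, whereas the frontier above enforces the delay across all workers via Redis locks:

```python
import time


class DomainRateLimiter:
    """In-process sketch of per-domain rate limiting: at most one
    request per domain per `delay_seconds`, regardless of how many
    proxy IPs are in rotation."""

    def __init__(self, delay_seconds=2.0):
        self.delay = delay_seconds
        self._last_request = {}  # domain -> last-allowed monotonic time

    def acquire(self, domain):
        """Return True if a request to this domain may go out now."""
        now = time.monotonic()
        if now - self._last_request.get(domain, float("-inf")) < self.delay:
            return False  # domain still cooling down
        self._last_request[domain] = now
        return True


limiter = DomainRateLimiter(delay_seconds=0.2)  # tolerates ~5 req/s per domain
```

Note that the limiter keys on domain, not IP: swapping proxies between calls does not reopen the window, which is exactly the point of the paragraph above.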

Layer 2: Worker Architecture

Worker Design

Each worker runs an event loop: dequeue a job, configure the proxy, fetch the page, handle errors, and report results.

import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Optional

import aiohttp


logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class FetchResult:
    """Immutable result from a fetch operation."""
    url: str
    status_code: int
    body: str
    headers: dict
    proxy_ip: str
    latency_ms: float
    success: bool
    error: Optional[str] = None


@dataclass(frozen=True)
class ProxyConfig:
    """Immutable proxy configuration for a single request."""
    url: str  # Full proxy URL with auth
    protocol: str  # "http" or "socks5"
    session_id: Optional[str] = None


def build_proxy_config(job, worker_id):
    """Build a proxy configuration for a given job.
    
    Uses Hex Proxies session-based routing to control IP assignment.
    Each unique session_id gets a unique IP; same session_id gets 
    the same IP (sticky session).
    """
    base_user = "USER"
    base_pass = "PASS"
    
    # Build session identifier for sticky sessions
    session_id = f"w{worker_id}-{job.domain}-{int(time.time() // 300)}"
    
    # Construct proxy URL with session routing
    geo_part = f"-country-{job.geo_target}" if job.geo_target else ""
    type_part = f"-type-{job.proxy_type}"
    session_part = f"-session-{session_id}"
    
    proxy_user = f"{base_user}{type_part}{geo_part}{session_part}"
    
    return ProxyConfig(
        url=f"http://{proxy_user}:{base_pass}@gate.hexproxies.com:8080",
        protocol="http",
        session_id=session_id,
    )


async def fetch_page(job, proxy_config, timeout_seconds=30):
    """Fetch a single page through the configured proxy.
    
    Returns an immutable FetchResult regardless of success or failure.
    """
    start = asyncio.get_running_loop().time()
    
    try:
        connector = aiohttp.TCPConnector(ssl=False)
        timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        
        async with aiohttp.ClientSession(
            connector=connector, timeout=timeout
        ) as session:
            async with session.get(
                job.url,
                proxy=proxy_config.url,
                headers={
                    "User-Agent": (
                        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                        "AppleWebKit/537.36 (KHTML, like Gecko) "
                        "Chrome/124.0.0.0 Safari/537.36"
                    ),
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                    "Accept-Language": "en-US,en;q=0.9",
                    "Accept-Encoding": "gzip, deflate, br",
                },
            ) as response:
                body = await response.text()
                elapsed = (asyncio.get_running_loop().time() - start) * 1000
                
                # Check for soft blocks
                is_blocked = _detect_soft_block(body)
                
                return FetchResult(
                    url=job.url,
                    status_code=response.status,
                    body=body,
                    headers=dict(response.headers),
                    proxy_ip=proxy_config.session_id or "unknown",
                    latency_ms=round(elapsed, 1),
                    success=response.status == 200 and not is_blocked,
                    error="soft_block" if is_blocked else None,
                )
    
    except asyncio.TimeoutError:
        elapsed = (asyncio.get_running_loop().time() - start) * 1000
        return FetchResult(
            url=job.url,
            status_code=0,
            body="",
            headers={},
            proxy_ip=proxy_config.session_id or "unknown",
            latency_ms=round(elapsed, 1),
            success=False,
            error="timeout",
        )
    
    except Exception as exc:
        elapsed = (asyncio.get_running_loop().time() - start) * 1000
        logger.error("Fetch error for %s: %s", job.url, str(exc))
        return FetchResult(
            url=job.url,
            status_code=0,
            body="",
            headers={},
            proxy_ip=proxy_config.session_id or "unknown",
            latency_ms=round(elapsed, 1),
            success=False,
            error=str(exc),
        )


def _detect_soft_block(body):
    """Check response body for anti-bot challenge indicators."""
    if not body or len(body) < 100:
        return True  # Suspiciously empty
    
    indicators = [
        "captcha", "cf-challenge", "challenge-platform",
        "px-captcha", "hcaptcha.com", "recaptcha",
    ]
    body_lower = body.lower()
    return any(ind in body_lower for ind in indicators)


async def worker_loop(worker_id, frontier, result_queue):
    """Main worker event loop.
    
    Continuously dequeues jobs, fetches pages, and handles results.
    """
    logger.info("Worker %d started", worker_id)
    
    while True:
        job = frontier.dequeue()
        
        if job is None:
            await asyncio.sleep(1)  # No eligible jobs, back off
            continue
        
        proxy_config = build_proxy_config(job, worker_id)
        result = await fetch_page(job, proxy_config)
        
        if result.success:
            await result_queue.put(result)
            logger.info(
                "Worker %d: OK %s (%.0fms)",
                worker_id, job.url, result.latency_ms,
            )
        else:
            logger.warning(
                "Worker %d: FAIL %s (error=%s, retries=%d/%d)",
                worker_id, job.url, result.error,
                job.retry_count, job.max_retries,
            )
            frontier.requeue(job)
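The requeue path above degrades priority but applies no delay before a retried job becomes eligible again. A common refinement is exponential backoff with jitter; the `retry_delay` helper below is an illustrative sketch, not part of the frontier API:

```python
import random


def retry_delay(retry_count, base_seconds=5.0, cap_seconds=300.0):
    """Exponential backoff with full jitter: retry 0 waits up to 5s,
    retry 1 up to 10s, retry 2 up to 20s, capped at 5 minutes.
    Jitter spreads retries out so workers don't re-hit a struggling
    target in lockstep."""
    upper = min(cap_seconds, base_seconds * (2 ** retry_count))
    return random.uniform(0, upper)
```

The computed delay could be folded into the sorted-set score on requeue, so the job only surfaces after its backoff window has passed.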

Worker Scaling

| Scraping Volume | Workers | Proxy Type | Estimated Throughput |
| --- | --- | --- | --- |
| < 10K pages/day | 2-5 | Residential rotating | 5-10 pages/min |
| 10K - 100K pages/day | 10-20 | Residential rotating | 50-100 pages/min |
| 100K - 1M pages/day | 20-50 | Mixed (ISP + residential) | 200-500 pages/min |
| > 1M pages/day | 50-200 | ISP (sticky) + residential (rotating) | 500+ pages/min |

These estimates assume per-domain rate limiting of 1 request every 2-3 seconds and diverse target domains. Single-domain scraping requires fewer workers but more aggressive proxy rotation.
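The table translates into a rough sizing calculation. The `estimate_workers` helper and its defaults (one page every 2.5 seconds per worker, 70% utilization) are illustrative assumptions, not measured figures:

```python
def estimate_workers(pages_per_day, seconds_per_page=2.5, utilization=0.7):
    """Rough worker count: target pages/day divided by what one worker
    fetches in a day at the assumed per-page latency and utilization."""
    pages_per_worker_per_day = (86400 / seconds_per_page) * utilization
    return max(1, round(pages_per_day / pages_per_worker_per_day))
```

At these defaults, 1M pages/day lands at roughly 41 workers, consistent with the 20-200 band in the table; tune `seconds_per_page` to your observed latency before trusting the number.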

Layer 3: Proxy Management

Rotation Strategies

The proxy management layer decides which proxy IP to use for each request. The strategy depends on the target site's detection level.

Per-request rotation (low-protection targets):

# Hex Proxies: omit session parameter for automatic per-request rotation
proxy_url = "http://USER:PASS@gate.hexproxies.com:8080"
# Every request through this proxy gets a different IP

Sticky sessions with timed rotation (medium-protection targets):

import time

def get_sticky_proxy(domain, rotation_interval_seconds=300):
    """Get a proxy with sticky session that rotates on a schedule.
    
    Same domain + same time window = same IP.
    New time window = new IP.
    """
    time_bucket = int(time.time() // rotation_interval_seconds)
    session_id = f"{domain}-{time_bucket}"
    return f"http://USER-session-{session_id}:PASS@gate.hexproxies.com:8080"

Per-domain sticky sessions (high-protection targets):

def get_domain_sticky_proxy(domain, worker_id):
    """Assign a dedicated IP to each domain per worker.
    
    The worker maintains the same IP for all requests to this domain
    for the duration of the session, mimicking a real user's browsing
    pattern.
    """
    session_id = f"w{worker_id}-{domain}"
    return f"http://USER-session-{session_id}:PASS@gate.hexproxies.com:8080"

IP Health Tracking

Track the success rate per proxy session to detect IPs that have been flagged:

from collections import defaultdict
from dataclasses import dataclass, field
import time


@dataclass(frozen=True)
class IPHealthSnapshot:
    """Immutable snapshot of an IP's health metrics."""
    session_id: str
    total_requests: int
    successful_requests: int
    last_success_at: float
    last_failure_at: float
    consecutive_failures: int
    
    @property
    def success_rate(self):
        if self.total_requests == 0:
            return 1.0
        return self.successful_requests / self.total_requests
    
    @property
    def is_healthy(self):
        """An IP is unhealthy if success rate drops below 50%
        or it has 5+ consecutive failures."""
        if self.total_requests < 5:
            return True  # Not enough data
        return (
            self.success_rate >= 0.5
            and self.consecutive_failures < 5
        )


class IPHealthTracker:
    """Track proxy IP health metrics to detect burned IPs."""
    
    def __init__(self):
        self._data = {}  # session_id -> mutable internal state
    
    def record_result(self, session_id, success):
        """Record a request result for a proxy session."""
        now = time.time()
        
        if session_id not in self._data:
            self._data[session_id] = {
                "total": 0,
                "successes": 0,
                "last_success": 0.0,
                "last_failure": 0.0,
                "consecutive_failures": 0,
            }
        
        state = self._data[session_id]
        
        # Create new state (conceptual immutability)
        new_state = {
            "total": state["total"] + 1,
            "successes": state["successes"] + (1 if success else 0),
            "last_success": now if success else state["last_success"],
            "last_failure": now if not success else state["last_failure"],
            "consecutive_failures": (
                0 if success else state["consecutive_failures"] + 1
            ),
        }
        self._data[session_id] = new_state
    
    def get_snapshot(self, session_id):
        """Get an immutable health snapshot for a session."""
        state = self._data.get(session_id)
        if state is None:
            return None
        
        return IPHealthSnapshot(
            session_id=session_id,
            total_requests=state["total"],
            successful_requests=state["successes"],
            last_success_at=state["last_success"],
            last_failure_at=state["last_failure"],
            consecutive_failures=state["consecutive_failures"],
        )
    
    def should_rotate(self, session_id):
        """Check if a session's IP should be rotated due to poor health."""
        snapshot = self.get_snapshot(session_id)
        if snapshot is None:
            return False
        return not snapshot.is_healthy
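The health rule itself reduces to a small pure function. This standalone restatement of the `is_healthy` thresholds (not a new API) makes the decision table easy to verify in isolation:

```python
def session_is_healthy(total, successes, consecutive_failures):
    """Restatement of the is_healthy rule above: below 5 samples,
    assume healthy (not enough data); otherwise require a 50%+
    success rate and fewer than 5 consecutive failures."""
    if total < 5:
        return True
    return (successes / total) >= 0.5 and consecutive_failures < 5
```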

Layer 4: Parsing and Storage

Separating Fetch from Parse

Parse operations should be decoupled from fetch operations for three reasons:

  1. Different resource profiles. Fetching is I/O-bound (network); parsing is CPU-bound (DOM traversal, regex). They scale differently.
  2. Retry isolation. If parsing fails, you can re-parse the cached raw HTML without re-fetching (which consumes proxy bandwidth).
  3. Schema evolution. When you change what data you extract, you re-parse stored HTML without re-scraping.

Fetch Workers → Raw HTML → S3/Blob Storage
                              │
                              ▼
                         Parse Workers → Structured Data → PostgreSQL
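In this design a parse worker is a pure function from stored HTML to structured records. The sketch below extracts only the page title, using the stdlib parser as a placeholder for real extraction logic; because it reads cached HTML, it can be rerun when the schema changes without touching proxy bandwidth:

```python
from html.parser import HTMLParser


class TitleExtractor(HTMLParser):
    """Minimal stdlib-based extractor: pulls <title> text from raw HTML."""

    def __init__(self):
        super().__init__()
        self._in_title = False
        self.title = ""

    def handle_starttag(self, tag, attrs):
        if tag == "title":
            self._in_title = True

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self.title += data


def parse_stored_html(raw_html):
    """Re-runnable parse step: cached HTML in, structured record out."""
    extractor = TitleExtractor()
    extractor.feed(raw_html)
    return {"title": extractor.title.strip()}
```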

Storage Strategy

| Data Type | Storage | Retention |
| --- | --- | --- |
| Raw HTML responses | S3 / object storage | 30-90 days |
| Parsed structured data | PostgreSQL / Elasticsearch | Permanent |
| Fetch metadata (latency, status, proxy used) | ClickHouse / TimescaleDB | 90 days |
| Dead letter jobs (permanently failed) | Redis → periodic export | Until reviewed |

Scaling Patterns

Horizontal Scaling

Add more workers behind the same job queue. Redis and SQS both support multiple consumers natively. Each worker is stateless -- it gets a job, does the work, reports the result.

Auto-scaling trigger: Scale workers based on queue depth. If the queue grows faster than workers drain it, add workers. If workers sit idle (queue is empty), remove workers.
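The trigger can be written as a small control function. `desired_workers` and its parameters are an illustrative sketch, not a specific autoscaler API:

```python
def desired_workers(queue_depth, drain_rate_per_worker_per_min,
                    target_drain_minutes=30, min_workers=2, max_workers=200):
    """Size the pool so the current backlog drains within the target
    window, clamped to configured bounds to avoid thrashing."""
    if drain_rate_per_worker_per_min <= 0:
        return min_workers
    needed = queue_depth / (drain_rate_per_worker_per_min * target_drain_minutes)
    return max(min_workers, min(max_workers, round(needed)))
```

Evaluating this on a timer (every minute or so) and comparing against the current pool size gives a simple scale-up/scale-down signal.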

Vertical Scaling

Before adding workers, maximize each worker's throughput:


  • Use async I/O (aiohttp, httpx async) to handle multiple requests concurrently within a single worker
  • Parse in a separate process pool to avoid blocking the event loop
  • Use connection pooling to the proxy gateway (persistent connections reduce handshake overhead)

Geographic Distribution

For global scraping, deploy workers near your proxy exit points:

US targets  → US worker cluster → Hex Proxies US pool
EU targets  → EU worker cluster → Hex Proxies EU pool
Asia targets → Asia worker cluster → Hex Proxies Asia pool

This minimizes the double-hop latency (worker → proxy → target) by colocating workers with the proxy exit points.
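Routing by geography reduces to a lookup from the job's geo_target to a cluster. The mapping and cluster names below are illustrative assumptions, not actual Hex Proxies identifiers:

```python
# Illustrative geo routing: map a job's geo_target to the worker
# cluster colocated with the matching proxy exit pool.
CLUSTER_BY_COUNTRY = {
    "us": "us-cluster", "ca": "us-cluster",
    "gb": "eu-cluster", "de": "eu-cluster", "fr": "eu-cluster",
    "jp": "asia-cluster", "sg": "asia-cluster",
}


def route_job(geo_target, default="us-cluster"):
    """Pick the worker cluster nearest the required proxy exit."""
    if geo_target is None:
        return default
    return CLUSTER_BY_COUNTRY.get(geo_target.lower(), default)
```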

Monitoring and Observability

Key Metrics to Track

| Metric | Alert Threshold | Action |
| --- | --- | --- |
| Queue depth | > 100K pending jobs | Scale up workers |
| Success rate (global) | < 85% | Investigate target blocks |
| Success rate (per domain) | < 50% | Pause domain, adjust strategy |
| P95 latency | > 5 seconds | Check proxy performance |
| Dead letter rate | > 5% of total jobs | Review failure patterns |
| Proxy bandwidth usage | > 80% of plan | Upgrade plan or optimize |
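These thresholds can be checked mechanically against a metrics snapshot. `evaluate_alerts`, the metric keys, and the alert names are illustrative, a sketch of the table rather than a real monitoring API:

```python
def evaluate_alerts(metrics):
    """Compare a metrics snapshot (a plain dict) against the table's
    thresholds; return the names of alerts that fired."""
    alerts = []
    if metrics.get("queue_depth", 0) > 100_000:
        alerts.append("scale_up_workers")
    if metrics.get("global_success_rate", 1.0) < 0.85:
        alerts.append("investigate_target_blocks")
    if metrics.get("p95_latency_seconds", 0.0) > 5.0:
        alerts.append("check_proxy_performance")
    if metrics.get("dead_letter_rate", 0.0) > 0.05:
        alerts.append("review_failure_patterns")
    if metrics.get("bandwidth_used_fraction", 0.0) > 0.8:
        alerts.append("upgrade_or_optimize")
    return alerts
```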

Error Classification

Classify failures to identify systemic issues:

def classify_failure(result):
    """Classify a failed fetch result for monitoring."""
    if result.error == "timeout":
        return "timeout"  # Proxy or target slow
    if result.error == "soft_block":
        return "anti_bot"  # Detected and challenged
    if result.status_code == 403:
        return "ip_blocked"  # IP reputation issue
    if result.status_code == 429:
        return "rate_limited"  # Too fast
    if result.status_code >= 500:
        return "target_error"  # Target site issue
    if "connection" in str(result.error).lower():
        return "proxy_error"  # Proxy connectivity issue
    return "unknown"

If "ip_blocked" errors spike, your proxy IPs may be overused -- switch to ISP proxies with sticky sessions or increase rotation frequency. If "rate_limited" errors spike, slow down your per-domain rate. If "anti_bot" errors spike, consider upgrading to browser-based fetching for that domain.

For more on optimizing proxy performance in scraping pipelines, see our performance optimization guide and web automation use case page.

Frequently Asked Questions

How many concurrent connections should I use per proxy IP?

For rotating proxies, 1 connection per IP at a time is ideal since each request gets a fresh IP. For sticky sessions, limit to 3-5 concurrent connections per IP to avoid appearing non-human. See our concurrent connections guide for detailed analysis.
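The per-IP cap maps directly onto an asyncio.Semaphore. This self-contained sketch simulates fetches with sleeps and measures peak concurrency to show the semaphore holding the limit; `limited_fetch` and `run_batch` are illustrative names:

```python
import asyncio


async def limited_fetch(semaphore, active, peak):
    """Simulated fetch gated by a per-IP semaphore; the sleep
    stands in for the real request."""
    async with semaphore:
        active[0] += 1
        peak[0] = max(peak[0], active[0])
        await asyncio.sleep(0.01)
        active[0] -= 1


async def run_batch(limit=3, jobs=12):
    """Cap in-flight requests on one sticky IP at `limit`."""
    semaphore = asyncio.Semaphore(limit)
    active, peak = [0], [0]
    await asyncio.gather(*(
        limited_fetch(semaphore, active, peak) for _ in range(jobs)
    ))
    return peak[0]
```

In a real worker you would hold one semaphore per sticky session and acquire it around the actual fetch_page call.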

Should I use residential or ISP proxies for large-scale scraping?

Use residential rotating proxies for breadth (millions of pages across many sites at moderate protection). Use ISP sticky proxies for depth (thousands of pages on heavily protected sites). Many pipelines use both: residential for discovery crawling, ISP for targeted extraction. See our ISP vs. residential comparison.

How do I handle JavaScript-rendered pages in a distributed pipeline?

Replace aiohttp fetch workers with headless browser workers (Playwright/Puppeteer). These are heavier (one browser instance per worker vs. hundreds of connections per HTTP worker), so use them only for targets that require JavaScript rendering. Tag jobs with requires_browser=True and route them to browser workers.

What is the cost to scrape 1 million pages?

With residential proxies at $4.25/GB and an average page size of 200 KB: 1M pages × 200 KB = 200 GB, and 200 GB × $4.25/GB = $850. With ISP proxies (unlimited bandwidth) at $2.08/IP, you need enough IPs for your concurrency and rotation needs. For 50 concurrent connections with hourly rotation: 50 IPs × $2.08 = $104/month. The optimal choice depends on your target mix and rotation requirements.
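The arithmetic generalizes to two small helpers, using the rates quoted above; the function names and the decimal KB→GB conversion are this sketch's assumptions:

```python
def residential_cost_usd(pages, avg_page_kb=200, usd_per_gb=4.25):
    """Bandwidth-billed cost: pages * page size, converted to GB
    using decimal units (1 GB = 1,000,000 KB)."""
    gb = pages * avg_page_kb / 1_000_000
    return round(gb * usd_per_gb, 2)


def isp_cost_usd(concurrent_ips, usd_per_ip=2.08):
    """IP-billed monthly cost: flat rate per sticky IP."""
    return round(concurrent_ips * usd_per_ip, 2)
```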


A well-architected scraping pipeline turns proxy bandwidth into structured data reliably and cost-effectively. Hex Proxies provides the proxy layer -- residential rotation at $4.25/GB and ISP sticky sessions at $2.08/IP, with the session-based routing shown in this guide. See pricing or explore our Scrapy integration to start building.
