Building a Distributed Scraping Pipeline with Rotating Proxies (Architecture Guide)
A single-machine scraper with a for-loop and rotating proxies works until it does not. The moment you need to scrape millions of pages across hundreds of sites with varying anti-bot protection, you need a distributed pipeline with proper job orchestration, failure handling, and proxy management.
This guide presents a production architecture for distributed scraping systems. It is based on patterns used by Hex Proxies customers running large-scale data collection operations, distilled into a reference architecture you can adapt to your workload.
Architecture Overview
┌──────────────┐
│ Scheduler │
│ (cron/API) │
└──────┬───────┘
│
▼
┌────────────────┐
│ Job Queue │
│ (Redis/SQS) │
└────────┬───────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│ │ │ │ │ │
│ Proxy │ │ Proxy │ │ Proxy │
│ Manager │ │ Manager │ │ Manager │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────┐
│ Proxy Gateway │
│ (gate.hexproxies.com) │
│ │
│ Session routing, IP rotation, │
│ geo-targeting, protocol handling │
└──────────────────────────────────────┘
│
▼
┌────────────────┐
│ Target Sites │
└────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Parser │ │ Parser │ │ Parser │
│ Workers │ │ Workers │ │ Workers │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────┐
│ Storage Layer │
│ (S3/Postgres/Elasticsearch) │
└──────────────────────────────────────┘
The architecture separates five concerns -- scheduling, queuing, fetching, parsing, and storage -- grouped below into four layers. Each scales independently.
Layer 1: Job Queue and Scheduling
URL Frontier Design
The URL frontier is the core data structure -- it manages which URLs to scrape, when to scrape them, and in what priority order.
import hashlib
import json
import time
from dataclasses import dataclass, field
from typing import Optional
import redis
@dataclass(frozen=True)
class ScrapeJob:
"""Immutable scrape job definition."""
url: str
domain: str
priority: int # 1 (highest) to 10 (lowest)
retry_count: int = 0
max_retries: int = 3
created_at: float = field(default_factory=time.time)
proxy_type: str = "residential" # "residential" or "isp"
geo_target: Optional[str] = None # e.g., "us", "gb", "de"
requires_browser: bool = False
@property
def job_id(self):
"""Deterministic job ID from URL for deduplication."""
return hashlib.sha256(self.url.encode()).hexdigest()[:16]
def with_retry(self):
"""Return a new job with incremented retry count."""
return ScrapeJob(
url=self.url,
domain=self.domain,
priority=self.priority,
retry_count=self.retry_count + 1,
max_retries=self.max_retries,
created_at=self.created_at,
proxy_type=self.proxy_type,
geo_target=self.geo_target,
requires_browser=self.requires_browser,
)
def to_json(self):
return json.dumps({
"url": self.url,
"domain": self.domain,
"priority": self.priority,
"retry_count": self.retry_count,
"max_retries": self.max_retries,
"created_at": self.created_at,
"proxy_type": self.proxy_type,
"geo_target": self.geo_target,
"requires_browser": self.requires_browser,
})
@staticmethod
def from_json(data):
        # Redis returns bytes unless decode_responses=True; json.loads accepts both
        parsed = json.loads(data) if isinstance(data, (str, bytes)) else data
return ScrapeJob(**parsed)
class URLFrontier:
"""Priority-based URL frontier with per-domain rate limiting.
Uses Redis sorted sets for priority queuing and per-domain
rate limiting to prevent overwhelming individual targets.
"""
def __init__(self, redis_client, domain_delay_seconds=2.0):
self.redis = redis_client
self.domain_delay = domain_delay_seconds
self.queue_key = "scrape:queue"
self.seen_key = "scrape:seen"
self.domain_lock_prefix = "scrape:domain_lock:"
def enqueue(self, job):
"""Add a job to the frontier if not already seen."""
if self.redis.sismember(self.seen_key, job.job_id):
return False # Already scraped or in queue
# Add to priority queue (lower score = higher priority)
score = job.priority * 1e10 + job.created_at
self.redis.zadd(self.queue_key, {job.to_json(): score})
self.redis.sadd(self.seen_key, job.job_id)
return True
def dequeue(self):
"""Get the highest-priority job whose domain is not rate-limited.
Returns None if no eligible job is available.
"""
# Get candidates from the priority queue
candidates = self.redis.zrange(self.queue_key, 0, 49)
for candidate_data in candidates:
job = ScrapeJob.from_json(candidate_data)
lock_key = f"{self.domain_lock_prefix}{job.domain}"
# Check if domain is rate-limited
if self.redis.exists(lock_key):
continue # Domain is cooling down
# Claim this job (atomic remove)
removed = self.redis.zrem(self.queue_key, candidate_data)
if removed == 0:
continue # Another worker claimed it
# Set domain cooldown
            # psetex takes milliseconds, so sub-second delays (e.g. 0.2s)
            # survive the cast; setex(int(0.2)) would raise an error
            self.redis.psetex(lock_key, int(self.domain_delay * 1000), "1")
return job
return None # No eligible jobs right now
def requeue(self, job):
"""Requeue a failed job with incremented retry count."""
retried = job.with_retry()
if retried.retry_count > retried.max_retries:
self._send_to_dead_letter(retried)
return False
# Requeue with degraded priority
score = (retried.priority + retried.retry_count) * 1e10 + time.time()
self.redis.zadd(self.queue_key, {retried.to_json(): score})
return True
def _send_to_dead_letter(self, job):
"""Move permanently failed jobs to dead letter queue."""
self.redis.rpush("scrape:dead_letter", job.to_json())
def stats(self):
"""Return current frontier statistics."""
return {
"queue_size": self.redis.zcard(self.queue_key),
"seen_count": self.redis.scard(self.seen_key),
"dead_letter_count": self.redis.llen("scrape:dead_letter"),
}
Per-Domain Rate Limiting
The domain_delay parameter in the frontier is critical. Without per-domain rate limiting, a distributed system with 50 workers will hammer a single target with 50 concurrent requests -- guaranteed to trigger rate limiting and IP blocks.
Rule of thumb: set the domain delay so your sustained rate stays at about half the target's tolerance. If a site blocks above 10 requests per second, a domain_delay of 0.2 seconds caps you at 5 requests per second -- a 2x safety margin. For well-protected sites, 2-5 seconds between requests per domain is safer.
Per-domain rate limiting is independent of per-IP rate limiting. Even with rotating proxies providing fresh IPs, the target site still sees rapid requests from any IP if your aggregate rate is too high.
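This is worth making concrete: with the cooldown lock in the frontier above, the sustained per-domain rate is capped by the delay alone, no matter how many workers you run (a small illustrative helper):

```python
def max_domain_rate(domain_delay_seconds, worker_count):
    # The domain lock admits at most one request per delay window,
    # so worker_count does not appear in the result: adding workers
    # raises aggregate throughput across domains, not per domain.
    return 1.0 / domain_delay_seconds

assert max_domain_rate(2.0, worker_count=50) == 0.5  # one request every 2s
assert max_domain_rate(0.5, worker_count=5) == 2.0   # same cap for 5 or 500 workers
```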
Layer 2: Worker Architecture
Worker Design
Each worker runs an event loop: dequeue a job, configure the proxy, fetch the page, handle errors, and report results.
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Optional
import aiohttp
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class FetchResult:
"""Immutable result from a fetch operation."""
url: str
status_code: int
body: str
headers: dict
proxy_ip: str
latency_ms: float
success: bool
error: Optional[str] = None
@dataclass(frozen=True)
class ProxyConfig:
"""Immutable proxy configuration for a single request."""
url: str # Full proxy URL with auth
protocol: str # "http" or "socks5"
session_id: Optional[str] = None
def build_proxy_config(job, worker_id):
"""Build a proxy configuration for a given job.
Uses Hex Proxies session-based routing to control IP assignment.
Each unique session_id gets a unique IP; same session_id gets
the same IP (sticky session).
"""
base_user = "USER"
base_pass = "PASS"
# Build session identifier for sticky sessions
session_id = f"w{worker_id}-{job.domain}-{int(time.time() // 300)}"
# Construct proxy URL with session routing
geo_part = f"-country-{job.geo_target}" if job.geo_target else ""
type_part = f"-type-{job.proxy_type}"
session_part = f"-session-{session_id}"
proxy_user = f"{base_user}{type_part}{geo_part}{session_part}"
return ProxyConfig(
url=f"http://{proxy_user}:{base_pass}@gate.hexproxies.com:8080",
protocol="http",
session_id=session_id,
)
async def fetch_page(job, proxy_config, timeout_seconds=30):
"""Fetch a single page through the configured proxy.
Returns an immutable FetchResult regardless of success or failure.
"""
start = asyncio.get_event_loop().time()
try:
        # ssl=False disables certificate verification; many proxy gateways
        # re-terminate TLS, which strict verification would reject
        connector = aiohttp.TCPConnector(ssl=False)
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
async with aiohttp.ClientSession(
connector=connector, timeout=timeout
) as session:
async with session.get(
job.url,
proxy=proxy_config.url,
headers={
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
},
) as response:
body = await response.text()
elapsed = (asyncio.get_event_loop().time() - start) * 1000
# Check for soft blocks
is_blocked = _detect_soft_block(body)
return FetchResult(
url=job.url,
status_code=response.status,
body=body,
headers=dict(response.headers),
proxy_ip=proxy_config.session_id or "unknown",
latency_ms=round(elapsed, 1),
success=response.status == 200 and not is_blocked,
error="soft_block" if is_blocked else None,
)
except asyncio.TimeoutError:
elapsed = (asyncio.get_event_loop().time() - start) * 1000
return FetchResult(
url=job.url,
status_code=0,
body="",
headers={},
proxy_ip=proxy_config.session_id or "unknown",
latency_ms=round(elapsed, 1),
success=False,
error="timeout",
)
except Exception as exc:
elapsed = (asyncio.get_event_loop().time() - start) * 1000
logger.error("Fetch error for %s: %s", job.url, str(exc))
return FetchResult(
url=job.url,
status_code=0,
body="",
headers={},
proxy_ip=proxy_config.session_id or "unknown",
latency_ms=round(elapsed, 1),
success=False,
error=str(exc),
)
def _detect_soft_block(body):
"""Check response body for anti-bot challenge indicators."""
if not body or len(body) < 100:
return True # Suspiciously empty
indicators = [
"captcha", "cf-challenge", "challenge-platform",
"px-captcha", "hcaptcha.com", "recaptcha",
]
body_lower = body.lower()
return any(ind in body_lower for ind in indicators)
async def worker_loop(worker_id, frontier, result_queue):
"""Main worker event loop.
Continuously dequeues jobs, fetches pages, and handles results.
"""
logger.info("Worker %d started", worker_id)
while True:
job = frontier.dequeue()
if job is None:
await asyncio.sleep(1) # No eligible jobs, back off
continue
proxy_config = build_proxy_config(job, worker_id)
result = await fetch_page(job, proxy_config)
if result.success:
await result_queue.put(result)
logger.info(
"Worker %d: OK %s (%.0fms)",
worker_id, job.url, result.latency_ms,
)
else:
logger.warning(
"Worker %d: FAIL %s (error=%s, retries=%d/%d)",
worker_id, job.url, result.error,
job.retry_count, job.max_retries,
)
frontier.requeue(job)
Worker Scaling
| Scraping Volume | Workers | Proxy Type | Estimated Throughput |
|---|---|---|---|
| < 10K pages/day | 2-5 | Residential rotating | 5-10 pages/min |
| 10K - 100K pages/day | 10-20 | Residential rotating | 50-100 pages/min |
| 100K - 1M pages/day | 20-50 | Mixed (ISP + residential) | 200-500 pages/min |
| > 1M pages/day | 50-200 | ISP (sticky) + residential (rotating) | 500+ pages/min |
Layer 3: Proxy Management
Rotation Strategies
The proxy management layer decides which proxy IP to use for each request. The strategy depends on the target site's detection level.
Per-request rotation (low-protection targets):
# Hex Proxies: omit session parameter for automatic per-request rotation
proxy_url = "http://USER:PASS@gate.hexproxies.com:8080"
# Every request through this proxy gets a different IP
Sticky sessions with timed rotation (medium-protection targets):
import time
def get_sticky_proxy(domain, rotation_interval_seconds=300):
"""Get a proxy with sticky session that rotates on a schedule.
Same domain + same time window = same IP.
New time window = new IP.
"""
time_bucket = int(time.time() // rotation_interval_seconds)
session_id = f"{domain}-{time_bucket}"
return f"http://USER-session-{session_id}:PASS@gate.hexproxies.com:8080"
Per-domain sticky sessions (high-protection targets):
def get_domain_sticky_proxy(domain, worker_id):
"""Assign a dedicated IP to each domain per worker.
The worker maintains the same IP for all requests to this domain
for the duration of the session, mimicking a real user's browsing
pattern.
"""
session_id = f"w{worker_id}-{domain}"
return f"http://USER-session-{session_id}:PASS@gate.hexproxies.com:8080"
IP Health Tracking
Track the success rate per proxy session to detect IPs that have been flagged:
import time
from dataclasses import dataclass
@dataclass(frozen=True)
class IPHealthSnapshot:
"""Immutable snapshot of an IP's health metrics."""
session_id: str
total_requests: int
successful_requests: int
last_success_at: float
last_failure_at: float
consecutive_failures: int
@property
def success_rate(self):
if self.total_requests == 0:
return 1.0
return self.successful_requests / self.total_requests
@property
def is_healthy(self):
"""An IP is unhealthy if success rate drops below 50%
or it has 5+ consecutive failures."""
if self.total_requests < 5:
return True # Not enough data
return (
self.success_rate >= 0.5
and self.consecutive_failures < 5
)
class IPHealthTracker:
"""Track proxy IP health metrics to detect burned IPs."""
def __init__(self):
self._data = {} # session_id -> mutable internal state
def record_result(self, session_id, success):
"""Record a request result for a proxy session."""
now = time.time()
if session_id not in self._data:
self._data[session_id] = {
"total": 0,
"successes": 0,
"last_success": 0.0,
"last_failure": 0.0,
"consecutive_failures": 0,
}
state = self._data[session_id]
# Create new state (conceptual immutability)
new_state = {
"total": state["total"] + 1,
"successes": state["successes"] + (1 if success else 0),
"last_success": now if success else state["last_success"],
"last_failure": now if not success else state["last_failure"],
"consecutive_failures": (
0 if success else state["consecutive_failures"] + 1
),
}
self._data[session_id] = new_state
def get_snapshot(self, session_id):
"""Get an immutable health snapshot for a session."""
state = self._data.get(session_id)
if state is None:
return None
return IPHealthSnapshot(
session_id=session_id,
total_requests=state["total"],
successful_requests=state["successes"],
last_success_at=state["last_success"],
last_failure_at=state["last_failure"],
consecutive_failures=state["consecutive_failures"],
)
def should_rotate(self, session_id):
"""Check if a session's IP should be rotated due to poor health."""
snapshot = self.get_snapshot(session_id)
if snapshot is None:
return False
return not snapshot.is_healthy
Layer 4: Parsing and Storage
Separating Fetch from Parse
Parse operations should be decoupled from fetch operations for three reasons:
- Different resource profiles. Fetching is I/O-bound (network); parsing is CPU-bound (DOM traversal, regex). They scale differently.
- Retry isolation. If parsing fails, you can re-parse the cached raw HTML without re-fetching (which consumes proxy bandwidth).
- Schema evolution. When you change what data you extract, you re-parse stored HTML without re-scraping.
Fetch Workers → Raw HTML → S3/Blob Storage
│
▼
Parse Workers → Structured Data → PostgreSQL
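A parse worker then reads cached raw HTML and emits structured records without touching the network. A toy extraction step using only the standard library (the title field is just an example target; real pipelines typically use lxml or BeautifulSoup):

```python
from html.parser import HTMLParser

class TitleExtractor(HTMLParser):
    # Tiny stand-in for a real extraction step.
    def __init__(self):
        super().__init__()
        self._in_title = False
        self.title = None

    def handle_starttag(self, tag, attrs):
        if tag == "title":
            self._in_title = True

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title and self.title is None:
            self.title = data.strip()

def parse_raw_html(raw_html):
    # Re-running this against stored HTML never consumes proxy bandwidth.
    extractor = TitleExtractor()
    extractor.feed(raw_html)
    return {"title": extractor.title}

record = parse_raw_html("<html><head><title>Widget A - $19.99</title></head></html>")
assert record == {"title": "Widget A - $19.99"}
```

Because the parser is a pure function of the stored HTML, schema changes mean re-running parse workers over S3, not re-scraping.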
Storage Strategy
| Data Type | Storage | Retention |
|---|---|---|
| Raw HTML responses | S3 / object storage | 30-90 days |
| Parsed structured data | PostgreSQL / Elasticsearch | Permanent |
| Fetch metadata (latency, status, proxy used) | ClickHouse / TimescaleDB | 90 days |
| Dead letter jobs (permanently failed) | Redis → periodic export | Until reviewed |
Scaling Patterns
Horizontal Scaling
Add more workers behind the same job queue. Redis and SQS both support multiple consumers natively. Each worker is stateless -- it gets a job, does the work, reports the result.
Auto-scaling trigger: Scale workers based on queue depth. If the queue grows faster than workers drain it, add workers. If workers sit idle (queue is empty), remove workers.
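A minimal version of that trigger (the jobs-per-worker drain rate is an illustrative figure you would measure from your own pipeline):

```python
def scale_decision(queue_depth, active_workers, jobs_per_worker_per_cycle=2000):
    # Size the fleet so the backlog clears within one scaling cycle.
    desired = max(1, -(-queue_depth // jobs_per_worker_per_cycle))  # ceil division
    if desired > active_workers:
        return ("scale_up", desired - active_workers)
    if desired < active_workers:
        return ("scale_down", active_workers - desired)
    return ("hold", 0)

assert scale_decision(100_000, 20) == ("scale_up", 30)  # backlog growing: add 30
assert scale_decision(1_000, 20) == ("scale_down", 19)  # mostly idle: keep 1
assert scale_decision(40_000, 20) == ("hold", 0)
```

In practice you would dampen this with a cooldown period so the fleet does not thrash on short queue spikes.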
Vertical Scaling
Before adding workers, maximize each worker's throughput:
- Use async I/O (aiohttp, httpx async) to handle multiple requests concurrently within a single worker
- Parse in a separate process pool to avoid blocking the event loop
- Use connection pooling to the proxy gateway (persistent connections reduce handshake overhead)
Geographic Distribution
For global scraping, deploy workers near your proxy exit points:
US targets → US worker cluster → Hex Proxies US pool
EU targets → EU worker cluster → Hex Proxies EU pool
Asia targets → Asia worker cluster → Hex Proxies Asia pool
This minimizes the double-hop latency (worker → proxy → target) by colocating workers with the proxy exit points.
Monitoring and Observability
Key Metrics to Track
| Metric | Alert Threshold | Action |
|---|---|---|
| Queue depth | > 100K pending jobs | Scale up workers |
| Success rate (global) | < 85% | Investigate target blocks |
| Success rate (per domain) | < 50% | Pause domain, adjust strategy |
| P95 latency | > 5 seconds | Check proxy performance |
| Dead letter rate | > 5% of total jobs | Review failure patterns |
| Proxy bandwidth usage | > 80% of plan | Upgrade plan or optimize |
Error Classification
Classify failures to identify systemic issues:
def classify_failure(result):
"""Classify a failed fetch result for monitoring."""
if result.error == "timeout":
return "timeout" # Proxy or target slow
if result.error == "soft_block":
return "anti_bot" # Detected and challenged
if result.status_code == 403:
return "ip_blocked" # IP reputation issue
if result.status_code == 429:
return "rate_limited" # Too fast
if result.status_code >= 500:
return "target_error" # Target site issue
if "connection" in str(result.error).lower():
return "proxy_error" # Proxy connectivity issue
return "unknown"
If "ip_blocked" errors spike, your proxy IPs may be overused -- switch to ISP proxies with sticky sessions or increase rotation frequency. If "rate_limited" errors spike, slow down your per-domain rate. If "anti_bot" errors spike, consider upgrading to browser-based fetching for that domain.
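In practice you would aggregate these labels over a time window and alert on the dominant category -- a sketch of that aggregation:

```python
from collections import Counter

def failure_breakdown(labels):
    # labels: classify_failure() output collected over one monitoring window
    total = len(labels) or 1
    return {label: round(n / total, 2) for label, n in Counter(labels).most_common()}

window = ["ip_blocked", "ip_blocked", "rate_limited", "timeout", "ip_blocked"]
breakdown = failure_breakdown(window)
assert breakdown["ip_blocked"] == 0.6  # 60% ip_blocked: time to adjust strategy
```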
For more on optimizing proxy performance in scraping pipelines, see our performance optimization guide and web automation use case page.
Frequently Asked Questions
How many concurrent connections should I use per proxy IP?
For rotating proxies, 1 connection per IP at a time is ideal since each request gets a fresh IP. For sticky sessions, limit to 3-5 concurrent connections per IP to avoid appearing non-human. See our concurrent connections guide for detailed analysis.
Should I use residential or ISP proxies for large-scale scraping?
Use residential rotating proxies for breadth (millions of pages across many sites at moderate protection). Use ISP sticky proxies for depth (thousands of pages on heavily protected sites). Many pipelines use both: residential for discovery crawling, ISP for targeted extraction. See our ISP vs. residential comparison.
How do I handle JavaScript-rendered pages in a distributed pipeline?
Replace aiohttp fetch workers with headless browser workers (Playwright/Puppeteer). These are heavier (one browser instance per worker vs. hundreds of connections per HTTP worker), so use them only for targets that require JavaScript rendering. Tag jobs with requires_browser=True and route them to browser workers.
What is the cost to scrape 1 million pages?
With residential proxies at $4.25/GB and an average page size of 200 KB: 1M pages × 200 KB = 200 GB, and 200 GB × $4.25/GB = $850. With ISP proxies (unlimited bandwidth) at $2.08/IP, you need enough IPs for your concurrency and rotation needs. For 50 concurrent connections with hourly rotation: 50 IPs × $2.08 = $104/month. The optimal choice depends on your target mix and rotation requirements.
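That arithmetic generalizes into two small helpers (rates are the ones quoted in this guide; substitute your own plan's figures):

```python
def residential_cost_usd(pages, avg_page_kb, usd_per_gb=4.25):
    # Bandwidth-billed: cost scales with pages fetched times page size.
    gb = pages * avg_page_kb / 1_000_000  # KB -> GB, decimal units
    return round(gb * usd_per_gb, 2)

def isp_cost_usd(concurrent_ips, usd_per_ip=2.08):
    # Per-IP billed: cost scales with concurrency, not bytes transferred.
    return round(concurrent_ips * usd_per_ip, 2)

assert residential_cost_usd(1_000_000, 200) == 850.0  # the worked example above
assert isp_cost_usd(50) == 104.0
```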
A well-architected scraping pipeline turns proxy bandwidth into structured data reliably and cost-effectively. Hex Proxies provides the proxy layer -- residential rotation at $4.25/GB and ISP sticky sessions at $2.08/IP, with the session-based routing shown in this guide. See pricing or explore our Scrapy integration to start building.