## Introduction
Data extraction pipelines are the foundation of modern data-driven applications. When they fail or slow down, entire business processes grind to a halt.
## Prerequisites

This walkthrough assumes working knowledge of Python's `asyncio`, Celery with a Redis broker, and `prometheus_client`, the tools used throughout.
## Architecture Overview
A production-grade pipeline consists of three layers: extraction with retry logic, horizontally scaled task processing, and monitoring. We start with the extraction layer:
```python
# pipeline/extractor.py
import asyncio
from dataclasses import dataclass

import aiohttp


@dataclass
class ExtractionConfig:
    source_url: str
    batch_size: int = 1000
    max_retries: int = 3
    timeout: float = 30.0


async def extract_batch(
    config: ExtractionConfig,
    offset: int,
) -> list[dict]:
    """Extract a single batch, retrying with exponential backoff."""
    for attempt in range(config.max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    config.source_url,
                    params={'offset': offset, 'limit': config.batch_size},
                    timeout=aiohttp.ClientTimeout(total=config.timeout),
                ) as response:
                    response.raise_for_status()  # treat HTTP errors as retryable
                    return await response.json()
        except Exception:
            if attempt == config.max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s, ...
```

## Horizontal Scaling with Celery
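The Celery task below delegates to a `run_extraction` helper that is not shown in this article. Here is a minimal sketch of the partitioning side it might rely on, assuming each partition covers a fixed, contiguous offset range (the sizes and names are illustrative, not from the original code):

```python
# tasks/partitioning.py -- hypothetical helper, assumptions noted above.
import asyncio


def partition_offsets(partition_id: int, partition_size: int,
                      batch_size: int) -> list[int]:
    """Batch offsets covered by one contiguous partition."""
    start = partition_id * partition_size
    return list(range(start, start + partition_size, batch_size))


async def gather_batches(extract, offsets):
    """Fan one extract coroutine out per offset, running them concurrently."""
    return await asyncio.gather(*(extract(offset) for offset in offsets))
```

A worker's `run_extraction` could then compute its offsets and drive the `extract_batch` coroutine from the previous section via `asyncio.run(gather_batches(...))`, so each partition processes its batches concurrently while partitions scale out across workers.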
```python
# tasks/extraction_tasks.py
from celery import Celery

app = Celery('pipeline', broker='redis://localhost:6379/0')


@app.task(bind=True, max_retries=3)
def extract_partition(self, partition_id: int, config: dict):
    """Process a single partition, retrying on failure."""
    try:
        # run_extraction and store_results are application-specific helpers.
        results = run_extraction(partition_id, config)
        store_results(results)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
```

## Monitoring and Observability
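Metric definitions only matter once something increments them and Prometheus can scrape them. A minimal sketch of that wiring, using `prometheus_client` as the article does (the metric name and loop body here are illustrative assumptions):

```python
# Sketch: define a counter and increment it per processed batch.
# The name 'demo_batches_total' is illustrative, not from the pipeline above.
import prometheus_client as prom

batches_done = prom.Counter('demo_batches_total', 'Batches completed')


def process_batches(n: int) -> int:
    """Pretend to process n batches, incrementing the counter for each."""
    for _ in range(n):
        batches_done.inc()
    return n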
```python
import prometheus_client as prom

records_processed = prom.Counter(
    'pipeline_records_processed_total',
    'Total records processed',
    ['partition', 'status'],
)
processing_duration = prom.Histogram(
    'pipeline_processing_seconds',
    'Time to process a batch',
)
```

## Conclusion
Production pipelines require retry logic, horizontal scaling, monitoring, and graceful degradation. Build these in from the start.
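Graceful degradation is named above but not shown. One common shape for it is a small circuit breaker that stops hammering a failing source and serves a fallback instead; a minimal sketch, where the failure threshold and fallback are illustrative assumptions:

```python
# Hypothetical circuit breaker; names and threshold are illustrative.
class CircuitBreaker:
    """Open after `threshold` consecutive failures, then use the fallback."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures = 0

    @property
    def open(self) -> bool:
        return self.failures >= self.threshold

    def call(self, fn, fallback):
        if self.open:
            return fallback()      # degrade gracefully: skip the source
        try:
            result = fn()
            self.failures = 0      # a success resets the breaker
            return result
        except Exception:
            self.failures += 1
            return fallback()
```

Wrapping each source call this way lets the pipeline keep emitting (possibly stale or empty) results while an upstream dependency recovers, instead of stalling every downstream consumer.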