Cookbook
Pipeline Integration
Integrate Docling into data processing pipelines and automation workflows
Pipeline Integration
Warning: This page has not been validated yet and may be inaccurate. Verify each example against the current Docling for IBM watsonx API before relying on it.
Learn how to integrate Docling for IBM watsonx into your data processing pipelines for automated document conversion at scale.
Overview
Modern data pipelines need to process documents from various sources automatically. Docling provides APIs and tools to seamlessly integrate document conversion into your existing workflows, whether you're using Apache Airflow, AWS Step Functions, or custom automation.
What You'll Build
- Automated document processing pipeline
- Batch conversion workflows
- Error handling and retry logic
- Monitoring and logging
Prerequisites
- Docling for IBM watsonx account with Service URL and API Key
- Python 3.8+
- Pipeline orchestration tool (Airflow, Prefect, or similar)
- Cloud storage (S3, GCS, or Azure Blob)
Architecture
Basic Pipeline
Simple Batch Processor
import os
from pathlib import Path
from docling.service_client import DoclingServiceClient
from typing import List, Dict
# Connection settings come from the environment; each is None when its variable is unset.
SERVICE_URL = os.environ.get("DOCLING_SERVICE_URL")
API_KEY = os.environ.get("DOCLING_API_KEY")
def process_documents(input_dir: str, output_dir: str) -> List[Dict]:
"""Process all documents in a directory."""
input_path = Path(input_dir)
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
results = []
with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
for doc_file in input_path.glob("*.pdf"):
try:
# Convert document
result = client.convert(source=doc_file)
# Save output
output_file = output_path / f"{doc_file.stem}.md"
output_file.write_text(result.document.export_to_markdown())
results.append({
'input': str(doc_file),
'output': str(output_file),
'status': 'success'
})
except Exception as e:
results.append({
'input': str(doc_file),
'error': str(e),
'status': 'failed'
})
return resultsApache Airflow Integration
Airflow DAG
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from docling.service_client import DoclingServiceClient
# Connection settings come from the environment; each is None when its variable is unset.
SERVICE_URL = os.environ.get("DOCLING_SERVICE_URL")
API_KEY = os.environ.get("DOCLING_API_KEY")
def convert_documents(**context):
    """Convert the files listed by the ``list_files`` task to Markdown.

    Args:
        **context: Airflow task context; only ``context['task_instance']``
            is used, to pull the upstream file list from XCom.

    Returns:
        A list of ``{'file': <path>, 'content': <markdown>}`` dicts, one per
        input file. The list is empty when the upstream task produced no
        files. Airflow stores the return value as this task's XCom.
    """
    # xcom_pull yields None when nothing was pushed upstream; treat that the
    # same as an empty listing instead of failing on iteration.
    input_files = context['task_instance'].xcom_pull(task_ids='list_files') or []
    if not input_files:
        # Nothing to do: avoid opening a service connection for no work.
        return []
    results = []
    # NOTE(review): the full Markdown of every document travels through XCom,
    # which is intended for small values. For large batches consider writing
    # the output in this task and passing only paths downstream.
    with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
        for file_path in input_files:
            result = client.convert(source=file_path)
            results.append({
                'file': file_path,
                'content': result.document.export_to_markdown()
            })
    return results
def list_input_files(**context):
    """List the PDF files waiting in ``/data/input``.

    Args:
        **context: Airflow task context (unused).

    Returns:
        The ``*.pdf`` paths directly inside ``/data/input`` as strings, in
        sorted order. Airflow stores the return value as this task's XCom.
    """
    from pathlib import Path
    input_dir = Path("/data/input")
    # glob() order is filesystem-dependent; sort so retries and re-runs hand
    # the same list to the downstream task.
    return sorted(str(f) for f in input_dir.glob("*.pdf"))
def save_results(**context):
    """Write the Markdown produced by the ``convert`` task to ``/data/output``.

    Each result is written to ``/data/output/<input stem>.md`` as UTF-8.

    Args:
        **context: Airflow task context; only ``context['task_instance']``
            is used, to pull the conversion results from XCom.
    """
    # Imported here, like in list_input_files: the DAG file's top-level
    # imports do not provide Path, so using it unimported raises NameError.
    from pathlib import Path

    # xcom_pull yields None when the upstream task pushed nothing.
    results = context['task_instance'].xcom_pull(task_ids='convert') or []
    if not results:
        return
    output_dir = Path("/data/output")
    # write_text does not create missing directories.
    output_dir.mkdir(parents=True, exist_ok=True)
    for result in results:
        file_name = Path(result['file']).stem
        output_file = output_dir / f"{file_name}.md"
        output_file.write_text(result['content'], encoding="utf-8")
# Define DAG
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
}
with DAG(
'docling_pipeline',
default_args=default_args,
description='Convert documents with Docling',
schedule_interval='@daily',
start_date=days_ago(1),
catchup=False,
) as dag:
list_task = PythonOperator(
task_id='list_files',
python_callable=list_input_files,
)
convert_task = PythonOperator(
task_id='convert',
python_callable=convert_documents,
)
save_task = PythonOperator(
task_id='save',
python_callable=save_results,
)
list_task >> convert_task >> save_taskAWS Integration
S3 to S3 Pipeline
import boto3
from docling.service_client import DoclingServiceClient
import os
# Connection settings come from the environment; each is None when its variable is unset.
SERVICE_URL = os.environ.get("DOCLING_SERVICE_URL")
API_KEY = os.environ.get("DOCLING_API_KEY")
def process_s3_documents(
input_bucket: str,
output_bucket: str,
prefix: str = ""
):
"""Process documents from S3."""
s3 = boto3.client('s3')
# List objects
response = s3.list_objects_v2(
Bucket=input_bucket,
Prefix=prefix
)
with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
for obj in response.get('Contents', []):
key = obj['Key']
if not key.endswith('.pdf'):
continue
# Download from S3
local_file = f"/tmp/{key.split('/')[-1]}"
s3.download_file(input_bucket, key, local_file)
# Convert
result = client.convert(source=local_file)
markdown = result.document.export_to_markdown()
# Upload to S3
output_key = key.replace('.pdf', '.md')
s3.put_object(
Bucket=output_bucket,
Key=output_key,
Body=markdown.encode('utf-8')
)
# Cleanup
os.remove(local_file)Lambda Function
import json
import boto3
from docling.service_client import DoclingServiceClient
import os
# Connection settings come from the environment; each is None when its variable is unset.
SERVICE_URL = os.environ.get("DOCLING_SERVICE_URL")
API_KEY = os.environ.get("DOCLING_API_KEY")
def lambda_handler(event, context):
"""Lambda function to process S3 uploads."""
s3 = boto3.client('s3')
# Get S3 event details
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
# Download file
local_file = f"/tmp/{key.split('/')[-1]}"
s3.download_file(bucket, key, local_file)
# Convert with Docling
with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
result = client.convert(source=local_file)
markdown = result.document.export_to_markdown()
# Upload result
output_key = key.replace('.pdf', '.md')
s3.put_object(
Bucket=bucket,
Key=f"converted/{output_key}",
Body=markdown.encode('utf-8')
)
return {
'statusCode': 200,
'body': json.dumps(f'Processed {key}')
}Error Handling
Retry Logic
from tenacity import retry, stop_after_attempt, wait_exponential
from docling.service_client import DoclingServiceClient
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def convert_with_retry(file_path: str) -> str:
"""Convert document with automatic retry."""
with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
result = client.convert(source=file_path)
return result.document.export_to_markdown()Error Tracking
import logging
from typing import Dict, List
# Configure logging at INFO level for this example.
logging.basicConfig(level=logging.INFO)
# Logger named after this module; used by the functions below.
logger = logging.getLogger(__name__)
def process_with_error_tracking(
    files: List[str]
) -> Dict[str, List]:
    """Convert files one by one and record which succeeded and which failed.

    The converted documents themselves are not kept; only the outcome of each
    conversion is tracked and logged.

    Args:
        files: Paths of the documents to convert.

    Returns:
        A dict with three lists: ``success`` (paths that converted),
        ``failed`` (paths that raised), and ``errors`` (one
        ``{'file': <path>, 'error': <message>}`` dict per failure).
    """
    results = {
        'success': [],
        'failed': [],
        'errors': []
    }
    with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
        for file_path in files:
            try:
                client.convert(source=file_path)
            except Exception as e:
                # Deliberately broad: a failing document is recorded and the
                # remaining files are still attempted.
                results['failed'].append(file_path)
                results['errors'].append({
                    'file': file_path,
                    'error': str(e)
                })
                logger.error("Failed to convert %s: %s", file_path, e)
            else:
                results['success'].append(file_path)
                logger.info("Successfully converted %s", file_path)
    return results

Monitoring
Metrics Collection
from prometheus_client import Counter, Histogram
import time
# Define metrics
# Conversions counted per outcome; the 'status' label separates them.
conversion_counter = Counter(
    name='docling_conversions_total',
    documentation='Total number of document conversions',
    labelnames=['status'],
)
# Distribution of per-document conversion time, in seconds.
conversion_duration = Histogram(
    name='docling_conversion_duration_seconds',
    documentation='Time spent converting documents',
)
def convert_with_metrics(file_path: str) -> str:
    """Convert one document to Markdown and record Prometheus metrics.

    Exactly one of the ``success`` / ``failed`` counters is incremented per
    call, and the elapsed time is observed whether or not the call succeeds.

    Args:
        file_path: Path of the document to convert.

    Returns:
        The converted document as Markdown.

    Raises:
        Exception: Whatever the conversion or export raised, re-raised
            unchanged after the failure has been counted.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()
    try:
        with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
            result = client.convert(source=file_path)
            markdown = result.document.export_to_markdown()
    except Exception:
        conversion_counter.labels(status='failed').inc()
        raise
    else:
        # Count success only once the Markdown export has also succeeded;
        # otherwise a failing export would count as both success and failed.
        conversion_counter.labels(status='success').inc()
        return markdown
    finally:
        conversion_duration.observe(time.perf_counter() - start_time)

Logging
import logging
import json
from datetime import datetime
def setup_logging():
    """Configure message-only logging at INFO level.

    Returns:
        The logger named after this module.
    """
    # Only the message is emitted, so each JSON log entry stays a bare line.
    message_only = '%(message)s'
    logging.basicConfig(format=message_only, level=logging.INFO)
    module_logger = logging.getLogger(__name__)
    return module_logger


logger = setup_logging()
def log_conversion(file_path: str, status: str, duration: float):
"""Log conversion event."""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'event': 'document_conversion',
'file': file_path,
'status': status,
'duration_seconds': duration
}
logger.info(json.dumps(log_entry))Batch Processing
Parallel Processing
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
def process_batch_parallel(
files: List[str],
max_workers: int = 5
) -> List[Dict]:
"""Process multiple files in parallel."""
results = []
def convert_file(file_path: str) -> Dict:
try:
with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
result = client.convert(source=file_path)
return {
'file': file_path,
'status': 'success',
'content': result.document.export_to_markdown()
}
except Exception as e:
return {
'file': file_path,
'status': 'failed',
'error': str(e)
}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(convert_file, f): f
for f in files
}
for future in as_completed(futures):
results.append(future.result())
return resultsComplete Pipeline Example
import os
import logging
from pathlib import Path
from typing import List, Dict
from docling.service_client import DoclingServiceClient
from concurrent.futures import ThreadPoolExecutor
# Configuration
# Connection settings come from the environment; each is None when unset.
SERVICE_URL = os.environ.get("DOCLING_SERVICE_URL")
API_KEY = os.environ.get("DOCLING_API_KEY")
# Where PDFs are read from and where the Markdown output is written.
INPUT_DIR = Path("/data") / "input"
OUTPUT_DIR = Path("/data") / "output"
# Upper bound on concurrent conversions.
MAX_WORKERS = 5
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def process_pipeline():
    """Convert every PDF in INPUT_DIR to Markdown in OUTPUT_DIR, in parallel.

    Returns:
        One dict per input file, in sorted input order. Successful entries
        carry ``file``, ``status='success'`` and ``output``; failed entries
        carry ``file``, ``status='failed'`` and ``error``.
    """
    # Step 1: List input files
    logger.info("Listing input files...")
    # Sorted so runs process and report files in a stable order.
    input_files = sorted(INPUT_DIR.glob("*.pdf"))
    logger.info("Found %d files to process", len(input_files))

    # write_text does not create directories; without this every conversion
    # would be reported as failed when OUTPUT_DIR is missing.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Step 2: Process files in parallel
    logger.info("Converting documents...")

    def convert_file(file_path: Path) -> Dict:
        try:
            with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
                result = client.convert(source=file_path)
                # Save output
                output_file = OUTPUT_DIR / f"{file_path.stem}.md"
                output_file.write_text(
                    result.document.export_to_markdown(),
                    encoding="utf-8",
                )
                return {
                    'file': str(file_path),
                    'status': 'success',
                    'output': str(output_file)
                }
        except Exception as e:
            # Deliberately broad: a failing file is logged and reported, and
            # the other conversions continue.
            logger.error("Failed to convert %s: %s", file_path, e)
            return {
                'file': str(file_path),
                'status': 'failed',
                'error': str(e)
            }

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # map() yields results in input order.
        results = list(executor.map(convert_file, input_files))

    # Step 3: Report results
    success_count = sum(1 for r in results if r['status'] == 'success')
    failed_count = len(results) - success_count
    logger.info(
        "Pipeline complete: %d succeeded, %d failed",
        success_count,
        failed_count,
    )
    return results
if __name__ == "__main__":
process_pipeline()Best Practices
Pipeline Design
- Idempotency - Ensure pipeline can be safely re-run
- Error handling - Implement comprehensive error handling
- Monitoring - Track success rates and performance
- Scalability - Design for horizontal scaling
Performance
- Parallel processing - Process multiple documents simultaneously
- Batch operations - Group related operations
- Resource limits - Set appropriate worker limits
- Caching - Cache results when appropriate
Reliability
- Retry logic - Implement exponential backoff
- Dead letter queues - Handle persistent failures
- Health checks - Monitor pipeline health
- Alerting - Set up failure notifications
Next Steps
- Learn about Agentic Workflows for interactive processing
- Check RAG Applications for downstream use cases