Docling | Docling for IBM watsonx
This is a preview; the content is still being developed and is subject to change. Rely on the official announcement and documentation about the Docling for IBM watsonx product.
Cookbook

Pipeline Integration

Integrate Docling into data processing pipelines and automation workflows

Pipeline Integration

This page is not accurate! Each item needs to be validated.

Learn how to integrate Docling for IBM watsonx into your data processing pipelines for automated document conversion at scale.

Overview

Modern data pipelines need to process documents from various sources automatically. Docling provides APIs and tools to seamlessly integrate document conversion into your existing workflows, whether you're using Apache Airflow, AWS Step Functions, or custom automation.

What You'll Build

  • Automated document processing pipeline
  • Batch conversion workflows
  • Error handling and retry logic
  • Monitoring and logging

Prerequisites

  • Docling for IBM watsonx account with Service URL and API Key
  • Python 3.8+
  • Pipeline orchestration tool (Airflow, Prefect, or similar)
  • Cloud storage (S3, GCS, or Azure Blob)

Architecture

Basic Pipeline

Simple Batch Processor

import os
from pathlib import Path
from docling.service_client import DoclingServiceClient
from typing import List, Dict

# Service endpoint and credentials are read from the environment.
# os.getenv returns None for an unset variable, so either value may be None.
SERVICE_URL = os.getenv("DOCLING_SERVICE_URL")
API_KEY = os.getenv("DOCLING_API_KEY")

def process_documents(input_dir: str, output_dir: str) -> List[Dict]:
    """Convert every ``*.pdf`` directly inside ``input_dir`` to Markdown.

    Each converted document is written to ``output_dir`` as ``<stem>.md``.
    A failure on one file is recorded and does not stop the batch.

    Args:
        input_dir: Directory scanned (non-recursively) for ``*.pdf`` files.
        output_dir: Directory for the Markdown output; created if missing.

    Returns:
        One dict per input file. Successful entries carry ``input``,
        ``output`` and ``status='success'``; failed entries carry ``input``,
        ``error`` and ``status='failed'``.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Sorted so that repeated runs visit (and report) files in the same order.
    pdf_files = sorted(input_path.glob("*.pdf"))
    if not pdf_files:
        # Nothing to convert: do not open a service connection at all.
        return []

    results = []

    with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
        for doc_file in pdf_files:
            try:
                result = client.convert(source=doc_file)

                output_file = output_path / f"{doc_file.stem}.md"
                # Explicit UTF-8: the platform default encoding may not be
                # able to represent every character in the Markdown.
                output_file.write_text(
                    result.document.export_to_markdown(),
                    encoding="utf-8",
                )
            except Exception as e:
                # Deliberately broad: one bad document must not abort the batch.
                results.append({
                    'input': str(doc_file),
                    'error': str(e),
                    'status': 'failed'
                })
            else:
                results.append({
                    'input': str(doc_file),
                    'output': str(output_file),
                    'status': 'success'
                })

    return results

Apache Airflow Integration

Airflow DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from docling.service_client import DoclingServiceClient
import os

# Service endpoint and credentials are read from the environment.
# os.getenv returns None for an unset variable, so either value may be None.
SERVICE_URL = os.getenv("DOCLING_SERVICE_URL")
API_KEY = os.getenv("DOCLING_API_KEY")

def convert_documents(**context):
    """Convert the files published by the ``list_files`` task to Markdown.

    The file paths are pulled from XCom, each one is sent through a single
    shared service client, and the function returns a list of
    ``{'file': <path>, 'content': <markdown>}`` dicts.

    NOTE(review): the full Markdown text is part of the return value; confirm
    the configured XCom backend can hold payloads of that size.
    """
    task_instance = context['task_instance']
    pending_paths = task_instance.xcom_pull(task_ids='list_files')

    with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
        converted = [
            {
                'file': path,
                'content': client.convert(source=path).document.export_to_markdown(),
            }
            for path in pending_paths
        ]

    return converted

def list_input_files(**context):
    """Return the ``*.pdf`` paths found directly under ``/data/input`` as strings."""
    from pathlib import Path

    pdf_paths = Path("/data/input").glob("*.pdf")
    return list(map(str, pdf_paths))

def save_results(**context):
    """Write the Markdown produced by the ``convert`` task to ``/data/output``.

    Pulls the list of ``{'file': ..., 'content': ...}`` dicts from XCom and
    writes each ``content`` to ``/data/output/<stem of file>.md``.
    """
    # Imported locally: this DAG snippet has no module-level ``Path`` import,
    # so the name would otherwise be undefined when the task runs.
    from pathlib import Path

    results = context['task_instance'].xcom_pull(task_ids='convert')
    output_dir = Path("/data/output")
    # The directory may not exist on a fresh worker.
    output_dir.mkdir(parents=True, exist_ok=True)

    for result in results:
        file_name = Path(result['file']).stem
        output_file = output_dir / f"{file_name}.md"
        # Explicit UTF-8 so the output does not depend on the worker's locale.
        output_file.write_text(result['content'], encoding='utf-8')

# Task-level settings handed to the DAG as default_args.
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
}

# Daily DAG with three sequential tasks: list files, convert them, save output.
# NOTE(review): `days_ago` and `schedule_interval` are reported as deprecated in
# newer Airflow releases — confirm against the Airflow version in use.
with DAG(
    'docling_pipeline',
    default_args=default_args,
    description='Convert documents with Docling',
    schedule_interval='@daily',
    start_date=days_ago(1),
    catchup=False,
) as dag:
    
    list_task = PythonOperator(
        task_id='list_files',
        python_callable=list_input_files,
    )
    
    convert_task = PythonOperator(
        task_id='convert',
        python_callable=convert_documents,
    )
    
    save_task = PythonOperator(
        task_id='save',
        python_callable=save_results,
    )
    
    # The task ids above are the ones the callables pass to xcom_pull.
    list_task >> convert_task >> save_task

AWS Integration

S3 to S3 Pipeline

import boto3
from docling.service_client import DoclingServiceClient
import os

# Service endpoint and credentials are read from the environment.
# os.getenv returns None for an unset variable, so either value may be None.
SERVICE_URL = os.getenv("DOCLING_SERVICE_URL")
API_KEY = os.getenv("DOCLING_API_KEY")

def process_s3_documents(
    input_bucket: str,
    output_bucket: str,
    prefix: str = ""
):
    """Convert every ``.pdf`` object under ``prefix`` and upload the Markdown.

    Each matching object is downloaded to a private temporary directory,
    converted, and uploaded to ``output_bucket`` under the same key with the
    trailing ``.pdf`` replaced by ``.md``.

    Args:
        input_bucket: Bucket to read PDF objects from.
        output_bucket: Bucket that receives the Markdown objects.
        prefix: Only keys starting with this prefix are considered.
    """
    import tempfile

    s3 = boto3.client('s3')

    # A single list_objects_v2 call returns only one page of keys; the
    # paginator follows continuation tokens so larger listings are covered.
    pages = s3.get_paginator('list_objects_v2').paginate(
        Bucket=input_bucket,
        Prefix=prefix
    )

    with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
        for page in pages:
            for obj in page.get('Contents', []):
                key = obj['Key']

                if not key.endswith('.pdf'):
                    continue

                # A per-object temporary directory keeps the original file
                # name, avoids collisions between keys that share a basename,
                # and is removed even when download or conversion fails.
                with tempfile.TemporaryDirectory() as tmp_dir:
                    local_file = os.path.join(tmp_dir, key.split('/')[-1])
                    s3.download_file(input_bucket, key, local_file)

                    result = client.convert(source=local_file)
                    markdown = result.document.export_to_markdown()

                # Swap only the trailing extension; str.replace would also
                # rewrite a '.pdf' that appears earlier in the key.
                output_key = key[:-len('.pdf')] + '.md'
                s3.put_object(
                    Bucket=output_bucket,
                    Key=output_key,
                    Body=markdown.encode('utf-8')
                )

Lambda Function

import json
import boto3
from docling.service_client import DoclingServiceClient
import os

# Service endpoint and credentials are read from the environment.
# os.getenv returns None for an unset variable, so either value may be None.
SERVICE_URL = os.getenv("DOCLING_SERVICE_URL")
API_KEY = os.getenv("DOCLING_API_KEY")

def lambda_handler(event, context):
    """Convert the objects named in an S3 event and store the Markdown.

    For every record in ``event['Records']`` the object is downloaded,
    converted, and written back to the same bucket under
    ``converted/<key with a trailing .pdf replaced by .md>``.

    Args:
        event: S3 notification event; each record must provide
            ``s3.bucket.name`` and ``s3.object.key``.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': <JSON string naming the processed keys>}``.
    """
    import tempfile
    from urllib.parse import unquote_plus

    s3 = boto3.client('s3')
    processed = []

    with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
        # An S3 notification can carry more than one record.
        for record in event['Records']:
            bucket = record['s3']['bucket']['name']
            # Keys arrive URL-encoded in S3 events (e.g. spaces as '+');
            # decode before using the key against the S3 API.
            key = unquote_plus(record['s3']['object']['key'])

            # Results are written to this same bucket under 'converted/'.
            # Skip them so an unfiltered trigger cannot re-process its output.
            if key.startswith('converted/'):
                continue

            # Temporary directory: keeps the original file name and is always
            # cleaned up, so warm invocations do not accumulate files.
            with tempfile.TemporaryDirectory() as tmp_dir:
                local_file = os.path.join(tmp_dir, key.split('/')[-1])
                s3.download_file(bucket, key, local_file)

                result = client.convert(source=local_file)
                markdown = result.document.export_to_markdown()

            # Replace only a trailing '.pdf'; other keys simply gain '.md'.
            stem = key[:-len('.pdf')] if key.lower().endswith('.pdf') else key
            s3.put_object(
                Bucket=bucket,
                Key=f"converted/{stem}.md",
                Body=markdown.encode('utf-8')
            )
            processed.append(key)

    return {
        'statusCode': 200,
        'body': json.dumps(f"Processed {', '.join(processed)}")
    }

Error Handling

Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential
from docling.service_client import DoclingServiceClient

@retry(
    wait=wait_exponential(multiplier=1, min=4, max=10),
    stop=stop_after_attempt(3),
)
def convert_with_retry(file_path: str) -> str:
    """Convert ``file_path`` to Markdown, retrying on any exception.

    The decorator stops after three attempts and waits between them with an
    exponential policy configured with ``min=4`` and ``max=10``.
    """
    with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
        converted = client.convert(source=file_path)
        markdown = converted.document.export_to_markdown()
    return markdown

Error Tracking

import logging
from typing import Dict, List

# Root logging at INFO level; `logger` is the module-level logger used below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_with_error_tracking(
    files: List[str]
) -> Dict[str, list]:
    """Convert each file and record which ones succeeded or failed.

    Args:
        files: Paths of the documents to convert.

    Returns:
        A dict with three lists: ``success`` (paths converted), ``failed``
        (paths that raised) and ``errors`` (one ``{'file', 'error'}`` dict
        per failure, with the exception message as ``error``).
    """
    results = {
        'success': [],
        'failed': [],
        'errors': []
    }

    if not files:
        # Nothing to do: skip opening a service connection.
        return results

    with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
        for file_path in files:
            try:
                # Only success or failure is tracked; the result is not kept.
                client.convert(source=file_path)
            except Exception as e:
                # Deliberately broad: one bad file must not stop the batch.
                results['failed'].append(file_path)
                results['errors'].append({
                    'file': file_path,
                    'error': str(e)
                })
                logger.error("Failed to convert %s: %s", file_path, e)
            else:
                results['success'].append(file_path)
                logger.info("Successfully converted %s", file_path)

    return results

Monitoring

Metrics Collection

from prometheus_client import Counter, Histogram
import time

# Prometheus metrics for the conversion step.
# Counter of conversions, labelled by outcome via the 'status' label.
conversion_counter = Counter(
    'docling_conversions_total',
    'Total number of document conversions',
    ['status']
)

# Histogram of per-conversion duration in seconds (no labels).
conversion_duration = Histogram(
    'docling_conversion_duration_seconds',
    'Time spent converting documents'
)

def convert_with_metrics(file_path: str) -> str:
    """Convert ``file_path`` to Markdown while recording Prometheus metrics.

    Increments ``conversion_counter`` with ``status='success'`` or
    ``status='failed'`` (exactly one per call) and always observes the
    elapsed time in ``conversion_duration``.

    Args:
        file_path: Path of the document to convert.

    Returns:
        The converted document as Markdown.

    Raises:
        Exception: Whatever the conversion or export raised, re-raised after
            the failure has been counted.
    """
    # perf_counter is monotonic, so the duration is immune to clock changes.
    start_time = time.perf_counter()

    try:
        with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
            result = client.convert(source=file_path)

        # Export before counting success: if the export raises, the call must
        # be counted as failed only, not as both success and failed.
        markdown = result.document.export_to_markdown()

    except Exception:
        conversion_counter.labels(status='failed').inc()
        raise

    else:
        conversion_counter.labels(status='success').inc()
        return markdown

    finally:
        conversion_duration.observe(time.perf_counter() - start_time)

Logging

import logging
import json
from datetime import datetime

def setup_logging():
    """Set up message-only INFO logging and return this module's logger.

    The format string emits just the message, so callers that log JSON
    strings get one bare JSON document per line.
    """
    settings = {'level': logging.INFO, 'format': '%(message)s'}
    logging.basicConfig(**settings)

    module_logger = logging.getLogger(__name__)
    return module_logger


# Module-level logger shared by the functions in this snippet.
logger = setup_logging()

def log_conversion(file_path: str, status: str, duration: float):
    """Emit one JSON log line describing a conversion.

    Args:
        file_path: Path of the document that was converted.
        status: Outcome label for the conversion.
        duration: Elapsed time in seconds.
    """
    # Local import: only `datetime` itself is imported at module level.
    from datetime import timezone

    log_entry = {
        # Timezone-aware UTC timestamp. utcnow() returns a naive datetime
        # (and is deprecated), so its ISO string carries no UTC offset.
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'event': 'document_conversion',
        'file': file_path,
        'status': status,
        'duration_seconds': duration
    }
    logger.info(json.dumps(log_entry))

Batch Processing

Parallel Processing

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

def process_batch_parallel(
    files: List[str],
    max_workers: int = 5
) -> List[Dict]:
    """Convert ``files`` concurrently on a thread pool.

    Every file gets its own service client. A failure is captured in that
    file's result dict instead of being raised.

    Args:
        files: Paths of the documents to convert.
        max_workers: Size of the thread pool.

    Returns:
        One dict per file, in completion order: ``file``, ``status`` and
        either ``content`` (on success) or ``error`` (on failure).
    """
    def _convert_one(path: str) -> Dict:
        try:
            with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
                markdown = client.convert(source=path).document.export_to_markdown()
        except Exception as exc:
            return {'file': path, 'status': 'failed', 'error': str(exc)}
        return {'file': path, 'status': 'success', 'content': markdown}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(_convert_one, path) for path in files]
        # Collect each result as soon as its future finishes.
        return [finished.result() for finished in as_completed(pending)]

Complete Pipeline Example

import os
import logging
from pathlib import Path
from typing import List, Dict
from docling.service_client import DoclingServiceClient
from concurrent.futures import ThreadPoolExecutor

# Configuration
# Service endpoint and credentials come from the environment (None if unset).
SERVICE_URL = os.getenv("DOCLING_SERVICE_URL")
API_KEY = os.getenv("DOCLING_API_KEY")
# Fixed input/output locations used by process_pipeline.
INPUT_DIR = Path("/data/input")
OUTPUT_DIR = Path("/data/output")
# Thread-pool size for the parallel conversion step.
MAX_WORKERS = 5

# Setup logging: root logging at INFO level plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_pipeline():
    """Run the complete pipeline: list PDFs, convert in parallel, report.

    Every ``*.pdf`` directly under ``INPUT_DIR`` is converted on a pool of
    ``MAX_WORKERS`` threads and written to ``OUTPUT_DIR`` as ``<stem>.md``.
    A failure is logged and recorded in that file's result without stopping
    the run.

    Returns:
        One dict per input file, in input order: ``file``, ``status`` and
        either ``output`` (on success) or ``error`` (on failure).
    """

    # Step 1: List input files
    logger.info("Listing input files...")
    input_files = list(INPUT_DIR.glob("*.pdf"))
    logger.info("Found %d files to process", len(input_files))

    # The output directory may not exist yet; without it every write fails.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Step 2: Process files in parallel
    logger.info("Converting documents...")

    def convert_file(file_path: Path) -> Dict:
        try:
            with DoclingServiceClient(url=SERVICE_URL, api_key=API_KEY) as client:
                result = client.convert(source=file_path)

                # Save output with an explicit encoding so the result does
                # not depend on the platform's default locale.
                output_file = OUTPUT_DIR / f"{file_path.stem}.md"
                output_file.write_text(
                    result.document.export_to_markdown(),
                    encoding="utf-8",
                )

                return {
                    'file': str(file_path),
                    'status': 'success',
                    'output': str(output_file)
                }
        except Exception as e:
            # Deliberately broad: one bad document must not abort the run.
            logger.error("Failed to convert %s: %s", file_path, e)
            return {
                'file': str(file_path),
                'status': 'failed',
                'error': str(e)
            }

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # executor.map keeps results in the same order as input_files.
        results = list(executor.map(convert_file, input_files))

    # Step 3: Report results
    success_count = sum(1 for r in results if r['status'] == 'success')
    failed_count = len(results) - success_count

    logger.info(
        "Pipeline complete: %d succeeded, %d failed",
        success_count,
        failed_count,
    )

    return results

if __name__ == "__main__":
    process_pipeline()

Best Practices

Pipeline Design

  1. Idempotency - Ensure pipeline can be safely re-run
  2. Error handling - Implement comprehensive error handling
  3. Monitoring - Track success rates and performance
  4. Scalability - Design for horizontal scaling

Performance

  1. Parallel processing - Process multiple documents simultaneously
  2. Batch operations - Group related operations
  3. Resource limits - Set appropriate worker limits
  4. Caching - Cache results when appropriate

Reliability

  1. Retry logic - Implement exponential backoff
  2. Dead letter queues - Handle persistent failures
  3. Health checks - Monitor pipeline health
  4. Alerting - Set up failure notifications

Next Steps

Resources

On this page