ETL Pipeline Basics with Apache Airflow

Last updated: Dec 5, 2025

ETL (Extract, Transform, Load) pipelines are the backbone of modern data infrastructure, moving data from source systems to data warehouses, lakes, and analytics platforms. Apache Airflow has emerged as the de facto standard for orchestrating these complex workflows, providing a powerful, flexible platform for scheduling, monitoring, and maintaining data pipelines.

This comprehensive guide introduces Apache Airflow for ETL pipeline development, covering core concepts, practical implementation, and best practices for building reliable data workflows.

1. Understanding ETL and Data Orchestration

1.1 What is ETL?

ETL stands for Extract, Transform, Load—a three-stage process for data integration (a minimal Python sketch follows the list):

  1. Extract: Collect data from various sources (databases, APIs, files, streams)
  2. Transform: Clean, enrich, aggregate, and structure the data
  3. Load: Insert the transformed data into a target system (data warehouse, database, etc.)
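
The hedged sketch below wires these three stages together in plain Python so the flow is concrete before Airflow enters the picture; the API URL, database file, and table name are placeholders, not a real system.

import sqlite3

import pandas as pd
import requests


def extract(url: str) -> pd.DataFrame:
    """Extract: pull raw records from a (hypothetical) JSON API."""
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return pd.DataFrame(response.json())


def transform(raw: pd.DataFrame) -> pd.DataFrame:
    """Transform: drop incomplete rows and normalize column names."""
    cleaned = raw.dropna()
    cleaned.columns = [col.strip().lower() for col in cleaned.columns]
    return cleaned


def load(df: pd.DataFrame, db_path: str, table: str) -> None:
    """Load: append the cleaned data to a local SQLite table."""
    with sqlite3.connect(db_path) as conn:
        df.to_sql(table, conn, if_exists="append", index=False)


if __name__ == "__main__":
    load(transform(extract("https://api.example.com/data")), "warehouse.db", "staging")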

1.2 The Need for Workflow Orchestration

As data pipelines grow in complexity, manual execution becomes unsustainable. Orchestration tools like Airflow provide:

  • Scheduling: Automated execution at specified intervals
  • Dependency Management: Ensure tasks run in correct order
  • Monitoring: Track pipeline health and performance
  • Error Handling: Automatic retries and alerting
  • Scalability: Handle growing data volumes and complexity

1.3 Apache Airflow: The Workflow Orchestrator

Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. Key features:

  • Dynamic Pipeline Generation: Python code defines workflows
  • Extensible: Rich ecosystem of operators and hooks
  • Scalable: Distributed architecture with executors
  • Visual Interface: Web UI for monitoring and management
  • Community: Large, active community with extensive integrations

2. Core Airflow Concepts

2.1 Directed Acyclic Graph (DAG)

The fundamental Airflow concept is the DAG—a collection of tasks with directed dependencies and no cycles (hence “acyclic”), so every run has a single well-defined execution order.

from airflow import DAG
from datetime import datetime, timedelta

# Define a DAG
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    description='A simple ETL pipeline',
    schedule_interval='@daily',  # Run once per day
    catchup=False,
    tags=['etl', 'data-pipeline'],
)
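
Note that recent Airflow 2.x releases (2.4+) prefer the schedule argument over schedule_interval and also support defining pipelines with the TaskFlow API. A roughly equivalent, hedged sketch of the same daily DAG in TaskFlow style (the task bodies are placeholders):

from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='etl_pipeline_taskflow',
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'data-pipeline'],
)
def etl_pipeline_taskflow():
    @task
    def extract():
        # Return values are passed between tasks via XCom automatically
        return [{'id': 1, 'value': 42}]

    @task
    def transform(records):
        return [record for record in records if record['value'] is not None]

    @task
    def load(records):
        print(f"Loading {len(records)} records")

    load(transform(extract()))


etl_pipeline_taskflow()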

2.2 Operators

Operators define individual tasks within a DAG. Each operator performs a specific action:

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Python operator for custom logic (extract_function is assumed to be defined elsewhere in the DAG file)
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_function,
    dag=dag,
)

# Bash operator for shell commands
transform_task = BashOperator(
    task_id='transform_data',
    bash_command='python /scripts/transform.py',
    dag=dag,
)

# PostgreSQL operator for database operations
load_task = PostgresOperator(
    task_id='load_data',
    postgres_conn_id='postgres_default',
    sql='INSERT INTO table SELECT * FROM staging;',
    dag=dag,
)
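
Once tasks exist, their ordering is declared with the bitshift operators (or set_upstream/set_downstream); for the three tasks above:

# Run extract, then transform, then load
extract_task >> transform_task >> load_task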

2.3 Tasks and Task Instances

  • Task: A unit of work defined by an operator
  • Task Instance: A specific run of a task at a particular time
  • Execution Date: The logical date a DAG run covers (its data interval), which is not necessarily the wall-clock time the run starts

2.4 Hooks

Hooks provide reusable interfaces to external systems and take care of connection management (credentials, sessions) via Airflow Connections:

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.http.hooks.http import HttpHook

# Database hook
postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
connection = postgres_hook.get_conn()
cursor = connection.cursor()

# API hook
http_hook = HttpHook(http_conn_id='api_connection', method='GET')
response = http_hook.run(endpoint='/data')
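
For simple queries you rarely need to manage cursors yourself; DbApiHook-based hooks such as PostgresHook expose convenience methods that open and close the connection for you (the table and column names below are illustrative):

# Fetch rows directly through the hook
rows = postgres_hook.get_records("SELECT symbol, close FROM financial_final LIMIT 10")

# Or load a query result straight into a pandas DataFrame
df = postgres_hook.get_pandas_df(
    "SELECT * FROM financial_final WHERE date = %s",
    parameters=["2025-01-01"],
)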

3. Building Your First ETL Pipeline

3.1 Pipeline Architecture

Let’s build a complete ETL pipeline that:

  1. Extracts data from a REST API
  2. Transforms the data (cleaning, validation)
  3. Loads it into a PostgreSQL database
  4. Sends notifications on completion

3.2 Project Structure

etl_project/
├── dags/
│   └── financial_etl.py
├── plugins/
│   └── custom_operators.py
├── scripts/
│   ├── extract.py
│   ├── transform.py
│   └── load.py
├── sql/
│   └── create_tables.sql
└── config/
    └── connections.yaml

3.3 Complete Pipeline Implementation

# dags/financial_etl.py
from datetime import datetime, timedelta

import pandas as pd
import requests

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

default_args = {
    'owner': 'data_engineering',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email': ['data-team@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'financial_data_etl',
    default_args=default_args,
    description='ETL pipeline for financial data',
    schedule_interval='0 6 * * *',  # Daily at 6 AM
    catchup=False,
    tags=['financial', 'etl', 'daily'],
)

# Task 1: Create tables if they don't exist
create_tables = PostgresOperator(
    task_id='create_tables',
    postgres_conn_id='postgres_warehouse',
    sql='''
        CREATE TABLE IF NOT EXISTS financial_staging (
            id SERIAL PRIMARY KEY,
            symbol VARCHAR(10),
            date DATE,
            open DECIMAL(10,2),
            high DECIMAL(10,2),
            low DECIMAL(10,2),
            close DECIMAL(10,2),
            volume BIGINT,
            extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE (symbol, date)
        );
        
        CREATE TABLE IF NOT EXISTS financial_final (
            id SERIAL PRIMARY KEY,
            symbol VARCHAR(10),
            date DATE,
            open DECIMAL(10,2),
            high DECIMAL(10,2),
            low DECIMAL(10,2),
            close DECIMAL(10,2),
            volume BIGINT,
            daily_return DECIMAL(10,4),
            moving_avg_7 DECIMAL(10,2),
            moving_avg_30 DECIMAL(10,2),
            loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE (symbol, date)
        );
    ''',
    dag=dag,
)

# Task 2: Extract data from API
def extract_data(**context):
    """Extract financial data from external API"""
    symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN']
    api_key = Variable.get('api_key')  # API key stored as an Airflow Variable
    all_data = []
    
    for symbol in symbols:
        url = f'https://api.example.com/financial/{symbol}?apikey={api_key}'
        response = requests.get(url, timeout=30)
        
        if response.status_code == 200:
            data = response.json()
            data['symbol'] = symbol
            all_data.append(data)
        else:
            raise Exception(f"Failed to fetch data for {symbol}")
    
    # Push data to XCom for downstream tasks
    context['ti'].xcom_push(key='extracted_data', value=all_data)
    
    return f"Extracted {len(all_data)} records"

extract = PythonOperator(
    task_id='extract_financial_data',
    python_callable=extract_data,
    dag=dag,
)

# Task 3: Transform data
def transform_data(**context):
    """Clean and transform extracted data"""
    extracted_data = context['ti'].xcom_pull(
        task_ids='extract_financial_data',
        key='extracted_data'
    )
    
    transformed_records = []
    
    for record in extracted_data:
        # Basic cleaning
        cleaned_record = {
            'symbol': record['symbol'],
            # Keep the date as an ISO string so the record stays JSON-serializable for XCom
            'date': pd.to_datetime(record['date']).strftime('%Y-%m-%d'),
            'open': float(record['open']),
            'high': float(record['high']),
            'low': float(record['low']),
            'close': float(record['close']),
            'volume': int(record['volume'])
        }
        
        # Calculate derived metrics
        cleaned_record['daily_return'] = (
            (cleaned_record['close'] - cleaned_record['open']) 
            / cleaned_record['open']
        )
        
        transformed_records.append(cleaned_record)
    
    # Push transformed data to XCom
    context['ti'].xcom_push(
        key='transformed_data', 
        value=transformed_records
    )
    
    return f"Transformed {len(transformed_records)} records"

transform = PythonOperator(
    task_id='transform_financial_data',
    python_callable=transform_data,
    dag=dag,
)

# Task 4: Load data to staging
def load_to_staging(**context):
    """Load transformed data to staging table"""
    transformed_data = context['ti'].xcom_pull(
        task_ids='transform_financial_data',
        key='transformed_data'
    )
    
    postgres_hook = PostgresHook(postgres_conn_id='postgres_warehouse')
    connection = postgres_hook.get_conn()
    cursor = connection.cursor()
    
    for record in transformed_data:
        cursor.execute("""
            INSERT INTO financial_staging 
            (symbol, date, open, high, low, close, volume)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (symbol, date) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume,
                extracted_at = CURRENT_TIMESTAMP
        """, (
            record['symbol'],
            record['date'],
            record['open'],
            record['high'],
            record['low'],
            record['close'],
            record['volume']
        ))
    
    connection.commit()
    cursor.close()
    connection.close()
    
    return f"Loaded {len(transformed_data)} records to staging"

load_staging = PythonOperator(
    task_id='load_to_staging',
    python_callable=load_to_staging,
    dag=dag,
)

# Task 5: Process and load to final table
process_final = PostgresOperator(
    task_id='process_final_table',
    postgres_conn_id='postgres_warehouse',
    sql='''
        -- Calculate moving averages and insert into final table
        INSERT INTO financial_final
            (symbol, date, open, high, low, close, volume,
             daily_return, moving_avg_7, moving_avg_30, loaded_at)
        SELECT *
        FROM (
            SELECT
                fs.symbol,
                fs.date,
                fs.open,
                fs.high,
                fs.low,
                fs.close,
                fs.volume,
                ROUND((fs.close - fs.open) / fs.open, 4) AS daily_return,
                -- Window functions are computed in the subquery, before the outer
                -- date filter, so the averages see the full staging history
                ROUND(AVG(fs.close) OVER (
                    PARTITION BY fs.symbol
                    ORDER BY fs.date
                    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
                ), 2) AS moving_avg_7,
                ROUND(AVG(fs.close) OVER (
                    PARTITION BY fs.symbol
                    ORDER BY fs.date
                    ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
                ), 2) AS moving_avg_30,
                CURRENT_TIMESTAMP AS loaded_at
            FROM financial_staging fs
        ) windowed
        WHERE windowed.date = CURRENT_DATE - INTERVAL '1 day'
        ON CONFLICT (symbol, date) DO UPDATE SET
            open = EXCLUDED.open,
            high = EXCLUDED.high,
            low = EXCLUDED.low,
            close = EXCLUDED.close,
            volume = EXCLUDED.volume,
            daily_return = EXCLUDED.daily_return,
            moving_avg_7 = EXCLUDED.moving_avg_7,
            moving_avg_30 = EXCLUDED.moving_avg_30,
            loaded_at = CURRENT_TIMESTAMP;
        
        -- Clean up old staging data (keep 60 days so the 30-day moving average has history)
        DELETE FROM financial_staging
        WHERE date < CURRENT_DATE - INTERVAL '60 days';
    ''',
    dag=dag,
)

# Task 6: Send success notification
success_notification = SlackWebhookOperator(
    task_id='send_success_notification',
    slack_webhook_conn_id='slack_webhook',
    message="""
        :white_check_mark: *ETL Pipeline Completed Successfully*
        *Pipeline*: financial_data_etl
        *Execution Date*: {{ ds }}
        *Records Processed*: {{ ti.xcom_pull(task_ids='load_to_staging') }}
        *Status*: All tasks completed successfully
    """,
    dag=dag,
)

# Start and end markers
start = EmptyOperator(task_id='start', dag=dag)
end = EmptyOperator(task_id='end', dag=dag)

# Set up the workflow
start >> create_tables >> extract >> transform >> load_staging
load_staging >> process_final >> success_notification >> end

4. Advanced Airflow Features for ETL

4.1 Task Groups for Organization

Organize related tasks into logical groups:

from airflow.utils.task_group import TaskGroup

with TaskGroup("extract_group", tooltip="Extract tasks") as extract_group:
    extract_api = PythonOperator(task_id="extract_api", ...)
    extract_database = PythonOperator(task_id="extract_database", ...)
    extract_files = PythonOperator(task_id="extract_files", ...)
    
    # Dependencies within group
    extract_api >> [extract_database, extract_files]

with TaskGroup("transform_group") as transform_group:
    # Transformation tasks...
    pass

# Group dependencies
extract_group >> transform_group

4.2 Dynamic Task Generation

Generate tasks dynamically based on parameters. Because the scheduler builds DAGs by executing the DAG file, this generation must happen at parse time (top-level code in the file), not inside a running task:

# Dynamic task generation happens at DAG parse time, so the loop lives at
# module level in the DAG file (not inside a task callable)
symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN']

previous_task = None

for symbol in symbols:
    task = PythonOperator(
        task_id=f'process_{symbol.lower()}',
        python_callable=process_symbol,   # placeholder callable defined elsewhere
        op_kwargs={'symbol': symbol},
        dag=dag,
    )

    # Chain the generated tasks sequentially
    if previous_task is not None:
        previous_task >> task
    previous_task = task
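
In Airflow 2.3 and later, dynamic task mapping goes a step further and expands a single task definition into one task instance per input at runtime. A minimal sketch reusing the same placeholder process_symbol callable:

# One mapped task instance is created per op_kwargs entry at runtime
process_symbols = PythonOperator.partial(
    task_id='process_symbol',
    python_callable=process_symbol,
    dag=dag,
).expand(op_kwargs=[{'symbol': s} for s in ['AAPL', 'GOOGL', 'MSFT', 'AMZN']])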

4.3 Custom Operators

Create reusable operators for specific ETL patterns:

from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

class DataQualityOperator(BaseOperator):
    """Custom operator for data quality checks"""

    def __init__(self, sql_check, expected_result, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sql_check = sql_check
        self.expected_result = expected_result
    def execute(self, context):
        hook = PostgresHook(postgres_conn_id='postgres_warehouse')
        records = hook.get_records(self.sql_check)
        
        if not records:
            raise ValueError(f"Data quality check failed. SQL returned no results")
        
        if records[0][0] != self.expected_result:
            raise ValueError(
                f"Data quality check failed. "
                f"Expected {self.expected_result}, got {records[0][0]}"
            )
        
        self.log.info("Data quality check passed")
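
Used inside a DAG, the custom operator behaves like any built-in one; the check below (against the staging table from the earlier pipeline) is illustrative:

row_count_check = DataQualityOperator(
    task_id='check_staging_not_empty',
    sql_check='SELECT COUNT(*) > 0 FROM financial_staging',
    expected_result=True,
    dag=dag,
)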

4.4 XCom for Cross-Task Communication

Share data between tasks using XCom:

def push_data(**context):
    data = {'key': 'value'}
    context['ti'].xcom_push(key='sample_data', value=data)

def pull_data(**context):
    data = context['ti'].xcom_pull(
        task_ids='push_task', 
        key='sample_data'
    )
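
With the TaskFlow API the same exchange is implicit: return values are pushed to XCom automatically, and passing one decorated task's result into another creates both the XCom hand-off and the dependency. A brief sketch:

from airflow.decorators import task

@task
def produce():
    return {'key': 'value'}  # return value is stored in XCom automatically

@task
def consume(payload):
    print(payload['key'])

consume(produce())  # also wires the produce -> consume dependency

Either way, keep XCom payloads small (row counts, file paths, metadata); large datasets belong in external storage such as object stores or staging tables, with only their location passed through XCom.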

5. Monitoring and Error Handling

5.1 Airflow UI Features

  • Grid View (formerly Tree View): Visualize DAG runs and task states over time
  • Graph View: See task dependencies and status
  • Gantt Chart: Analyze task duration and overlap
  • Task Duration: Identify performance bottlenecks
  • Logs: Detailed execution logs for debugging

5.2 Error Handling Strategies

import logging

from airflow.exceptions import AirflowException

log = logging.getLogger(__name__)

def robust_etl_task(**context):
    # perform_etl_operation, validate_result, send_alert, TemporaryError and
    # PermanentError are placeholders for your own code and exception types
    try:
        # Primary execution
        result = perform_etl_operation()

        # Validate result
        if not validate_result(result):
            raise AirflowException("Data validation failed")

        return result

    except TemporaryError as e:
        # Log and let Airflow's retry mechanism handle it
        log.warning("Temporary error: %s", e)
        raise AirflowException("Retry triggered") from e

    except PermanentError as e:
        # Alert and fail without relying on retries
        send_alert(f"Permanent failure: {e}")
        raise AirflowException("Permanent failure") from e

5.3 Alerting and Notifications

Configure alerts for:

  • Task failures
  • SLA misses
  • DAG duration thresholds
  • Custom metrics breaches

from airflow.operators.email import EmailOperator

alert_email = EmailOperator(
    task_id='send_failure_alert',
    to='data-team@example.com',
    subject='Airflow ETL Pipeline Failure',
    html_content="""
    <h3>ETL Pipeline Failure</h3>
    <p><strong>DAG:</strong> {{ dag.dag_id }}</p>
    <p><strong>Task:</strong> {{ ti.task_id }}</p>
    <p><strong>Execution Date:</strong> {{ ds }}</p>
    <p><strong>Logs:</strong> {{ ti.log_url }}</p>
    """,
    trigger_rule='one_failed',  # Send only on failure
    dag=dag,
)
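
An alternative to a dedicated alert task is a failure callback applied through default_args, optionally combined with a per-task SLA so that slow runs are flagged as well. A hedged sketch (send_slack_message is a placeholder for your own notification helper):

from datetime import timedelta

def notify_failure(context):
    """Called by Airflow with the task context whenever a task instance fails."""
    ti = context['ti']
    send_slack_message(
        f"Task {ti.task_id} in DAG {ti.dag_id} failed on {context['ds']}: "
        f"{context.get('exception')}"
    )

default_args = {
    # ... other defaults ...
    'on_failure_callback': notify_failure,   # runs for every failed task instance
    'sla': timedelta(hours=1),               # flag task instances that overrun one hour
}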

6. Best Practices for Production ETL Pipelines

6.1 Idempotency and Retry Logic

  • Design tasks to be idempotent (re-running them produces the same end state, with no duplicated data)
  • Implement retry strategies with exponential backoff (see the sketch after this list)
  • Use unique identifiers to prevent duplicate processing
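
Airflow's BaseOperator exposes retry behaviour directly on each task; a hedged example of retries with exponential backoff, capped at a maximum delay (reusing extract_data from the earlier pipeline for illustration):

from datetime import timedelta

from airflow.operators.python import PythonOperator

resilient_extract = PythonOperator(
    task_id='resilient_extract',
    python_callable=extract_data,
    retries=5,                               # retry up to 5 times before failing
    retry_delay=timedelta(minutes=1),        # initial wait between attempts
    retry_exponential_backoff=True,          # double the wait on each retry
    max_retry_delay=timedelta(minutes=30),   # cap the backoff
    dag=dag,
)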

6.2 Performance Optimization

  • Task Granularity: Balance between too many small tasks and too few large ones
  • Parallel Execution: Run independent tasks concurrently with the Celery or Kubernetes executor
  • Resource Management: Set appropriate CPU/memory limits
  • Data Chunking: Process large datasets in batches

6.3 Testing and Validation

  • Unit Tests: Test individual operators and functions
  • Integration Tests: Test complete DAGs in a staging environment (a DAG-loading test sketch follows this list)
  • Data Quality Checks: Validate data at each stage
  • Schema Validation: Ensure data structure consistency
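
A common first test simply asserts that every DAG file parses without import errors; a minimal pytest-style sketch using Airflow's DagBag (the dags/ path and task IDs match the earlier project layout):

from airflow.models import DagBag

def test_dags_load_without_errors():
    # Parse the dags/ folder the same way the scheduler would
    dag_bag = DagBag(dag_folder='dags/', include_examples=False)
    assert not dag_bag.import_errors, f"DAG import errors: {dag_bag.import_errors}"

def test_financial_etl_has_expected_tasks():
    dag_bag = DagBag(dag_folder='dags/', include_examples=False)
    dag = dag_bag.get_dag('financial_data_etl')
    assert dag is not None
    assert 'extract_financial_data' in dag.task_ids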

6.4 Security and Credential Management

  • Use Airflow Connections for sensitive data
  • Implement secrets backends (Hashicorp Vault, AWS Secrets Manager)
  • Restrict DAG access with Airflow RBAC
  • Encrypt communication between components

7. Common ETL Patterns with Airflow

7.1 Incremental Loading

def get_last_processed_date(table_name):
    """Retrieve the date high-water mark from the last successful load"""
    hook = PostgresHook(postgres_conn_id='warehouse')
    result = hook.get_first(f"SELECT MAX(processed_date) FROM {table_name}")
    return result[0] if result and result[0] else datetime.min.date()

def incremental_extract(**context):
    last_date = get_last_processed_date('financial_data')
    # 'logical_date' replaces the deprecated 'execution_date' context key in Airflow 2.2+
    run_date = context['logical_date'].date()

    # Extract only records between the last processed date and this run's date
    # (extract_data_since is a placeholder for source-specific extraction logic)
    new_data = extract_data_since(last_date, run_date)

    return new_data

7.2 Fan-out/Fan-in Pattern

# Fan-out: Process multiple partitions in parallel
partitions = ['2025-01', '2025-02', '2025-03']  # example partition keys

with TaskGroup("partition_processing") as partition_group:
    for partition in partitions:
        process_partition = PythonOperator(
            task_id=f"process_{partition}",
            python_callable=process_single_partition,
            op_kwargs={'partition': partition},
        )

# Fan-in: Aggregate results once every partition task has finished
aggregate_results = PythonOperator(
    task_id='aggregate_results',
    python_callable=combine_partition_results,
    trigger_rule='all_done',  # Run even if some partitions fail
)

partition_group >> aggregate_results

7.3 Data Quality Pipeline

data_quality_checks = [
    ("SELECT COUNT(*) FROM financial_final WHERE close IS NULL", 0),
    ("SELECT COUNT(*) FROM financial_final WHERE volume < 0", 0),
    ("SELECT COUNT(*) FROM financial_final WHERE date > CURRENT_DATE", 0),
]

with TaskGroup("data_quality") as dq_group:
    for i, (sql, expected) in enumerate(data_quality_checks):
        check = DataQualityOperator(
            task_id=f"dq_check_{i}",
            sql_check=sql,
            expected_result=expected,
        )

8. Integration with Modern Data Stack

8.1 Cloud Data Warehouses

  • Snowflake: Use SnowflakeOperator and SnowflakeHook (see the sketch after this list)
  • BigQuery: BigQueryInsertJobOperator for Google Cloud
  • Redshift: RedshiftSQLOperator for AWS
  • Databricks: DatabricksSubmitRunOperator
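
For example, running a load statement in Snowflake might look like the sketch below; the connection ID, stage, and table names are assumptions, and newer Snowflake provider releases route the same SQL through SQLExecuteQueryOperator:

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

load_snowflake = SnowflakeOperator(
    task_id='load_into_snowflake',
    snowflake_conn_id='snowflake_default',   # connection configured in Airflow
    sql="COPY INTO analytics.financial_final FROM @financial_stage FILE_FORMAT = (TYPE = CSV)",
    dag=dag,
)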

8.2 Stream Processing

  • Kafka: produce to and consume from topics with the apache-kafka provider's operators
  • Spark: submit batch or streaming jobs with SparkSubmitOperator
  • Streaming engines: Airflow itself is batch-oriented, so trigger long-running stream jobs (Spark Structured Streaming, Flink) rather than processing events inside Airflow tasks

8.3 Data Lakes and Cloud Storage

  • S3: S3CopyObjectOperator, S3FileTransformOperator
  • GCS: GCSToBigQueryOperator for loading files into BigQuery (see the sketch after this list)
  • ADLS: ADLSListOperator for Azure Data Lake Storage
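
A hedged sketch of a GCS-to-BigQuery load; the bucket, object pattern, and dataset names are placeholders:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

gcs_to_bq = GCSToBigQueryOperator(
    task_id='load_gcs_to_bigquery',
    bucket='example-data-lake',
    source_objects=['financial/2025/*.csv'],
    destination_project_dataset_table='analytics.financial_raw',
    source_format='CSV',
    write_disposition='WRITE_APPEND',
    dag=dag,
)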

9. Deployment and Scaling Considerations

9.1 Deployment Options

  • Local Executor: Single-machine setups for development, testing, and small workloads
  • Celery Executor: Production with multiple workers
  • Kubernetes Executor: Containerized, scalable deployment
  • Cloud Composer: Managed Airflow on Google Cloud
  • MWAA: Managed Workflows for Apache Airflow on AWS
  • Astronomer: Enterprise Airflow platform

9.2 Monitoring and Observability

  • Metrics Export: Prometheus metrics from Airflow
  • Log Aggregation: Centralized logging with ELK/EFK stack
  • Distributed Tracing: OpenTelemetry integration
  • Custom Dashboards: Grafana for operational insights

9.3 Cost Optimization

  • Scale workers based on workload
  • Use spot instances for non-critical tasks
  • Implement efficient scheduling to reduce idle time
  • Monitor and optimize resource utilization

10. Conclusion

Apache Airflow has revolutionized ETL pipeline development by providing a powerful, flexible, and scalable orchestration platform. By understanding Airflow’s core concepts—DAGs, operators, tasks, and hooks—you can build robust, maintainable data pipelines that handle the complexities of modern data ecosystems.

Key takeaways for successful Airflow ETL implementation:

  1. Start Simple: Begin with basic pipelines and gradually add complexity
  2. Embrace Idempotency: Design tasks that can be safely retried
  3. Monitor Rigorously: Implement comprehensive monitoring and alerting
  4. Test Thoroughly: Validate both data and pipeline logic
  5. Iterate and Improve: Continuously refine pipelines based on operational insights

As data volumes and complexity grow, Airflow’s extensibility and community support make it an ideal choice for organizations of all sizes. By following the patterns and best practices outlined in this guide, you can build ETL pipelines that are not only functional but also reliable, scalable, and maintainable.

Key Takeaways

  1. DAG Fundamentals: Directed Acyclic Graphs define workflow structure and dependencies
  2. Operator Variety: Choose from built-in operators or create custom ones for specific needs
  3. Task Management: Proper task granularity and dependency definition are crucial
  4. Error Handling: Implement robust retry logic and alerting mechanisms
  5. Idempotency: Design tasks to be repeatable without adverse effects
  6. Monitoring: Leverage Airflow UI and external tools for pipeline observability
  7. Testing: Comprehensive testing at all pipeline stages ensures reliability
  8. Scalability: Choose appropriate executors and infrastructure for workload demands
  9. Integration: Airflow works seamlessly with modern data platforms and tools
  10. Best Practices: Follow established patterns for maintainable, production-ready pipelines
