How do you build an ETL pipeline with Apache Airflow?

Quick Answer: Build an ETL pipeline in Airflow by: (1) installing Airflow (Docker Compose or pip), (2) defining a DAG (Directed Acyclic Graph) in Python, (3) creating tasks for Extract (API calls, database queries), Transform (data cleaning, aggregation), and Load (warehouse insertion), (4) setting task dependencies and scheduling, and (5) deploying and monitoring via the Airflow web UI. A basic ETL DAG requires 50-100 lines of Python code.

Building an ETL Pipeline with Apache Airflow

Apache Airflow orchestrates ETL (Extract, Transform, Load) pipelines as Python-defined DAGs (Directed Acyclic Graphs). Each DAG represents a workflow where tasks execute in a defined dependency order on a schedule.

Prerequisites

  • Python 3.8 or higher
  • Docker and Docker Compose (recommended for local development)
  • A target data warehouse (PostgreSQL, Snowflake, BigQuery, Redshift)
  • Source data access (API credentials, database connection strings)

Step 1: Install Airflow

The fastest way to get Airflow running locally is Docker Compose:

curl -LfO https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker compose up airflow-init
docker compose up -d

Alternative for development: pip install apache-airflow in a virtual environment (requires additional configuration for the database backend and executor).

Step 2: Create a DAG File

Create a new Python file in the dags/ directory. Every DAG file defines the schedule, dependencies, and task logic:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="sales_etl_pipeline",
    default_args=default_args,
    schedule="@daily",  # "schedule_interval" in Airflow versions before 2.4
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    # Tasks defined below
    pass

Step 3: Define the Extract Task

Use a PythonOperator to pull data from the source. Store extracted data in intermediate storage (S3, GCS, or a staging directory) rather than passing large datasets through XComs:

def extract_sales_data(**context):
    import json
    import requests
    # In production, read the token from an Airflow Variable or Connection
    response = requests.get(
        "https://api.example.com/sales",
        headers={"Authorization": "Bearer TOKEN"},
        params={"date": context["ds"]},
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    # Write to intermediate storage rather than returning the payload via XCom
    with open(f"/tmp/sales_{context['ds']}.json", "w") as f:
        json.dump(data, f)
    return f"/tmp/sales_{context['ds']}.json"

extract_task = PythonOperator(
    task_id="extract_sales",
    python_callable=extract_sales_data,
)

Step 4: Define the Transform Task

Clean and reshape the data using pandas or standard Python:

def transform_sales_data(**context):
    import pandas as pd
    filepath = context["ti"].xcom_pull(task_ids="extract_sales")
    df = pd.read_json(filepath)
    # Remove nulls, normalize dates, aggregate
    df = df.dropna(subset=["amount"])
    df["date"] = pd.to_datetime(df["date"])
    daily_totals = df.groupby("date")["amount"].sum().reset_index()
    daily_totals.to_csv(f"/tmp/sales_transformed_{context['ds']}.csv", index=False)

transform_task = PythonOperator(
    task_id="transform_sales",
    python_callable=transform_sales_data,
)

Step 5: Define the Load Task

Insert transformed data into the target warehouse:

def load_sales_data(**context):
    import pandas as pd
    from sqlalchemy import create_engine
    df = pd.read_csv(f"/tmp/sales_transformed_{context['ds']}.csv")
    # In production, build this URI from an Airflow Connection, never hardcode it
    engine = create_engine("postgresql://user:pass@warehouse:5432/analytics")
    df.to_sql("daily_sales", engine, if_exists="append", index=False)

load_task = PythonOperator(
    task_id="load_sales",
    python_callable=load_sales_data,
)
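One design choice worth flagging: with if_exists="append", re-running a day's DAG duplicates that day's rows. A common remedy is delete-then-insert per execution date inside one transaction, sketched here with the stdlib sqlite3 module standing in for the warehouse (table and column names are the ones from the example):

```python
import sqlite3

def load_idempotent(conn, rows, ds):
    """Replace all rows for the given date, then insert the new batch.

    Re-running the task for the same ds leaves exactly one copy of the data.
    """
    with conn:  # one transaction: delete and insert commit together
        conn.execute("DELETE FROM daily_sales WHERE date = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_sales (date, amount) VALUES (?, ?)", rows
        )

# Usage: an in-memory database stands in for the warehouse
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (date TEXT, amount REAL)")
load_idempotent(conn, [("2026-01-01", 15.0)], "2026-01-01")
load_idempotent(conn, [("2026-01-01", 15.0)], "2026-01-01")  # re-run: no duplicates
```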

Step 6: Set Dependencies

Define execution order using Airflow bitshift operators:

extract_task >> transform_task >> load_task

Step 7: Deploy and Monitor

Start the Airflow scheduler and webserver. Open the web UI at localhost:8080 (default credentials: airflow/airflow). Trigger the DAG manually for the first run, then monitor task status via the Graph View. Check task logs for errors by clicking individual task instances.

Best Practices

  • Use intermediate storage (S3/GCS) for data between tasks, not XComs, which live in the metadata database and are intended only for small payloads
  • Set catchup=False unless historical backfilling is required
  • Parameterize credentials using Airflow Variables or Connections (never hardcode)
  • Add failure alerts using on_failure_callback or email operators
  • Configure retries with exponential backoff for API-dependent tasks
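To make the backoff bullet concrete: setting retry_exponential_backoff=True on an operator roughly doubles the wait between attempts, capped by max_retry_delay. The resulting schedule can be sketched as a simplified model (real Airflow also randomizes the delay with jitter):

```python
def retry_delays(base_seconds, max_delay_seconds, retries):
    """Simplified exponential-backoff schedule: base * 2**n, capped.

    Models the interaction of retry_delay, retry_exponential_backoff,
    and max_retry_delay on Airflow operators; the scheduler adds jitter
    on top of this in practice.
    """
    return [min(base_seconds * 2 ** n, max_delay_seconds) for n in range(retries)]

# e.g. a 5-minute base delay, 30-minute cap, 4 retries
delays = retry_delays(300, 1800, 4)
```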

Common Pitfalls

  • Passing full DataFrames through XComs (exceeds size limits, causes serialization errors)
  • Running heavy imports or API calls at the module level (executes every time the scheduler parses the DAG file, not just at run time)
  • Forgetting catchup=False (Airflow will attempt to backfill all dates from start_date to today)
  • Not setting proper retries for API-dependent extract tasks

Editor's Note: We built an ETL pipeline in Airflow for a data team at a B2B SaaS company. The pipeline extracted data from 3 REST APIs (HubSpot, Stripe, Intercom), transformed it in Python (pandas), and loaded to Snowflake. Total development: 2 weeks for 3 DAGs. The mistake we made: initially passing full DataFrames through XComs, which exceeded the XCom size limit (48KB default). Switching to S3 intermediate storage fixed the issue. Running cost: $380/month on AWS MWAA (managed Airflow).

By Rafal Fila
