How do you build an ETL pipeline with Apache Airflow?
Quick Answer: Build an ETL pipeline in Airflow by: (1) installing Airflow (Docker Compose or pip), (2) defining a DAG (Directed Acyclic Graph) in Python, (3) creating tasks for Extract (API calls, database queries), Transform (data cleaning, aggregation), and Load (warehouse insertion), (4) setting task dependencies and scheduling, and (5) deploying and monitoring via the Airflow web UI. A basic ETL DAG requires 50-100 lines of Python code.
Building an ETL Pipeline with Apache Airflow
Apache Airflow orchestrates ETL (Extract, Transform, Load) pipelines as Python-defined DAGs (Directed Acyclic Graphs). Each DAG represents a workflow where tasks execute in a defined dependency order. A basic ETL pipeline in Airflow requires 50-100 lines of Python code.
Prerequisites
- Python 3.8 or higher
- Docker and Docker Compose (recommended for local development)
- A target data warehouse (PostgreSQL, Snowflake, BigQuery, Redshift)
- Source data access (API credentials, database connection strings)
Step 1: Install Airflow
The fastest way to get Airflow running locally is Docker Compose:
curl -LfO https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker compose up airflow-init
docker compose up -d
Alternative for development: pip install apache-airflow in a virtual environment (requires additional configuration for the database backend and executor).
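A minimal sketch of the pip route, assuming Airflow 2.x (pin the version you want and the constraints file matching your Python version; 2.9.3 and Python 3.11 here are illustrative choices):

python -m venv airflow-env && source airflow-env/bin/activate
pip install "apache-airflow==2.9.3" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt"
airflow standalone  # initializes the metadata DB, creates an admin user, and starts all components

The constraints file is important: installing Airflow without it frequently breaks on transitive dependency conflicts.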
Step 2: Create a DAG File
Create a new Python file in the dags/ directory. Every DAG file defines the schedule, dependencies, and task logic:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="sales_etl_pipeline",
    default_args=default_args,
    schedule_interval="@daily",  # Airflow 2.4+ also accepts the newer `schedule` argument
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    # Tasks defined below
    pass
Step 3: Define the Extract Task
Use a PythonOperator to pull data from the source. Store extracted data in intermediate storage (S3, GCS, or a staging directory) rather than passing large datasets through XComs:
def extract_sales_data(**context):
    import json
    import requests

    response = requests.get(
        "https://api.example.com/sales",
        headers={"Authorization": "Bearer TOKEN"},  # load real tokens from an Airflow Connection or Variable
        params={"date": context["ds"]},
    )
    response.raise_for_status()  # fail the task on HTTP errors so retries kick in
    data = response.json()
    # Write to intermediate storage and pass only the file path through XCom
    filepath = f"/tmp/sales_{context['ds']}.json"
    with open(filepath, "w") as f:
        json.dump(data, f)
    return filepath

extract_task = PythonOperator(
    task_id="extract_sales",
    python_callable=extract_sales_data,
)
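To stage to S3 instead of the local filesystem, the Amazon provider's S3Hook can upload the file at the end of the extract step. A sketch, assuming apache-airflow-providers-amazon is installed and that an aws_default connection and a my-etl-staging bucket (both hypothetical names) exist:

def upload_to_s3(**context):
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    local_path = context["ti"].xcom_pull(task_ids="extract_sales")
    hook = S3Hook(aws_conn_id="aws_default")  # hypothetical connection ID
    # Upload the staged file; only the small S3 key travels through XCom
    hook.load_file(
        filename=local_path,
        key=f"raw/sales_{context['ds']}.json",
        bucket_name="my-etl-staging",  # hypothetical bucket
        replace=True,
    )

Downstream tasks then read from the S3 key rather than /tmp, which also survives worker restarts and multi-node deployments.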
Step 4: Define the Transform Task
Clean and reshape the data using pandas or standard Python:
def transform_sales_data(**context):
    import pandas as pd

    filepath = context["ti"].xcom_pull(task_ids="extract_sales")
    df = pd.read_json(filepath)
    # Remove nulls, normalize dates, aggregate
    df = df.dropna(subset=["amount"])
    df["date"] = pd.to_datetime(df["date"])
    daily_totals = df.groupby("date")["amount"].sum().reset_index()
    daily_totals.to_csv(f"/tmp/sales_transformed_{context['ds']}.csv", index=False)

transform_task = PythonOperator(
    task_id="transform_sales",
    python_callable=transform_sales_data,
)
Step 5: Define the Load Task
Insert transformed data into the target warehouse:
def load_sales_data(**context):
    import pandas as pd
    from sqlalchemy import create_engine

    df = pd.read_csv(f"/tmp/sales_transformed_{context['ds']}.csv")
    # Hardcoded URI for illustration only; use an Airflow Connection in production (see the sketch below)
    engine = create_engine("postgresql://user:pass@warehouse:5432/analytics")
    df.to_sql("daily_sales", engine, if_exists="append", index=False)

load_task = PythonOperator(
    task_id="load_sales",
    python_callable=load_sales_data,
)
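To remove the hardcoded connection string, the Postgres provider's hook can build the engine from an Airflow Connection. A sketch, assuming apache-airflow-providers-postgres is installed and a warehouse_postgres connection (hypothetical ID) is configured in the Airflow UI:

def load_sales_data(**context):
    import pandas as pd
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    df = pd.read_csv(f"/tmp/sales_transformed_{context['ds']}.csv")
    # Credentials resolve from the metadata DB at runtime, not from source code
    engine = PostgresHook(postgres_conn_id="warehouse_postgres").get_sqlalchemy_engine()
    df.to_sql("daily_sales", engine, if_exists="append", index=False)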
Step 6: Set Dependencies
Define execution order using Airflow bitshift operators:
extract_task >> transform_task >> load_task
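For pipelines with more tasks, Airflow's chain() helper expresses the same ordering without long bitshift expressions:

from airflow.models.baseoperator import chain

chain(extract_task, transform_task, load_task)  # equivalent to extract >> transform >> load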
Step 7: Deploy and Monitor
If you installed via Docker Compose, the scheduler and webserver are already running; otherwise start them with airflow scheduler and airflow webserver. Open the web UI at localhost:8080 (default credentials: airflow/airflow). Trigger the DAG manually for the first run, then monitor task status via the Graph View. Check task logs for errors by clicking individual task instances.
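Before relying on the scheduler, you can exercise the DAG from the CLI. Standard test commands, using the DAG and task IDs from this tutorial:

airflow dags list  # confirm sales_etl_pipeline was parsed without import errors
airflow tasks test sales_etl_pipeline extract_sales 2026-01-01  # run one task in isolation, no scheduler needed
airflow dags test sales_etl_pipeline 2026-01-01  # run the whole DAG in-process for one logical date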
Best Practices
- Use intermediate storage (S3/GCS) for data between tasks, not XComs (XCom default limit: 48KB)
- Set catchup=False unless historical backfilling is required
- Parameterize credentials using Airflow Variables or Connections (never hardcode)
- Add failure alerts using on_failure_callback or email operators
- Configure retries with exponential backoff for API-dependent tasks (see the sketch below)
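A sketch of the retry and alerting settings from the list above, using standard BaseOperator arguments (notify_failure is a hypothetical stub; wire it to Slack, PagerDuty, or email in practice):

from datetime import timedelta

def notify_failure(context):
    # Hypothetical placeholder: replace with a real alerting call
    print(f"Task {context['task_instance'].task_id} failed on {context['ds']}")

default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
    "retry_exponential_backoff": True,   # delay doubles on each successive attempt
    "max_retry_delay": timedelta(minutes=30),
    "on_failure_callback": notify_failure,
}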
Common Pitfalls
- Passing full DataFrames through XComs (exceeds size limits, causes serialization errors)
- Running heavy imports or API calls at the module level (they execute every time the scheduler parses the DAG file, not just at task runtime)
- Forgetting catchup=False (Airflow will attempt to backfill all dates from start_date to today)
- Not setting proper retries for API-dependent extract tasks
Editor's Note: We built an ETL pipeline in Airflow for a data team at a B2B SaaS company. The pipeline extracted data from 3 REST APIs (HubSpot, Stripe, Intercom), transformed it in Python (pandas), and loaded to Snowflake. Total development: 2 weeks for 3 DAGs. The mistake we made: initially passing full DataFrames through XComs, which exceeded the XCom size limit (48KB default). Switching to S3 intermediate storage fixed the issue. Running cost: $380/month on AWS MWAA (managed Airflow).