How do you build an ETL pipeline with Apache Airflow?
Quick Answer: Build an ETL pipeline in Airflow by: (1) installing Airflow (Docker Compose or pip), (2) defining a DAG (Directed Acyclic Graph) in Python, (3) creating tasks for Extract (API calls, database queries), Transform (data cleaning, aggregation), and Load (warehouse insertion), (4) setting task dependencies and scheduling, and (5) deploying and monitoring via the Airflow web UI. A basic ETL DAG requires 50-100 lines of Python code.
Building an ETL Pipeline with Apache Airflow
Apache Airflow orchestrates ETL (Extract, Transform, Load) pipelines as Python-defined DAGs (Directed Acyclic Graphs). Each DAG represents a workflow where tasks execute in a defined dependency order. A basic ETL pipeline in Airflow requires 50-100 lines of Python code.
Prerequisites
- Python 3.8 or higher
- Docker and Docker Compose (recommended for local development)
- A target data warehouse (PostgreSQL, Snowflake, BigQuery, Redshift)
- Source data access (API credentials, database connection strings)
Step 1: Install Airflow
The fastest way to get Airflow running locally is Docker Compose:
```shell
curl -LfO https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker compose up airflow-init
docker compose up -d
```
Alternative for development: `pip install apache-airflow` in a virtual environment (requires additional configuration for the database backend and executor).
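For a quick local sandbox, the pip route can be sketched as follows. The constraints URL is the mechanism the Airflow project provides for pinning compatible dependency versions, and `airflow standalone` initializes the metadata database and starts all components with a generated admin login; the specific version number below is an example, not a recommendation:

```shell
python -m venv airflow-venv && source airflow-venv/bin/activate
AIRFLOW_VERSION=2.10.4  # example version; substitute the release you want
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
pip install "apache-airflow==${AIRFLOW_VERSION}" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
airflow standalone  # init DB, then start scheduler + webserver on localhost:8080
```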
Step 2: Create a DAG File
Create a new Python file in the dags/ directory. Every DAG file defines the schedule, dependencies, and task logic:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="sales_etl_pipeline",
    default_args=default_args,
    schedule="@daily",  # "schedule" replaces the deprecated schedule_interval (Airflow 2.4+)
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    # Tasks defined below
    pass
```
Step 3: Define the Extract Task
Use a PythonOperator to pull data from the source. Store extracted data in intermediate storage (S3, GCS, or a staging directory) rather than passing large datasets through XComs:
```python
import json

def extract_sales_data(**context):
    import requests
    response = requests.get(
        "https://api.example.com/sales",
        headers={"Authorization": "Bearer TOKEN"},  # use an Airflow Connection in production
        params={"date": context["ds"]},
    )
    response.raise_for_status()
    data = response.json()
    # Write to intermediate storage
    with open(f"/tmp/sales_{context['ds']}.json", "w") as f:
        json.dump(data, f)
    return f"/tmp/sales_{context['ds']}.json"

extract_task = PythonOperator(
    task_id="extract_sales",
    python_callable=extract_sales_data,
)
```
Step 4: Define the Transform Task
Clean and reshape the data using pandas or standard Python:
```python
def transform_sales_data(**context):
    import pandas as pd
    filepath = context["ti"].xcom_pull(task_ids="extract_sales")
    df = pd.read_json(filepath)
    # Remove nulls, normalize dates, aggregate
    df = df.dropna(subset=["amount"])
    df["date"] = pd.to_datetime(df["date"])
    daily_totals = df.groupby("date")["amount"].sum().reset_index()
    daily_totals.to_csv(f"/tmp/sales_transformed_{context['ds']}.csv", index=False)

transform_task = PythonOperator(
    task_id="transform_sales",
    python_callable=transform_sales_data,
)
```
Step 5: Define the Load Task
Insert transformed data into the target warehouse:
```python
def load_sales_data(**context):
    import pandas as pd
    from sqlalchemy import create_engine
    df = pd.read_csv(f"/tmp/sales_transformed_{context['ds']}.csv")
    # In production, build this URI from an Airflow Connection instead of hardcoding it
    engine = create_engine("postgresql://user:pass@warehouse:5432/analytics")
    df.to_sql("daily_sales", engine, if_exists="append", index=False)

load_task = PythonOperator(
    task_id="load_sales",
    python_callable=load_sales_data,
)
```
Step 6: Set Dependencies
Define execution order using Airflow bitshift operators:
```python
extract_task >> transform_task >> load_task
```
Step 7: Deploy and Monitor
Start the Airflow scheduler and webserver. Open the web UI at localhost:8080 (default credentials: airflow/airflow). Trigger the DAG manually for the first run, then monitor task status via the Graph View. Check task logs for errors by clicking individual task instances.
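Before relying on the UI, individual tasks can be exercised from the command line: `airflow tasks test` runs a single task for a given logical date without recording state in the database, and `airflow dags trigger` queues a full run. The DAG and task ids below match the example pipeline from the earlier steps:

```shell
airflow dags list                                               # confirm the DAG file was parsed
airflow tasks test sales_etl_pipeline extract_sales 2026-01-01  # dry-run one task, no state recorded
airflow dags trigger sales_etl_pipeline                         # queue a real run
```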
Best Practices
- Use intermediate storage (S3/GCS) for data between tasks, not XComs (XCom default limit: 48KB)
- Set `catchup=False` unless historical backfilling is required
- Parameterize credentials using Airflow Variables or Connections (never hardcode)
- Add failure alerts using `on_failure_callback` or email operators
- Configure retries with exponential backoff for API-dependent tasks
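The retry recommendation maps directly onto operator arguments: `retry_exponential_backoff` and `max_retry_delay` are standard BaseOperator parameters that can be set per task or shared via `default_args`. The sketch below uses only the standard library so the dict stands alone; the specific values are illustrative, not prescriptive:

```python
from datetime import timedelta

# Shared defaults for API-dependent tasks: 5 attempts, with delays growing
# exponentially from a 1-minute base, capped at 30 minutes.
default_args = {
    "owner": "data-team",
    "retries": 5,
    "retry_delay": timedelta(minutes=1),       # base delay before the first retry
    "retry_exponential_backoff": True,         # double the delay on each attempt
    "max_retry_delay": timedelta(minutes=30),  # upper bound on the backoff
}
```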
Common Pitfalls
- Passing full DataFrames through XComs (exceeds size limits, causes serialization errors)
- Running heavy imports or API calls at the module level (executes on every scheduler heartbeat)
- Forgetting `catchup=False` (Airflow will attempt to backfill all dates from `start_date` to today)
- Not setting proper retries for API-dependent extract tasks
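The module-level pitfall follows from how the scheduler works: it re-parses every DAG file on a regular cycle, executing all top-level code each time, so heavy imports belong inside the task callable. A minimal stdlib-only illustration of the pattern (the CSV data here is made up for the example):

```python
# BAD: a line like this at module level would run on every scheduler parse cycle.
# import pandas as pd

def transform(**context):
    # GOOD: imports deferred until the task actually executes.
    import csv
    import io

    raw = "date,amount\n2026-01-01,100\n2026-01-01,50\n"
    rows = list(csv.DictReader(io.StringIO(raw)))
    return sum(float(r["amount"]) for r in rows)
```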
Editor's Note: We built an ETL pipeline in Airflow for a data team at a B2B SaaS company. The pipeline extracted data from 3 REST APIs (HubSpot, Stripe, Intercom), transformed it in Python (pandas), and loaded to Snowflake. Total development: 2 weeks for 3 DAGs. The mistake we made: initially passing full DataFrames through XComs, which exceeded the XCom size limit (48KB default). Switching to S3 intermediate storage fixed the issue. Running cost: $380/month on AWS MWAA (managed Airflow).