Building a Data Vault 2.0 warehouse with dbt is only half the picture. You need an orchestrator to schedule extractions, manage dependencies, handle retries, and — critically — pass the correct load datetime to every run. Apache Airflow is the most popular choice for this.
This guide walks through the full integration: project structure, DAG setup, passing Airflow variables to dbt, incremental loading by date, and backfill support.
Architecture Overview
The responsibilities split cleanly between Airflow and dbt:
- Airflow — scheduling, extraction from source systems, dependency management, monitoring, alerts, and passing the execution date
- dbt — staging (hashing, metadata), loading the Raw Vault (Hubs, Links, Satellites), Business Vault, and data marts
The pipeline flows in a straight line: extract from sources → dbt staging → Raw Vault → Business Vault → data marts → tests.
dbt Project Structure
Organize your dbt models by Data Vault layer, using tags so Airflow can run each layer independently:
Project structure
models/
├── staging/                     # tag:staging
│   ├── stg_customers.sql
│   ├── stg_orders.sql
│   └── stg_products.sql
├── raw_vault/                   # tag:raw_vault
│   ├── hubs/
│   │   ├── hub_customer.sql
│   │   ├── hub_order.sql
│   │   └── hub_product.sql
│   ├── links/
│   │   ├── link_customer_order.sql
│   │   └── link_order_product.sql
│   └── sats/
│       ├── sat_customer_details.sql
│       ├── sat_order_details.sql
│       └── sat_product_details.sql
├── business_vault/              # tag:business_vault
│   ├── pit_customer.sql
│   └── bridge_customer_order.sql
└── marts/                       # tag:marts
    ├── dim_customer.sql
    ├── dim_product.sql
    └── fct_orders.sql
In your dbt_project.yml, apply tags by folder:
dbt_project.yml
models:
  my_project:
    staging:
      +tags: ['staging']
      +materialized: view
    raw_vault:
      +tags: ['raw_vault']
      +materialized: incremental
    business_vault:
      +tags: ['business_vault']
      +materialized: table
    marts:
      +tags: ['marts']
      +materialized: table
Why You Must Pass Load Datetime from Airflow
This is the most important concept in the entire integration. Many teams make the mistake of using CURRENT_TIMESTAMP() or {{ ts }} (the logical date) as the load datetime in their Data Vault. Both have problems.
In Data Vault 2.0, LOAD_DATETIME should reflect when the warehouse actually received the data — not the logical business date. The correct value is {{ dag_run.start_date }}.
There are two separate concepts that should not be confused:
- Logical date ({{ ds }} / {{ prev_ds }}) — defines the source data window in the WHERE clause. This is what makes your pipeline idempotent: re-running for 2026-01-15 always selects the same source rows.
- Load datetime ({{ dag_run.start_date }}) — the actual timestamp when the DAG run started and the data was physically loaded into the vault.
| | CURRENT_TIMESTAMP() | {{ ts }} (logical date) | {{ dag_run.start_date }} (correct) |
|---|---|---|---|
| Re-runs | Different timestamp each call within the same run | Same value on re-run — hides when data was actually reloaded | New timestamp per run — shows exactly when data was refreshed |
| Backfills | All records get "now" | Gets the logical date, not the actual load time | Gets the actual backfill run time — you can see when the backfill happened |
| Traceability | Can't trace records to a DAG run | Maps to a logical period, not an actual run | Every record maps to an exact DAG run |
| Audit trail | No audit trail | No audit trail on re-runs | Multiple load_datetime values for the same logical date = data was corrected |
| Idempotency | Broken | Preserved but incomplete | Preserved — satellites use HASHDIFF to detect changes, unchanged data is not rewritten |
Why this matters: When you re-run a DAG for a past logical date, the WHERE clause (using {{ prev_ds }} / {{ ds }}) selects the same source rows. If the data hasn't changed, the satellite's HASHDIFF matches and nothing is written. If the source data has changed, a new satellite record is inserted with the current dag_run.start_date — giving you a clear audit trail of when the correction was loaded.
Use {{ dag_run.start_date }} as your LOAD_DATETIME, and {{ ds }} / {{ prev_ds }} for your source data window. This is the correct separation of concerns in a production Data Vault.
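To make the contrast concrete, here is a small Python sketch (the build_dbt_vars helper is hypothetical, not an Airflow API): two runs for the same logical date produce an identical source window but different load datetimes.

```python
import json
from datetime import datetime, timezone

def build_dbt_vars(prev_ds: str, ds: str, dag_run_start: datetime) -> str:
    """Assemble the --vars JSON that Airflow would pass to dbt.

    prev_ds / ds define the source data window (stable across re-runs);
    dag_run_start is the physical load timestamp (new on every run).
    """
    return json.dumps({
        "load_datetime": dag_run_start.isoformat(sep=" "),
        "load_date_start": prev_ds,
        "load_date_end": ds,
    })

# Original run vs. a correction re-run two months later, same logical date:
first = build_dbt_vars("2026-01-14", "2026-01-15",
                       datetime(2026, 1, 15, 6, 0, tzinfo=timezone.utc))
rerun = build_dbt_vars("2026-01-14", "2026-01-15",
                       datetime(2026, 3, 14, 9, 30, tzinfo=timezone.utc))
```

The window keys are identical in both payloads; only load_datetime differs, which is exactly the audit trail described above.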
The Airflow DAG — Full Example
The DAG runs six tasks in sequence: extract, staging, Raw Vault, Business Vault, marts, tests. Here's the complete, production-ready code:
Airflow DAG — data_vault_pipeline.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

DBT_DIR = "/opt/dbt/data_vault_project"

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="data_vault_pipeline",
    default_args=default_args,
    schedule_interval="0 6 * * *",  # daily at 6 AM UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["data-vault", "dbt"],
) as dag:

    # -------------------------------------------
    # Step 1: Extract from source systems
    # -------------------------------------------
    # Replace with your extraction logic:
    # Airbyte, Fivetran, custom Python, etc.
    extract_sources = BashOperator(
        task_id="extract_sources",
        bash_command=(
            "python /opt/scripts/extract.py "
            "--start '{{ prev_ds }}' "
            "--end '{{ ds }}'"
        ),
    )

    # -------------------------------------------
    # Step 2: dbt staging
    # Hash keys, add load_datetime, record_source
    # -------------------------------------------
    dbt_staging = BashOperator(
        task_id="dbt_staging",
        bash_command=(
            f"cd {DBT_DIR} && dbt run "
            "--select tag:staging "
            "--vars '"
            '{"load_datetime": "{{ dag_run.start_date }}", '
            '"load_date_start": "{{ prev_ds }}", '
            '"load_date_end": "{{ ds }}"}'
            "'"
        ),
    )

    # -------------------------------------------
    # Step 3: Load Raw Vault
    # Hubs, Links, Satellites
    # -------------------------------------------
    dbt_raw_vault = BashOperator(
        task_id="dbt_raw_vault",
        bash_command=(
            f"cd {DBT_DIR} && dbt run "
            "--select tag:raw_vault "
            "--vars '"
            '{"load_datetime": "{{ dag_run.start_date }}"}'
            "'"
        ),
    )

    # -------------------------------------------
    # Step 4: Business Vault
    # PIT tables, Bridge tables, derived sats
    # -------------------------------------------
    dbt_business_vault = BashOperator(
        task_id="dbt_business_vault",
        bash_command=(
            f"cd {DBT_DIR} && dbt run "
            "--select tag:business_vault "
            "--vars '"
            '{"load_datetime": "{{ dag_run.start_date }}"}'
            "'"
        ),
    )

    # -------------------------------------------
    # Step 5: Data Marts
    # Star schemas for reporting
    # -------------------------------------------
    dbt_marts = BashOperator(
        task_id="dbt_marts",
        bash_command=f"cd {DBT_DIR} && dbt run --select tag:marts",
    )

    # -------------------------------------------
    # Step 6: Test everything
    # -------------------------------------------
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {DBT_DIR} && dbt test",
    )

    # -------------------------------------------
    # DAG dependencies
    # -------------------------------------------
    (
        extract_sources
        >> dbt_staging
        >> dbt_raw_vault
        >> dbt_business_vault
        >> dbt_marts
        >> dbt_test
    )
Staging Models — Using the Load Datetime
Your staging models receive the actual DAG run timestamp via var("load_datetime") and use it as the LOAD_DATETIME for every record. The source data window is controlled separately by var("load_date_start") and var("load_date_end"):
stg_customers.sql — staging model
-- models/staging/stg_customers.sql
WITH source AS (
    SELECT *
    FROM {{ source('raw', 'customers') }}
    WHERE _extracted_at >= '{{ var("load_date_start") }}'
      AND _extracted_at < '{{ var("load_date_end") }}'
),

staged AS (
    SELECT
        {{ dbtvault.hash('CUSTOMER_ID') }} AS CUSTOMER_HK,
        {{ dbtvault.hashdiff(
            columns=['CUSTOMER_NAME', 'CUSTOMER_EMAIL',
                     'CUSTOMER_PHONE', 'CUSTOMER_TIER'],
            alias='HASHDIFF'
        ) }},
        CUSTOMER_ID,
        CUSTOMER_NAME,
        CUSTOMER_EMAIL,
        CUSTOMER_PHONE,
        CUSTOMER_TIER,
        '{{ var("load_datetime") }}'::TIMESTAMP    AS LOAD_DATETIME,
        TO_DATE('{{ var("load_date_start") }}')    AS EFFECTIVE_FROM,
        'CRM_SYSTEM'                               AS RECORD_SOURCE
    FROM source
)

SELECT * FROM staged
Key points:
- var("load_datetime") — the actual DAG run start time (dag_run.start_date), stamped as LOAD_DATETIME on every record. This tells you exactly when the data was physically loaded into the vault.
- var("load_date_start") / var("load_date_end") — the logical date window (prev_ds / ds) that filters source data to only the current batch. This is what makes re-runs idempotent — the same logical date always selects the same source rows.
- EFFECTIVE_FROM — the business date this record is valid from, derived from the logical date. Used by PIT tables and timeline queries. This is not LOAD_DATETIME — a re-run on March 14th for Jan 15th data still has EFFECTIVE_FROM = 2026-01-15.
- CUSTOMER_HK — hash key generated by dbtvault from the business key
- HASHDIFF — hash of descriptive columns, used by satellites to detect changes
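To see why the HASHDIFF makes re-runs safe, here is an illustrative Python approximation of the hashing (the real dbtvault macros do this in SQL and handle casing, trimming, and NULLs in their own way; the hashdiff helper below is hypothetical):

```python
import hashlib

def hashdiff(*values) -> str:
    """Approximate a Data Vault HASHDIFF: normalize the descriptive
    columns, join them with a delimiter, and hash the result.
    Same inputs always produce the same hash."""
    concat = "||".join(
        "" if v is None else str(v).strip().upper() for v in values
    )
    return hashlib.md5(concat.encode("utf-8")).hexdigest()

h1 = hashdiff("Alice", "alice@example.com", "555-0100", "GOLD")
h2 = hashdiff("Alice", "alice@example.com", "555-0100", "GOLD")   # unchanged row
h3 = hashdiff("Alice", "alice@new.example.com", "555-0100", "GOLD")  # email changed
```

Because h1 equals h2, an unchanged row re-selected during a re-run writes nothing; the changed email produces a new hash, so a new satellite record is inserted.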
Raw Vault Models
In our example the Raw Vault entities relate simply: two Hubs connected by a Link, each Hub with its own Satellite.
The Raw Vault models use dbtvault macros. They automatically handle insert-only logic and deduplication:
hub_customer.sql
-- models/raw_vault/hubs/hub_customer.sql
{{ config(materialized='incremental') }}

{%- set src_pk = 'CUSTOMER_HK' -%}
{%- set src_nk = 'CUSTOMER_ID' -%}
{%- set src_ldts = 'LOAD_DATETIME' -%}
{%- set src_source = 'RECORD_SOURCE' -%}
{%- set source_model = ref('stg_customers') -%}

{{ dbtvault.hub(
    src_pk=src_pk,
    src_nk=src_nk,
    src_ldts=src_ldts,
    src_source=src_source,
    source_model=source_model
) }}
sat_customer_details.sql
-- models/raw_vault/sats/sat_customer_details.sql
{{ config(materialized='incremental') }}

{%- set src_pk = 'CUSTOMER_HK' -%}
{%- set src_hashdiff = 'HASHDIFF' -%}
{%- set src_payload = ['CUSTOMER_NAME', 'CUSTOMER_EMAIL',
                       'CUSTOMER_PHONE', 'CUSTOMER_TIER'] -%}
{%- set src_ldts = 'LOAD_DATETIME' -%}
{%- set src_source = 'RECORD_SOURCE' -%}
{%- set source_model = ref('stg_customers') -%}

{{ dbtvault.sat(
    src_pk=src_pk,
    src_hashdiff=src_hashdiff,
    src_payload=src_payload,
    src_ldts=src_ldts,
    src_source=src_source,
    source_model=source_model
) }}
The satellite macro compares the incoming HASHDIFF against existing records. If the hash changed, a new row is inserted. If not, nothing happens. This is how Data Vault tracks full change history automatically.
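That change-detection logic can be sketched in Python (a simplified stand-in for the SQL the macro generates; load_satellite and the sample rows are hypothetical):

```python
def load_satellite(sat_rows, staged_rows):
    """Insert a staged row only when its HASHDIFF differs from the most
    recent satellite record for that hash key."""
    latest = {}
    for r in sorted(sat_rows, key=lambda r: r["LOAD_DATETIME"]):
        latest[r["CUSTOMER_HK"]] = r["HASHDIFF"]
    inserted = []
    for row in staged_rows:
        if latest.get(row["CUSTOMER_HK"]) != row["HASHDIFF"]:
            inserted.append(row)
            latest[row["CUSTOMER_HK"]] = row["HASHDIFF"]
    return sat_rows + inserted

sat = [{"CUSTOMER_HK": "hk_a", "HASHDIFF": "h1",
        "LOAD_DATETIME": "2026-01-15 06:00"}]

# Re-run with identical source data: same hashdiff, nothing inserted.
unchanged = load_satellite(sat, [{"CUSTOMER_HK": "hk_a", "HASHDIFF": "h1",
                                  "LOAD_DATETIME": "2026-03-14 09:30"}])
# Re-run after a source correction: new hashdiff, new history row.
changed = load_satellite(sat, [{"CUSTOMER_HK": "hk_a", "HASHDIFF": "h2",
                                "LOAD_DATETIME": "2026-03-14 09:30"}])
```

The inserted correction carries the later LOAD_DATETIME, which is exactly the audit trail that dag_run.start_date provides.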
Link Model Example
link_customer_order.sql
-- models/raw_vault/links/link_customer_order.sql
{{ config(materialized='incremental') }}

{%- set src_pk = 'CUSTOMER_ORDER_HK' -%}
{%- set src_fk = ['CUSTOMER_HK', 'ORDER_HK'] -%}
{%- set src_ldts = 'LOAD_DATETIME' -%}
{%- set src_source = 'RECORD_SOURCE' -%}
{%- set source_model = ref('stg_orders') -%}

{{ dbtvault.link(
    src_pk=src_pk,
    src_fk=src_fk,
    src_ldts=src_ldts,
    src_source=src_source,
    source_model=source_model
) }}
Backfills — Why This All Matters
The real payoff of passing Airflow dates comes when you need to backfill. Say your pipeline was down for 3 days, or you need to reload historical data:
Backfill command
# Backfill the last 7 days
airflow dags backfill data_vault_pipeline \
--start-date 2026-03-01 \
--end-date 2026-03-07 \
--reset-dagruns
Airflow will execute one DAG run per day, each with its own logical date ({{ ds }}) and run timestamp ({{ dag_run.start_date }}). Because your staging models filter source data by load_date_start and load_date_end, each run processes exactly the right slice of data. And because the Raw Vault is insert-only with hash-based deduplication, re-running is completely safe — no duplicates, no data loss.
Each backfill run gets its own dag_run.start_date, so you can see in your satellite records exactly when the backfill happened — distinct from the original load and from any future re-runs.
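The per-run windows of such a backfill can be sketched in Python (backfill_windows is a hypothetical helper mirroring the prev_ds/ds pattern of a daily schedule):

```python
from datetime import date, timedelta

def backfill_windows(start: date, end: date):
    """Enumerate the (prev_ds, ds) source windows for a daily backfill:
    one DAG run per logical date in [start, end]."""
    windows = []
    d = start
    while d <= end:
        windows.append(((d - timedelta(days=1)).isoformat(), d.isoformat()))
        d += timedelta(days=1)
    return windows

# The 7-day backfill from the command above:
wins = backfill_windows(date(2026, 3, 1), date(2026, 3, 7))
```

Each tuple is a non-overlapping one-day slice, so the seven runs together cover the gap exactly once, no matter when they actually execute.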
Alternative: Astronomer Cosmos
If you want deeper Airflow + dbt integration, Cosmos by Astronomer converts each dbt model into its own Airflow task. This gives you per-model retries, parallelism, and visibility in the Airflow UI:
Cosmos alternative — DbtDag
from datetime import datetime

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig

data_vault_dag = DbtDag(
    project_config=ProjectConfig("/opt/dbt/data_vault_project"),
    profile_config=ProfileConfig(
        profile_name="data_vault",
        target_name="prod",
        # ProfileConfig also needs connection details: point it at your
        # profiles.yml, or use a profile_mapping instead
        profiles_yml_filepath="/opt/dbt/data_vault_project/profiles.yml",
    ),
    execution_config=ExecutionConfig(
        dbt_executable_path="/usr/local/bin/dbt",
    ),
    operator_args={
        "vars": '{"load_datetime": "{{ dag_run.start_date }}"}',
    },
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    dag_id="data_vault_cosmos",
    catchup=False,
)
With Cosmos, you'll see every hub, link, and satellite as a separate task in the Airflow graph view — making it easy to identify which specific model failed and retry just that one.
Summary
- Use Airflow for orchestration — scheduling, extraction, monitoring
- Use dbt for transformation — staging, Raw Vault, Business Vault, marts
- Pass
{{ dag_run.start_date }}as load datetime — this is when data was actually loaded, giving you a real audit trail. Never useCURRENT_TIMESTAMP()or{{ ts }} - Filter source data by logical date window using
{{ prev_ds }}and{{ ds }}— this is what makes re-runs idempotent - Tag your dbt models by layer so Airflow can run each stage independently
- Backfills just work — the logical date selects the same source rows, and satellites only write new records if data actually changed
Need help building your Data Vault pipeline?
We design and implement production-ready Data Vault 2.0 pipelines with Airflow and dbt. From architecture to deployment.
Get Started