Building a Data Vault 2.0 warehouse with dbt is only half the picture. You need an orchestrator to schedule extractions, manage dependencies, handle retries, and — critically — pass the correct load datetime to every run. Apache Airflow is the most popular choice for this.

This guide walks through the full integration: project structure, DAG setup, passing Airflow variables to dbt, incremental loading by date, and backfill support.

Architecture Overview

The responsibilities split cleanly between Airflow and dbt:

  • Airflow — scheduling, extraction from source systems, dependency management, monitoring, alerts, and passing the execution date
  • dbt — staging (hashing, metadata), loading the Raw Vault (Hubs, Links, Satellites), Business Vault, and data marts

The pipeline flows like this:

AIRFLOW DBT Extract APIs, DBs, Files Stage Hash + Metadata Raw Vault Hubs, Links, Sats Biz Vault PIT, Bridge Marts Star Schemas {{ dag_run.start_date }} load_datetime Airflow execution date passed to every dbt layer

dbt Project Structure

Organize your dbt models by Data Vault layer, using tags so Airflow can run each layer independently:

Project structure
models/
├── staging/              # tag:staging
│   ├── stg_customers.sql
│   ├── stg_orders.sql
│   └── stg_products.sql
├── raw_vault/            # tag:raw_vault
│   ├── hubs/
│   │   ├── hub_customer.sql
│   │   ├── hub_order.sql
│   │   └── hub_product.sql
│   ├── links/
│   │   ├── link_customer_order.sql
│   │   └── link_order_product.sql
│   └── sats/
│       ├── sat_customer_details.sql
│       ├── sat_order_details.sql
│       └── sat_product_details.sql
├── business_vault/       # tag:business_vault
│   ├── pit_customer.sql
│   └── bridge_customer_order.sql
└── marts/                # tag:marts
    ├── dim_customer.sql
    ├── dim_product.sql
    └── fct_orders.sql

In your dbt_project.yml, apply tags by folder:

dbt_project.yml
models:
  my_project:
    staging:
      +tags: ['staging']
      +materialized: view
    raw_vault:
      +tags: ['raw_vault']
      +materialized: incremental
    business_vault:
      +tags: ['business_vault']
      +materialized: table
    marts:
      +tags: ['marts']
      +materialized: table

Why You Must Pass Load Datetime from Airflow

This is the most important concept in the entire integration. Many teams make the mistake of using CURRENT_TIMESTAMP() or {{ ts }} (the logical date) as the load datetime in their Data Vault. Both have problems.

In Data Vault 2.0, LOAD_DATETIME should reflect when the warehouse actually received the data — not the logical business date. The correct value is {{ dag_run.start_date }}.

There are two separate concepts that should not be confused:

  • Logical date ({{ ds }} / {{ prev_ds }}) — defines the source data window in the WHERE clause. This is what makes your pipeline idempotent: re-running for 2026-01-15 always selects the same source rows.
  • Load datetime ({{ dag_run.start_date }}) — the actual timestamp when the DAG run started and the data was physically loaded into the vault.
CURRENT_TIMESTAMP() {{ ts }} (logical date) {{ dag_run.start_date }} (correct)
Re-runs Different timestamp each call within the same run Same value on re-run — hides when data was actually reloaded New timestamp per run — shows exactly when data was refreshed
Backfills All records get "now" Gets the logical date, not the actual load time Gets the actual backfill run time — you can see when the backfill happened
Traceability Can't trace records to a DAG run Maps to a logical period, not an actual run Every record maps to an exact DAG run
Audit trail No audit trail No audit trail on re-runs Multiple load_datetime values for the same logical date = data was corrected
Idempotency Broken Preserved but incomplete Preserved — satellites use HASHDIFF to detect changes, unchanged data is not rewritten

Why this matters: When you re-run a DAG for a past logical date, the WHERE clause (using {{ prev_ds }} / {{ ds }}) selects the same source rows. If the data hasn't changed, the satellite's HASHDIFF matches and nothing is written. If the source data has changed, a new satellite record is inserted with the current dag_run.start_date — giving you a clear audit trail of when the correction was loaded.

Use {{ dag_run.start_date }} as your LOAD_DATETIME, and {{ ds }} / {{ prev_ds }} for your source data window. This is the correct separation of concerns in a production Data Vault.

The Airflow DAG — Full Example

Here's the DAG structure visualized as it appears in the Airflow UI:

extract sources dbt_staging tag:staging dbt_raw_vault tag:raw_vault dbt_biz_vault tag:business_vault dbt_marts tag:marts dbt_test all models

Here's the complete, production-ready DAG code:

Airflow DAG — data_vault_pipeline.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta

DBT_DIR = "/opt/dbt/data_vault_project"

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="data_vault_pipeline",
    default_args=default_args,
    schedule_interval="0 6 * * *",   # daily at 6 AM UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["data-vault", "dbt"],
) as dag:

    # -------------------------------------------
    # Step 1: Extract from source systems
    # -------------------------------------------
    # Replace with your extraction logic:
    # Airbyte, Fivetran, custom Python, etc.
    extract_sources = BashOperator(
        task_id="extract_sources",
        bash_command=(
            "python /opt/scripts/extract.py "
            "--start '{{ prev_ds }}' "
            "--end '{{ ds }}' "
        ),
    )

    # -------------------------------------------
    # Step 2: dbt staging
    # Hash keys, add load_datetime, record_source
    # -------------------------------------------
    dbt_staging = BashOperator(
        task_id="dbt_staging",
        bash_command=(
            f"cd {DBT_DIR} && dbt run "
            "--select tag:staging "
            "--vars '"
            '{"load_datetime": "{{ dag_run.start_date }}", '
            '"load_date_start": "{{ prev_ds }}", '
            '"load_date_end": "{{ ds }}"}'
            "'"
        ),
    )

    # -------------------------------------------
    # Step 3: Load Raw Vault
    # Hubs, Links, Satellites
    # -------------------------------------------
    dbt_raw_vault = BashOperator(
        task_id="dbt_raw_vault",
        bash_command=(
            f"cd {DBT_DIR} && dbt run "
            "--select tag:raw_vault "
            "--vars '"
            '{"load_datetime": "{{ dag_run.start_date }}"}'
            "'"
        ),
    )

    # -------------------------------------------
    # Step 4: Business Vault
    # PIT tables, Bridge tables, derived sats
    # -------------------------------------------
    dbt_business_vault = BashOperator(
        task_id="dbt_business_vault",
        bash_command=(
            f"cd {DBT_DIR} && dbt run "
            "--select tag:business_vault "
            "--vars '"
            '{"load_datetime": "{{ dag_run.start_date }}"}'
            "'"
        ),
    )

    # -------------------------------------------
    # Step 5: Data Marts
    # Star schemas for reporting
    # -------------------------------------------
    dbt_marts = BashOperator(
        task_id="dbt_marts",
        bash_command=(
            f"cd {DBT_DIR} && dbt run "
            "--select tag:marts"
        ),
    )

    # -------------------------------------------
    # Step 6: Test everything
    # -------------------------------------------
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {DBT_DIR} && dbt test",
    )

    # -------------------------------------------
    # DAG dependencies
    # -------------------------------------------
    (
        extract_sources
        >> dbt_staging
        >> dbt_raw_vault
        >> dbt_business_vault
        >> dbt_marts
        >> dbt_test
    )

Staging Models — Using the Load Datetime

Your staging models receive the actual DAG run timestamp via var("load_datetime") and use it as the LOAD_DATETIME for every record. The source data window is controlled separately by var("load_date_start") and var("load_date_end"):

stg_customers.sql — staging model
-- models/staging/stg_customers.sql

WITH source AS (
    SELECT *
    FROM {{ source('raw', 'customers') }}
    WHERE _extracted_at >= '{{ var("load_date_start") }}'
      AND _extracted_at < '{{ var("load_date_end") }}'
),

staged AS (
    SELECT
        {{ dbtvault.hash('CUSTOMER_ID') }}
            AS CUSTOMER_HK,

        {{ dbtvault.hashdiff(
            columns=['CUSTOMER_NAME', 'CUSTOMER_EMAIL',
                     'CUSTOMER_PHONE', 'CUSTOMER_TIER'],
            alias='HASHDIFF'
        ) }},

        CUSTOMER_ID,
        CUSTOMER_NAME,
        CUSTOMER_EMAIL,
        CUSTOMER_PHONE,
        CUSTOMER_TIER,

        '{{ var("load_datetime") }}'::TIMESTAMP
            AS LOAD_DATETIME,

        TO_DATE('{{ var("load_date_start") }}')
            AS EFFECTIVE_FROM,

        'CRM_SYSTEM'
            AS RECORD_SOURCE

    FROM source
)

SELECT * FROM staged

Key points:

  • var("load_datetime") — the actual DAG run start time (dag_run.start_date), stamped as LOAD_DATETIME on every record. This tells you exactly when the data was physically loaded into the vault.
  • var("load_date_start") / var("load_date_end") — the logical date window (prev_ds / ds) that filters source data to only the current batch. This is what makes re-runs idempotent — the same logical date always selects the same source rows.
  • EFFECTIVE_FROM — the business date this record is valid from, derived from the logical date. Used by PIT tables and timeline queries. This is not LOAD_DATETIME — a re-run on March 14th for Jan 15th data still has EFFECTIVE_FROM = 2026-01-15.
  • CUSTOMER_HK — hash key generated by dbtvault from the business key
  • HASHDIFF — hash of descriptive columns, used by satellites to detect changes

Raw Vault Models

The diagram below shows how the Raw Vault entities relate in our example — two Hubs connected by a Link, each with its own Satellite:

HUB_CUSTOMER CUSTOMER_HK CUSTOMER_ID LOAD_DATETIME RECORD_SOURCE HUB_ORDER ORDER_HK ORDER_ID LOAD_DATETIME RECORD_SOURCE LINK_CUSTOMER_ORDER CUSTOMER_ORDER_HK CUSTOMER_HK | ORDER_HK LOAD_DATETIME SAT_CUSTOMER_DETAILS CUSTOMER_HK | HASHDIFF | NAME | EMAIL | PHONE SAT_ORDER_DETAILS ORDER_HK | HASHDIFF | STATUS | AMOUNT | DATE Hub Link Satellite

The Raw Vault models use dbtvault macros. They automatically handle insert-only logic and deduplication:

hub_customer.sql
-- models/raw_vault/hubs/hub_customer.sql

{{ config(materialized='incremental') }}

{%- set src_pk = 'CUSTOMER_HK' -%}
{%- set src_nk = 'CUSTOMER_ID' -%}
{%- set src_ldts = 'LOAD_DATETIME' -%}
{%- set src_source = 'RECORD_SOURCE' -%}
{%- set source_model = ref('stg_customers') -%}

{{ dbtvault.hub(
    src_pk=src_pk,
    src_nk=src_nk,
    src_ldts=src_ldts,
    src_source=src_source,
    source_model=source_model
) }}
sat_customer_details.sql
-- models/raw_vault/sats/sat_customer_details.sql

{{ config(materialized='incremental') }}

{%- set src_pk = 'CUSTOMER_HK' -%}
{%- set src_hashdiff = 'HASHDIFF' -%}
{%- set src_payload = ['CUSTOMER_NAME', 'CUSTOMER_EMAIL',
                        'CUSTOMER_PHONE', 'CUSTOMER_TIER'] -%}
{%- set src_ldts = 'LOAD_DATETIME' -%}
{%- set src_source = 'RECORD_SOURCE' -%}
{%- set source_model = ref('stg_customers') -%}

{{ dbtvault.sat(
    src_pk=src_pk,
    src_hashdiff=src_hashdiff,
    src_payload=src_payload,
    src_ldts=src_ldts,
    src_source=src_source,
    source_model=source_model
) }}

The satellite macro compares the incoming HASHDIFF against existing records. If the hash changed, a new row is inserted. If not, nothing happens. This is how Data Vault tracks full change history automatically.

link_customer_order.sql
-- models/raw_vault/links/link_customer_order.sql

{{ config(materialized='incremental') }}

{%- set src_pk = 'CUSTOMER_ORDER_HK' -%}
{%- set src_fk = ['CUSTOMER_HK', 'ORDER_HK'] -%}
{%- set src_ldts = 'LOAD_DATETIME' -%}
{%- set src_source = 'RECORD_SOURCE' -%}
{%- set source_model = ref('stg_orders') -%}

{{ dbtvault.link(
    src_pk=src_pk,
    src_fk=src_fk,
    src_ldts=src_ldts,
    src_source=src_source,
    source_model=source_model
) }}

Backfills — Why This All Matters

The real payoff of passing Airflow dates comes when you need to backfill. Say your pipeline was down for 3 days, or you need to reload historical data:

Backfill command
# Backfill the last 7 days
airflow dags backfill data_vault_pipeline \
    --start-date 2026-03-01 \
    --end-date 2026-03-07 \
    --reset-dagruns

Airflow will execute one DAG run per day, each with its own logical date ({{ ds }}) and run timestamp ({{ dag_run.start_date }}). Because your staging models filter source data by load_date_start and load_date_end, each run processes exactly the right slice of data. And because the Raw Vault is insert-only with hash-based deduplication, re-running is completely safe — no duplicates, no data loss.

Each backfill run gets its own dag_run.start_date, so you can see in your satellite records exactly when the backfill happened — distinct from the original load and from any future re-runs.

Alternative: Astronomer Cosmos

If you want deeper Airflow + dbt integration, Cosmos by Astronomer converts each dbt model into its own Airflow task. This gives you per-model retries, parallelism, and visibility in the Airflow UI:

Cosmos alternative — DbtDag
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig

data_vault_dag = DbtDag(
    project_config=ProjectConfig("/opt/dbt/data_vault_project"),
    profile_config=ProfileConfig(
        profile_name="data_vault",
        target_name="prod",
    ),
    execution_config=ExecutionConfig(
        dbt_executable_path="/usr/local/bin/dbt",
    ),
    operator_args={
        "vars": '{"load_datetime": "{{ dag_run.start_date }}"}',
    },
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    dag_id="data_vault_cosmos",
    catchup=False,
)

With Cosmos, you'll see every hub, link, and satellite as a separate task in the Airflow graph view — making it easy to identify which specific model failed and retry just that one.

Summary

  1. Use Airflow for orchestration — scheduling, extraction, monitoring
  2. Use dbt for transformation — staging, Raw Vault, Business Vault, marts
  3. Pass {{ dag_run.start_date }} as load datetime — this is when data was actually loaded, giving you a real audit trail. Never use CURRENT_TIMESTAMP() or {{ ts }}
  4. Filter source data by logical date window using {{ prev_ds }} and {{ ds }} — this is what makes re-runs idempotent
  5. Tag your dbt models by layer so Airflow can run each stage independently
  6. Backfills just work — the logical date selects the same source rows, and satellites only write new records if data actually changed

Need help building your Data Vault pipeline?

We design and implement production-ready Data Vault 2.0 pipelines with Airflow and dbt. From architecture to deployment.

Get Started