Adjust dbt asset config for incremental models
With lineage in place, there’s one final adjustment to make to our scaffolding. In our dbt project, the daily_metrics model is an incremental model. Incremental models improve performance by avoiding full refreshes: instead of reprocessing everything, they only handle new or modified data based on a time filter.
{{
  config(
    materialized='incremental',
    unique_key='date_of_business',
    tags=["daily"]
  )
}}
with
    trips as (
        select *
        from {{ ref('stg_trips') }}
    ),
    daily_summary as (
        select
            date_trunc('day', pickup_datetime) as date_of_business,
            count(*) as trip_count,
            sum(duration) as total_duration,
            sum(duration) / count(*) as average_duration,
            sum(total_amount) as total_amount,
            sum(total_amount) / count(*) as average_amount,
            sum(case when duration > 30 then 1 else 0 end) / count(*) as pct_over_30_min
        from trips
        group by all
    )
select *
from daily_summary
{% if is_incremental() %}
    where date_of_business between '{{ var('min_date') }}' and '{{ var('max_date') }}'
{% endif %}
Here’s how incremental models work:
- First run: the model processes the full dataset with no filters.
- Subsequent runs: dbt applies the is_incremental() filter, using min_date and max_date values provided at runtime.
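The two cases above can be sketched in plain Python. This is only an illustration of the branching in the model's Jinja block, not dbt's implementation; the helper name incremental_filter is hypothetical.

```python
def incremental_filter(is_incremental: bool, min_date: str, max_date: str) -> str:
    """Return the WHERE clause the model's Jinja block would render (hypothetical helper)."""
    if not is_incremental:
        # First run: no filter is rendered, so the full dataset is processed.
        return ""
    # Subsequent runs: only rows inside the supplied date range are processed.
    return f"where date_of_business between '{min_date}' and '{max_date}'"


# First run, then an incremental run for a single day's range.
print(repr(incremental_filter(False, "2023-01-01", "2023-01-02")))
print(incremental_filter(True, "2023-01-01", "2023-01-02"))
```

The dates here are sample values; in the real project they come from the min_date and max_date vars passed to dbt at runtime.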
1. Include a template var
The first step is to add a template var to your component. It returns the partitions definition that will be applied to the dbt assets.
import dagster as dg
@dg.template_var
def daily_partitions_def() -> dg.DailyPartitionsDefinition:
    return dg.DailyPartitionsDefinition(start_date="2023-01-01")
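To make the idea of a daily partitions definition concrete, here is a minimal standard-library sketch of what such a scheme represents: one partition key per day, each covering a half-open time window. It does not use or reproduce Dagster's implementation, and the function names daily_partition_keys and time_window are hypothetical.

```python
from datetime import date, datetime, timedelta


def daily_partition_keys(start_date: str, end_date: str) -> list[str]:
    """List one key per day from start_date up to, but not including, end_date."""
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days)]


def time_window(partition_key: str) -> tuple[datetime, datetime]:
    """Return the half-open [start, end) window covered by one daily key."""
    start = datetime.fromisoformat(partition_key)
    return start, start + timedelta(days=1)


keys = daily_partition_keys("2023-01-01", "2023-01-04")
print(keys)
print(time_window(keys[0]))
```

Each run of a partitioned asset targets one of these keys, and the matching window is what the dbt invocation later receives as its date range.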
2. Update dbt component configuration
The next step is to update the defs.yaml file to use the new template var and apply this partitions definition to all assets using the post_processing field:
type: dagster_dbt.DbtProjectComponent
template_vars_module: .template_vars
attributes:
  project: '{{ project_root }}/src/project_dbt/analytics'
  select: "tag:daily"
  translation:
    key: "taxi_{{ node.name }}"
    group_name: dbt
post_processing:
  assets:
    - target: "*"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
Finally, we need to pass in new configuration to the cli_args field so that the dbt execution actually changes based on what partition is executing. In particular, we want to pass in values to the --vars configuration field that determine the range of time that our incremental models should process.
When the cli_args field is resolved, it has access to a context.partition_time_window object, which is Dagster's representation of the time range that should be processed on the current run. This can be converted into a format recognized by your dbt project using template variables:
type: dagster_dbt.DbtProjectComponent
template_vars_module: .template_vars
attributes:
  project: '{{ project_root }}/src/project_dbt/analytics'
  select: "tag:daily"
  translation:
    key: "taxi_{{ node.name }}"
    group_name: dbt
  cli_args:
    - build
    - --vars:
        min_date: "{{ context.partition_time_window.start.strftime('%Y-%m-%d') }}"
        max_date: "{{ context.partition_time_window.end.strftime('%Y-%m-%d') }}"
post_processing:
  assets:
    - target: "*"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
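As a rough illustration of what the cli_args templates produce for one partition, the sketch below formats a time window with the same strftime pattern and assembles a dbt argument list. The helper names are hypothetical, and the assumption that the vars reach dbt as a JSON string after --vars is a simplification of how the component resolves this field.

```python
import json
from datetime import datetime, timedelta


def dbt_vars_for_window(start: datetime, end: datetime) -> dict[str, str]:
    """Format a partition time window the same way the YAML templates do."""
    return {
        "min_date": start.strftime("%Y-%m-%d"),
        "max_date": end.strftime("%Y-%m-%d"),
    }


def dbt_cli_args(start: datetime, end: datetime) -> list[str]:
    """Assemble the argument list for one partition (assumed shape: build --vars <json>)."""
    return ["build", "--vars", json.dumps(dbt_vars_for_window(start, end))]


# Window for the 2023-01-01 daily partition: starts at midnight, ends at the next midnight.
window_start = datetime(2023, 1, 1)
window_end = window_start + timedelta(days=1)
print(dbt_cli_args(window_start, window_end))
```

Because the window's end is the start of the following day, max_date for the 2023-01-01 partition renders as 2023-01-02. It is worth checking that against the model's between filter, which is inclusive on both ends.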
This design lets us:
- Automatically configure incremental filters with partition context.
- Keep partitioning consistent across upstream and downstream assets.
- Run incremental updates or full backfills as needed.
Best of all, this pattern applies automatically to every dbt model the component selects. As you add more models with the daily tag, incremental or not, Dagster will handle them the same way, with no extra structural changes.
Next steps
- Continue this example with dbt tests.