Implementing Data Retention for Dagster Assets

Last updated: November 6, 2025

This article explains how to implement data retention policies in Dagster to automatically delete old data while keeping the Software-Defined Assets (SDA) view synchronized with the underlying data state.

Problem Description

Users need to implement data retention policies that both delete old underlying data files and ensure Dagster's asset view accurately reflects the current state of available data. This requires coordinating data deletion with Dagster's partition management system.

Symptoms

  • Accumulating old data files consuming storage space

  • Dagster showing partitions for data that no longer exists

  • Misalignment between actual data availability and Dagster's partition view

Root Cause

Dagster doesn't automatically clean up partition metadata when underlying data is deleted, and there's no built-in retention policy mechanism that coordinates data deletion with partition management.

Solution

Step-by-Step Resolution

  1. Set up Dynamic Partitions for your assets:

    # Your asset with dynamic partitions
    @dg.asset(
        partitions_def=dg.DynamicPartitionsDefinition(name="my_data_partitions")
    )
    def my_partitioned_asset(context):
        partition_key = context.partition_key
        # Your asset logic
        return process_data_for_partition(partition_key)
  2. Create a data retention sensor:

    @dg.sensor(minimum_interval_seconds=86400)  # Daily
    def data_retention_sensor(context):
        """Delete old partitioned data based on retention policy"""
    
        retention_days = 90
        cutoff_date = datetime.now() - timedelta(days=retention_days)
    
        # Get partitions to delete
        partitions_to_delete = []
        for partition_key in context.instance.get_dynamic_partitions("my_data_partitions"):
            partition_date = datetime.strptime(partition_key, "%Y-%m-%d")
            if partition_date < cutoff_date:
                partitions_to_delete.append(partition_key)
    
        # Delete actual data
        for partition_key in partitions_to_delete:
            # Your deletion logic (S3, filesystem, database, etc.)
            delete_partition_data(partition_key)
    
            # Remove from Dagster's view
            context.instance.delete_dynamic_partition("my_data_partitions", partition_key)
    
            context.log.info(f"Deleted partition: {partition_key}")
    
        return dg.SkipReason(f"Processed {len(partitions_to_delete)} partitions for deletion")
  3. Implement a partition synchronization sensor:

    # Sensor to sync Dagster partitions with actual data
    @dg.sensor(minimum_interval_seconds=3600)  # Hourly
    def partition_sync_sensor(context):
        """Keep Dagster partitions aligned with actual data"""
    
        # Get current state
        dagster_partitions = set(context.instance.get_dynamic_partitions("my_data_partitions"))
        actual_partitions = get_existing_data_partitions()  # Your logic to scan actual data
    
        # Add new partitions that exist in data but not Dagster
        to_add = actual_partitions - dagster_partitions
        for partition_key in to_add:
            context.instance.add_dynamic_partition("my_data_partitions", partition_key)
            context.log.info(f"Added partition: {partition_key}")
    
        # Remove orphaned partitions (exist in Dagster but not in actual data)
        to_remove = dagster_partitions - actual_partitions
        for partition_key in to_remove:
            context.instance.delete_dynamic_partition("my_data_partitions", partition_key)
            context.log.info(f"Removed orphaned partition: {partition_key}")
    
        return dg.SkipReason(f"Synced partitions: +{len(to_add)}, -{len(to_remove)}")

Alternative Solutions

• Regularly schedule deletion of old files from your data storage (filesystem, S3, etc.) using external tools

• Use the DagsterInstance API to clean up old run/event logs if needed

• Configure tick retention in your instance YAML for schedules/sensors

Prevention

Set up monitoring for your retention sensors to ensure they're running successfully. Consider implementing alerts if partition counts grow unexpectedly or if the sync sensor detects significant misalignments.

Related Documentation