Implementing Data Retention for Dagster Assets
Last updated: November 6, 2025
This article explains how to implement data retention policies in Dagster to automatically delete old data while keeping the Software-Defined Assets (SDA) view synchronized with the underlying data state.
Problem Description
Users need to implement data retention policies that both delete old underlying data files and ensure Dagster's asset view accurately reflects the current state of available data. This requires coordinating data deletion with Dagster's partition management system.
Symptoms
• Accumulating old data files consuming storage space
• Dagster showing partitions for data that no longer exists
• Misalignment between actual data availability and Dagster's partition view
Root Cause
Dagster doesn't automatically clean up partition metadata when underlying data is deleted, and there's no built-in retention policy mechanism that coordinates data deletion with partition management.
Solution
Step-by-Step Resolution
1. Set up Dynamic Partitions for your assets:
```python
import dagster as dg

# Your asset with dynamic partitions
@dg.asset(
    partitions_def=dg.DynamicPartitionsDefinition(name="my_data_partitions")
)
def my_partitioned_asset(context):
    partition_key = context.partition_key
    # Your asset logic
    return process_data_for_partition(partition_key)
```

2. Create a data retention sensor:
```python
from datetime import datetime, timedelta

@dg.sensor(minimum_interval_seconds=86400)  # Daily
def data_retention_sensor(context):
    """Delete old partitioned data based on retention policy."""
    retention_days = 90
    cutoff_date = datetime.now() - timedelta(days=retention_days)

    # Get partitions to delete (assumes partition keys are ISO dates, e.g. "2025-01-31")
    partitions_to_delete = []
    for partition_key in context.instance.get_dynamic_partitions("my_data_partitions"):
        partition_date = datetime.strptime(partition_key, "%Y-%m-%d")
        if partition_date < cutoff_date:
            partitions_to_delete.append(partition_key)

    for partition_key in partitions_to_delete:
        # Delete the actual data: your deletion logic (S3, filesystem, database, etc.)
        delete_partition_data(partition_key)

        # Remove the partition from Dagster's view
        context.instance.delete_dynamic_partition("my_data_partitions", partition_key)
        context.log.info(f"Deleted partition: {partition_key}")

    return dg.SkipReason(f"Processed {len(partitions_to_delete)} partitions for deletion")
```

3. Implement a partition synchronization sensor:
```python
# Sensor to sync Dagster partitions with actual data
@dg.sensor(minimum_interval_seconds=3600)  # Hourly
def partition_sync_sensor(context):
    """Keep Dagster partitions aligned with actual data."""
    # Get current state on both sides
    dagster_partitions = set(context.instance.get_dynamic_partitions("my_data_partitions"))
    actual_partitions = set(get_existing_data_partitions())  # Your logic to scan actual data

    # Add partitions that exist in the data but not in Dagster
    to_add = actual_partitions - dagster_partitions
    for partition_key in to_add:
        context.instance.add_dynamic_partition("my_data_partitions", partition_key)
        context.log.info(f"Added partition: {partition_key}")

    # Remove orphaned partitions (exist in Dagster but not in the actual data)
    to_remove = dagster_partitions - actual_partitions
    for partition_key in to_remove:
        context.instance.delete_dynamic_partition("my_data_partitions", partition_key)
        context.log.info(f"Removed orphaned partition: {partition_key}")

    return dg.SkipReason(f"Synced partitions: +{len(to_add)}, -{len(to_remove)}")
```
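Finally, the asset and both sensors need to be part of your code location so Dagster loads and runs them. A minimal sketch, assuming the definitions above live in the same module:

```python
defs = dg.Definitions(
    assets=[my_partitioned_asset],
    sensors=[data_retention_sensor, partition_sync_sensor],
)
```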
Alternative Solutions
• Regularly schedule deletion of old files from your data storage (filesystem, S3, etc.) using external tools (see the sketch after this list)
• Use the DagsterInstance API to clean up old run/event logs if needed
• Configure tick retention in your instance YAML (dagster.yaml) so old schedule and sensor tick history is purged automatically
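As an illustration of the first option, here is a sketch of an external cleanup script that removes S3 objects older than the retention window using boto3. The bucket name, prefix, and retention value are hypothetical placeholders; adapt them to your storage layout.

```python
from datetime import datetime, timedelta, timezone

import boto3

def purge_old_s3_objects(bucket: str, prefix: str, retention_days: int = 90) -> None:
    """Delete objects under `prefix` whose LastModified is older than the retention window."""
    s3 = boto3.client("s3")
    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)

    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["LastModified"] < cutoff:
                s3.delete_object(Bucket=bucket, Key=obj["Key"])

# Example usage (hypothetical bucket and prefix):
# purge_old_s3_objects("my-data-bucket", "partitions/", retention_days=90)
```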
Prevention
Set up monitoring for your retention sensors to ensure they're running successfully. Consider implementing alerts if partition counts grow unexpectedly or if the sync sensor detects significant misalignments.
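One lightweight way to surface such drift is a small monitoring sensor that warns when the dynamic partition count exceeds an expected ceiling. This is a sketch only; the threshold and sensor name are assumptions, chosen here to match a 90-day retention window with some headroom.

```python
@dg.sensor(minimum_interval_seconds=3600)
def partition_count_monitor(context):
    """Warn when the number of dynamic partitions exceeds an expected ceiling."""
    max_expected_partitions = 120  # illustrative threshold: ~90-day retention plus headroom

    count = len(context.instance.get_dynamic_partitions("my_data_partitions"))
    if count > max_expected_partitions:
        context.log.warning(
            f"Partition count {count} exceeds expected maximum {max_expected_partitions}; "
            "check that the retention sensor is running."
        )
    return dg.SkipReason(f"Partition count: {count}")
```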
Related Documentation