diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py
index e8b67b15c3768..22d4919a458e8 100644
--- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py
+++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py
@@ -1975,7 +1975,10 @@ def get_latest_tags_by_partition(
         return dict(latest_tags_by_partition)
 
     def get_latest_asset_partition_materialization_attempts_without_materializations(
-        self, asset_key: AssetKey, after_storage_id: Optional[int] = None
+        self,
+        asset_key: AssetKey,
+        after_storage_id: Optional[int] = None,
+        before_storage_id: Optional[int] = None,
     ) -> Mapping[str, tuple[str, int]]:
         """Fetch the latest materialzation and materialization planned events for each partition of the given asset.
         Return the partitions that have a materialization planned event but no matching (same run) materialization event.
@@ -1993,6 +1996,7 @@ def get_latest_asset_partition_materialization_attempts_without_materializations
                 DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
             ],
             after_cursor=after_storage_id,
+            before_cursor=before_storage_id,
         )
 
         latest_events_subquery = db_subquery(
diff --git 
a/python_modules/dagster/dagster_tests/declarative_automation_tests/daemon_tests/definitions/observable_to_eager.py b/python_modules/dagster/dagster_tests/declarative_automation_tests/daemon_tests/definitions/observable_to_eager.py
new file mode 100644
index 0000000000000..3065c2f646f0e
--- /dev/null
+++ b/python_modules/dagster/dagster_tests/declarative_automation_tests/daemon_tests/definitions/observable_to_eager.py
@@ -0,0 +1,14 @@
+"""Definitions for daemon tests: an hourly-observed source asset feeding an eager asset."""
+
+import dagster as dg
+
+
+@dg.observable_source_asset(automation_condition=dg.AutomationCondition.on_cron("@hourly"))
+def obs() -> None: ...
+
+
+@dg.asset(deps=[obs], automation_condition=dg.AutomationCondition.eager())
+def mat() -> None: ...
+
+
+defs = dg.Definitions(assets=[obs, mat])
diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/partition_status_cache.py b/python_modules/dagster/dagster_tests/storage_tests/utils/partition_status_cache.py
index fdc37412975db..49cc22fbffcab 100644
--- a/python_modules/dagster/dagster_tests/storage_tests/utils/partition_status_cache.py
+++ b/python_modules/dagster/dagster_tests/storage_tests/utils/partition_status_cache.py
@@ -15,14 +15,19 @@
     asset,
     define_asset_job,
 )
+from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
 from dagster._core.definitions.asset_graph import AssetGraph
+from dagster._core.definitions.definitions_class import Definitions
 from dagster._core.definitions.time_window_partitions import HourlyPartitionsDefinition
 from dagster._core.events import (
     AssetMaterializationPlannedData,
     DagsterEvent,
     StepMaterializationData,
 )
+from dagster._core.instance import DagsterInstance
+from dagster._core.loader import LoadingContextForTest
 from dagster._core.storage.dagster_run import DagsterRunStatus
+from dagster._core.storage.event_log.base import AssetRecord
 from dagster._core.storage.partition_status_cache import (
     RUN_FETCH_BATCH_SIZE,
     build_failed_and_in_progress_partition_subset,
@@ -46,6 +51,95 @@ def instance(self, request):
     def delete_runs_instance(self, instance):
         return instance
 
+    def test_get_cached_status_old_asset_record(self, instance: DagsterInstance) -> None:
+        """Update the status cache through an asset record fetched before a materialization.
+
+        Partition "a" is materialized with a runless event and partition "b" is planned in
+        a run. The asset record is then fetched through a test loading context, after which
+        "b" is materialized. Updating the cache with that loading context is expected to
+        report a materialized subset and an in-progress subset of size 1 each.
+        """
+        partitions_def = StaticPartitionsDefinition(["a", "b"])
+
+        @asset(partitions_def=partitions_def)
+        def the_asset() -> None: ...
+
+        defs = Definitions(assets=[the_asset])
+
+        instance.report_runless_asset_event(
+            AssetMaterialization(asset_key=the_asset.key, partition="a")
+        )
+
+        asset_graph_view = AssetGraphView.for_test(defs, instance)
+
+        updated_cache = get_and_update_asset_status_cache_value(
+            instance, the_asset.key, partitions_def
+        )
+        assert updated_cache is not None
+        materialized_subset = updated_cache.get_materialized_subset(
+            asset_graph_view, the_asset.key, partitions_def
+        )
+        assert materialized_subset.size == 1
+
+        run1 = create_run_for_test(instance)
+
+        def _store_run_event(event_type: DagsterEventType, event_specific_data) -> None:
+            # The two events stored below differ only in their type and payload.
+            instance.event_log_storage.store_event(
+                EventLogEntry(
+                    error_info=None,
+                    level="debug",
+                    user_message="",
+                    run_id=run1.run_id,
+                    timestamp=time.time(),
+                    dagster_event=DagsterEvent(
+                        event_type.value,
+                        "job1",
+                        event_specific_data=event_specific_data,
+                    ),
+                )
+            )
+
+        # now partition b is in progress
+        _store_run_event(
+            DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
+            AssetMaterializationPlannedData(asset_key=the_asset.key, partition="b"),
+        )
+
+        updated_cache = get_and_update_asset_status_cache_value(
+            instance, the_asset.key, partitions_def
+        )
+        assert updated_cache is not None
+        in_progress_subset = updated_cache.get_in_progress_subset(
+            asset_graph_view, the_asset.key, partitions_def
+        )
+        assert in_progress_subset.size == 1
+
+        # fetch the asset record early, which will cache the return value
+        loading_context = LoadingContextForTest(instance=instance)
+        AssetRecord.blocking_get(loading_context, the_asset.key)
+
+        # now partition b gets materialized for real
+        _store_run_event(
+            DagsterEventType.ASSET_MATERIALIZATION,
+            StepMaterializationData(AssetMaterialization(asset_key=the_asset.key, partition="b")),
+        )
+
+        # because we're using the old asset record, we should not see the new materialization,
+        # so both subsets are expected to keep the sizes observed above
+        updated_cache = get_and_update_asset_status_cache_value(
+            instance, the_asset.key, partitions_def, loading_context=loading_context
+        )
+        assert updated_cache is not None
+        in_progress_subset = updated_cache.get_in_progress_subset(
+            asset_graph_view, the_asset.key, partitions_def
+        )
+        materialized_subset = updated_cache.get_materialized_subset(
+            asset_graph_view, the_asset.key, partitions_def
+        )
+        assert in_progress_subset.size == 1
+        assert materialized_subset.size == 1
+
     def test_get_cached_status_unpartitioned(self, instance):
         @asset
         def asset1():