Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[partition-status-cache][repro] Bug when using old AssetRecord in get_and_update_asset_status_cache_value #27203

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1975,7 +1975,10 @@ def get_latest_tags_by_partition(
return dict(latest_tags_by_partition)

def get_latest_asset_partition_materialization_attempts_without_materializations(
self, asset_key: AssetKey, after_storage_id: Optional[int] = None
self,
asset_key: AssetKey,
after_storage_id: Optional[int] = None,
before_storage_id: Optional[int] = None,
) -> Mapping[str, tuple[str, int]]:
"""Fetch the latest materialzation and materialization planned events for each partition of the given asset.
Return the partitions that have a materialization planned event but no matching (same run) materialization event.
Expand All @@ -1993,6 +1996,7 @@ def get_latest_asset_partition_materialization_attempts_without_materializations
DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
],
after_cursor=after_storage_id,
before_cursor=before_storage_id,
)

latest_events_subquery = db_subquery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ def build_failed_and_in_progress_partition_subset(
incomplete_materializations = instance.event_log_storage.get_latest_asset_partition_materialization_attempts_without_materializations(
asset_key, after_storage_id=after_storage_id
)
print(incomplete_materializations)

failed_partitions: set[str] = set()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import dagster as dg


# Observable source asset scheduled to be observed hourly via its
# automation condition (no compute body — observation only).
@dg.observable_source_asset(automation_condition=dg.AutomationCondition.on_cron("@hourly"))
def obs() -> None: ...


# Downstream asset that depends on `obs` and is materialized eagerly by its
# automation condition whenever its upstream updates.
@dg.asset(deps=[obs], automation_condition=dg.AutomationCondition.eager())
def mat() -> None: ...


# Bundle both assets into the Definitions object for this code location.
defs = dg.Definitions(assets=[obs, mat])
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
asset,
define_asset_job,
)
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.time_window_partitions import HourlyPartitionsDefinition
from dagster._core.events import (
AssetMaterializationPlannedData,
DagsterEvent,
StepMaterializationData,
)
from dagster._core.instance import DagsterInstance
from dagster._core.loader import LoadingContextForTest
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.storage.partition_status_cache import (
RUN_FETCH_BATCH_SIZE,
build_failed_and_in_progress_partition_subset,
Expand All @@ -46,6 +51,93 @@ def instance(self, request):
# NOTE(review): presumably a pytest fixture (its decorator is outside the
# visible hunk — confirm). Simply re-exposes the shared `instance` fixture.
def delete_runs_instance(self, instance):
    return instance

def test_get_cached_status_old_asset_record(self, instance: DagsterInstance) -> None:
    """Repro for the stale-AssetRecord bug in get_and_update_asset_status_cache_value.

    An AssetRecord fetched through a LoadingContext is cached by that context.
    If a materialization event is stored *after* the fetch, passing the same
    loading context to ``get_and_update_asset_status_cache_value`` makes it
    operate on the stale record: the new materialization is not reflected, so
    the partition stays in the in-progress subset and never moves to the
    materialized subset.
    """
    partitions_def = StaticPartitionsDefinition(["a", "b"])

    @asset(partitions_def=partitions_def)
    def the_asset() -> None: ...

    defs = Definitions(assets=[the_asset])

    # Materialize partition "a" directly (no run) to seed the cache.
    instance.report_runless_asset_event(
        AssetMaterialization(asset_key=the_asset.key, partition="a")
    )

    asset_graph_view = AssetGraphView.for_test(defs, instance)

    updated_cache = get_and_update_asset_status_cache_value(
        instance, the_asset.key, partitions_def
    )
    assert updated_cache
    subset = updated_cache.get_materialized_subset(
        asset_graph_view, the_asset.key, partitions_def
    )
    # Only partition "a" is materialized at this point.
    assert subset.size == 1

    run1 = create_run_for_test(instance)
    # now partition b is in progress
    instance.event_log_storage.store_event(
        EventLogEntry(
            error_info=None,
            level="debug",
            user_message="",
            run_id=run1.run_id,
            timestamp=time.time(),
            dagster_event=DagsterEvent(
                DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
                "job1",
                event_specific_data=AssetMaterializationPlannedData(
                    asset_key=the_asset.key, partition="b"
                ),
            ),
        )
    )

    updated_cache = get_and_update_asset_status_cache_value(
        instance, the_asset.key, partitions_def
    )
    assert updated_cache
    subset = updated_cache.get_in_progress_subset(
        asset_graph_view, the_asset.key, partitions_def
    )
    # Partition "b" has a planned event with no matching materialization yet.
    assert subset.size == 1

    # fetch the asset record early, which will cache the return value
    loading_context = LoadingContextForTest(instance=instance)
    AssetRecord.blocking_get(loading_context, the_asset.key)

    # now partition b gets materialized for real
    instance.event_log_storage.store_event(
        EventLogEntry(
            error_info=None,
            level="debug",
            user_message="",
            run_id=run1.run_id,
            timestamp=time.time(),
            dagster_event=DagsterEvent(
                DagsterEventType.ASSET_MATERIALIZATION.value,
                "job1",
                event_specific_data=StepMaterializationData(
                    AssetMaterialization(asset_key=the_asset.key, partition="b")
                ),
            ),
        )
    )

    # because we're using the old asset record, we should not see the new materialization
    updated_cache = get_and_update_asset_status_cache_value(
        instance, the_asset.key, partitions_def, loading_context=loading_context
    )
    assert updated_cache is not None
    in_progress_subset = updated_cache.get_in_progress_subset(
        asset_graph_view, the_asset.key, partitions_def
    )
    materialized_subset = updated_cache.get_materialized_subset(
        asset_graph_view, the_asset.key, partitions_def
    )
    # Buggy result being pinned down: "b" is still counted as in progress, and
    # the materialized subset still only contains "a" — the post-fetch
    # materialization of "b" is invisible through the stale cached record.
    assert in_progress_subset.size == 1
    assert materialized_subset.size == 1

def test_get_cached_status_unpartitioned(self, instance):
@asset
def asset1():
Expand Down
Loading