diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 811442f7da14a..6bd96f3aa7eee 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -41,7 +41,6 @@ from airflow.exceptions import AirflowException, TaskNotFound from airflow.models.base import ID_LEN, Base from airflow.models.taskinstance import TaskInstance as TI -from airflow.settings import task_instance_mutation_hook from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES @@ -633,6 +632,8 @@ def verify_integrity(self, session: Session = None): :param session: Sqlalchemy ORM Session :type session: Session """ + from airflow.settings import task_instance_mutation_hook + dag = self.get_dag() tis = self.get_task_instances(session=session) diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index ad2bbcbfd6be8..78991999ffde1 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -611,7 +611,7 @@ def test_already_added_task_instances_can_be_ignored(self): assert State.NONE == first_ti.state @parameterized.expand([(state,) for state in State.task_states]) - @mock.patch('airflow.models.dagrun.task_instance_mutation_hook') + @mock.patch('airflow.settings.task_instance_mutation_hook') def test_task_instance_mutation_hook(self, state, mock_hook): def mutate_task_instance(task_instance): if task_instance.queue == 'queue1':