Skip to content

Commit

Permalink
Stop reserializing DAGs during db migration (apache#45362)
Browse files Browse the repository at this point in the history
At first, this may seem strange, but we already have to deal with older
versions of serialized DAGs - so the necessity of reserializing during
migrations isn't there any longer. The DAG processor can simply do it
once Airflow has started back up.
  • Loading branch information
jedcunningham authored and Lefteris Gilmaz committed Jan 5, 2025
1 parent faa3723 commit 946bd01
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 41 deletions.
9 changes: 0 additions & 9 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,14 +588,6 @@ def string_lower_type(val):
type=int,
default=60,
)
ARG_DB_RESERIALIZE_DAGS = Arg(
("--no-reserialize-dags",),
# Not intended for user, so dont show in help
help=argparse.SUPPRESS,
action="store_false",
default=True,
dest="reserialize_dags",
)
ARG_DB_VERSION__UPGRADE = Arg(
("-n", "--to-version"),
help=(
Expand Down Expand Up @@ -1473,7 +1465,6 @@ class GroupCommand(NamedTuple):
ARG_DB_SQL_ONLY,
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
ARG_DB_RESERIALIZE_DAGS,
ARG_VERBOSE,
),
),
Expand Down
22 changes: 7 additions & 15 deletions airflow/cli/commands/local_commands/db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _get_version_revision(
return _get_version_revision(new_version, recursion_limit)


def run_db_migrate_command(args, command, revision_heads_map: dict[str, str], reserialize_dags: bool = True):
def run_db_migrate_command(args, command, revision_heads_map: dict[str, str]):
"""
Run the db migrate command.
Expand Down Expand Up @@ -122,19 +122,11 @@ def run_db_migrate_command(args, command, revision_heads_map: dict[str, str], re
print(f"Performing upgrade to the metadata database {settings.engine.url!r}")
else:
print("Generating sql for upgrade -- upgrade commands will *not* be submitted.")
if reserialize_dags:
command(
to_revision=to_revision,
from_revision=from_revision,
show_sql_only=args.show_sql_only,
reserialize_dags=True,
)
else:
command(
to_revision=to_revision,
from_revision=from_revision,
show_sql_only=args.show_sql_only,
)
command(
to_revision=to_revision,
from_revision=from_revision,
show_sql_only=args.show_sql_only,
)
if not args.show_sql_only:
print("Database migrating done!")

Expand Down Expand Up @@ -202,7 +194,7 @@ def migratedb(args):
raise SystemExit(f"Invalid version {args.from_version!r} supplied as `--from-version`.")
if parsed_version < parse_version("2.0.0"):
raise SystemExit("--from-version must be greater or equal to 2.0.0")
run_db_migrate_command(args, db.upgradedb, _REVISION_HEADS_MAP, reserialize_dags=True)
run_db_migrate_command(args, db.upgradedb, _REVISION_HEADS_MAP)


@cli_utils.action_cli(check_db=False)
Expand Down
13 changes: 0 additions & 13 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,14 +921,6 @@ def check_and_run_migrations():
sys.exit(1)


def _reserialize_dags(*, session: Session) -> None:
from airflow.models.dagbag import DagBag

dagbag = DagBag(collect_dags=False)
dagbag.collect_dags(only_if_updated=False)
dagbag.sync_to_db(session=session)


@provide_session
def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
"""
Expand Down Expand Up @@ -1167,8 +1159,6 @@ def upgradedb(
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
import sqlalchemy.pool

previous_revision = _get_current_revision(session=session)

log.info("Migrating the Airflow database")
val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
try:
Expand All @@ -1192,9 +1182,6 @@ def upgradedb(
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = val
settings.reconfigure_orm()

if reserialize_dags and current_revision != previous_revision:
log.info("Reserializing the DAGs")
_reserialize_dags(session=session)
add_default_pool_if_not_exists(session=session)
synchronize_log_template(session=session)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ def migratedb(args):
"""Migrates the metadata database."""
session = settings.Session()
upgrade_command = FABDBManager(session).upgradedb
run_db_migrate_command(
args, upgrade_command, revision_heads_map=_REVISION_HEADS_MAP, reserialize_dags=False
)
run_db_migrate_command(args, upgrade_command, revision_heads_map=_REVISION_HEADS_MAP)


@cli_utils.action_cli(check_db=False)
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/commands/local_commands/test_db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def test_cli_check_migrations(self, mock_wait_for_migrations):
def test_cli_upgrade_success(self, mock_upgradedb, args, called_with):
# TODO(ephraimbuddy): Revisit this when we add more migration files and use other versions/revisions other than 2.10.0/22ed7efa9da2
db_command.migratedb(self.parser.parse_args(["db", "migrate", *args]))
mock_upgradedb.assert_called_once_with(**called_with, reserialize_dags=True)
mock_upgradedb.assert_called_once_with(**called_with)

@pytest.mark.parametrize(
"args, pattern",
Expand Down

0 comments on commit 946bd01

Please sign in to comment.