diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py index 06c3b76d1a3f1..3be40921a577a 100644 --- a/tests/rptest/tests/data_migrations_api_test.py +++ b/tests/rptest/tests/data_migrations_api_test.py @@ -9,7 +9,8 @@ import random import threading import time -from typing import Callable +from enum import Enum +from typing import Callable, NamedTuple, Literal, List import typing from requests.exceptions import ConnectionError @@ -68,6 +69,31 @@ def _loop(self): ) +class CancellationStage(NamedTuple): + dir: Literal['in', 'out'] + stage: Literal['planned', 'preparing', 'prepared', 'executing', 'executed'] + + +class TmtpdiParams(NamedTuple): + cancellation: CancellationStage | None + use_alias: bool + + +def generate_tmptpdi_params() -> List[TmtpdiParams]: + cancelation_stages = [ + CancellationStage(dir, stage) + for dir in typing.get_args(CancellationStage.dir) + for stage in typing.get_args(CancellationStage.stage) + ] + return [ + TmtpdiParams(cancelation, use_alias) + for cancelation in [None] + cancelation_stages + for use_alias in (True, False) + # alias only affects inbound, pointless to vary if cancel earlier + if not use_alias or cancelation is None or cancelation.dir == 'in' + ] + + class DataMigrationsApiTest(RedpandaTest): log_segment_size = 10 * 1024 @@ -420,14 +446,10 @@ def cancel_inbound(self, migration_id, topic_name): self.assert_no_topics() @cluster(num_nodes=4, log_allow_list=MIGRATION_LOG_ALLOW_LIST) - @matrix(use_alias=[True, False], - transfer_leadership=[True, False], - cancellation=[None] + - [(dir, stage) for dir in ('in', 'out') - for stage in ('planned', 'preparing', 'prepared', 'executing', - 'executed')]) - def test_migrated_topic_data_integrity(self, use_alias, - transfer_leadership, cancellation): + @matrix(transfer_leadership=[True, False], + params=generate_tmptpdi_params()) + def test_migrated_topic_data_integrity(self, transfer_leadership: bool, + params: TmtpdiParams): rpk = RpkTool(self.redpanda) self.redpanda.si_settings.set_expected_damage( {"ntr_no_topic_manifest", "missing_segments"}) @@ -448,13 +470,13 @@ def test_migrated_topic_data_integrity(self, use_alias, consumer_groups=[]) out_migration_id = self.create_and_wait(out_migration) - if cancellation == ('out', 'planned'): + if params.cancellation == CancellationStage('out', 'planned'): return self.cancel_outbound(out_migration_id, workload_topic.name, producer) admin.execute_data_migration_action(out_migration_id, MigrationAction.prepare) - if cancellation == ('out', 'preparing'): + if params.cancellation == CancellationStage('out', 'preparing'): self.wait_for_migration_states(out_migration_id, ['preparing', 'prepared']) return self.cancel_outbound(out_migration_id, @@ -471,13 +493,13 @@ def test_migrated_topic_data_integrity(self, use_alias, metadata_locked=True, read_blocked=False, produce_blocked=False) - if cancellation == ('out', 'prepared'): + if params.cancellation == CancellationStage('out', 'prepared'): return self.cancel_outbound(out_migration_id, workload_topic.name, producer) admin.execute_data_migration_action(out_migration_id, MigrationAction.execute) - if cancellation == ('out', 'executing'): + if params.cancellation == CancellationStage('out', 'executing'): self.wait_for_migration_states(out_migration_id, ['executing', 'executed']) return self.cancel_outbound(out_migration_id, @@ -493,7 +515,7 @@ def test_migrated_topic_data_integrity(self, use_alias, metadata_locked=True, read_blocked=False, produce_blocked=True) - if cancellation == ('out', 'executed'): + if params.cancellation == CancellationStage('out', 'executed'): return self.cancel_outbound(out_migration_id, workload_topic.name, producer) @@ -523,9 +545,9 @@ def test_migrated_topic_data_integrity(self, use_alias, admin.delete_data_migration(out_migration_id) # attach topic back - inbound_topic_name = "aliased-workload-topic" if use_alias else workload_topic.name + inbound_topic_name = "aliased-workload-topic" if params.use_alias else workload_topic.name alias = None - if use_alias: + if params.use_alias: alias = NamespacedTopic(topic=inbound_topic_name) in_tl_thread = TransferLeadersBackgroundThread( @@ -549,7 +571,7 @@ def test_migrated_topic_data_integrity(self, use_alias, expected_to_pass=False, operation=lambda topic: rpk.create_topic(topic=topic, replicas=3)) - if cancellation == ('in', 'planned'): + if params.cancellation == CancellationStage('in', 'planned'): cancellation = None self.cancel_inbound(in_migration_id, inbound_topic_name) continue @@ -557,7 +579,7 @@ def test_migrated_topic_data_integrity(self, use_alias, admin.execute_data_migration_action(in_migration_id, MigrationAction.prepare) - if cancellation == ('in', 'preparing'): + if params.cancellation == CancellationStage('in', 'preparing'): cancellation = None self.wait_for_migration_states(in_migration_id, ['preparing', 'prepared']) @@ -577,7 +599,7 @@ def test_migrated_topic_data_integrity(self, use_alias, read_blocked=True, produce_blocked=True) - if cancellation == ('in', 'prepared'): + if params.cancellation == CancellationStage('in', 'prepared'): cancellation = None self.cancel_inbound(in_migration_id, inbound_topic_name) continue @@ -590,7 +612,7 @@ def test_migrated_topic_data_integrity(self, use_alias, admin.execute_data_migration_action(in_migration_id, MigrationAction.execute) - if cancellation == ('in', 'executing'): + if params.cancellation == CancellationStage('in', 'executing'): cancellation = None self.wait_for_migration_states(in_migration_id, ['executing', 'executed']) @@ -608,7 +630,7 @@ def test_migrated_topic_data_integrity(self, use_alias, metadata_locked=True, read_blocked=True, produce_blocked=True) - if cancellation == ('in', 'executed'): + if params.cancellation == CancellationStage('in', 'executed'): cancellation = None self.cancel_inbound(in_migration_id, inbound_topic_name) continue