From 31ec3f2618f8c17b6f3f36165c02d4b72f21d6aa Mon Sep 17 00:00:00 2001 From: alexey bashtanov Date: Wed, 18 Sep 2024 10:12:09 +0100 Subject: [PATCH] tests/migrations: add failure injector to the end2end test --- .../rptest/tests/data_migrations_api_test.py | 345 +++++++++--------- 1 file changed, 180 insertions(+), 165 deletions(-) diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py index 5d269a79bf9ab..297fb3f9cbae2 100644 --- a/tests/rptest/tests/data_migrations_api_test.py +++ b/tests/rptest/tests/data_migrations_api_test.py @@ -19,6 +19,7 @@ from rptest.clients.rpk import RpkTool from ducktape.mark import matrix, parametrize, ok_to_fail from contextlib import nullcontext +from rptest.tests.e2e_finjector import Finjector from rptest.tests.data_migrations import DataMigrationsTest, make_namespaced_topic @@ -105,192 +106,206 @@ def test_creating_and_listing_migrations(self, topics_count, params=generate_tmptpdi_params()) def test_migrated_topic_data_integrity(self, transfer_leadership: bool, params: TmtpdiParams): - rpk = RpkTool(self.redpanda) - self.redpanda.si_settings.set_expected_damage( - {"ntr_no_topic_manifest", "missing_segments"}) + with Finjector(self.redpanda, self.scale).finj_thread(): + rpk = RpkTool(self.redpanda) + self.redpanda.si_settings.set_expected_damage( + {"ntr_no_topic_manifest", "missing_segments"}) - workload_topic = TopicSpec(partition_count=32) + workload_topic = TopicSpec(partition_count=32) - self.client().create_topic(workload_topic) + self.client().create_topic(workload_topic) - producer = self.start_producer(workload_topic.name) + producer = self.start_producer(workload_topic.name) - out_tl_thread = TransferLeadersBackgroundThread( - self.redpanda, - workload_topic.name) if transfer_leadership else nullcontext() - with out_tl_thread: - admin = Admin(self.redpanda) - workload_ns_topic = make_namespaced_topic(workload_topic.name) - out_migration = OutboundDataMigration(topics=[workload_ns_topic], - consumer_groups=[]) - out_migration_id = self.create_and_wait(out_migration) + out_tl_thread = TransferLeadersBackgroundThread( + self.redpanda, + workload_topic.name) if transfer_leadership else nullcontext() + with out_tl_thread: + admin = Admin(self.redpanda) + workload_ns_topic = make_namespaced_topic(workload_topic.name) + out_migration = OutboundDataMigration( + topics=[workload_ns_topic], consumer_groups=[]) + out_migration_id = self.create_and_wait(out_migration) - admin.execute_data_migration_action(out_migration_id, - MigrationAction.prepare) - if params.cancellation == CancellationStage('out', 'preparing'): - self.wait_for_migration_states(out_migration_id, - ['preparing', 'prepared']) - return self.cancel_outbound(out_migration_id, - workload_topic.name, producer) - - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=False, - produce_blocked=False) - - self.wait_for_migration_states(out_migration_id, ['prepared']) - - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=False, - produce_blocked=False) - if params.cancellation == CancellationStage('out', 'prepared'): - return self.cancel_outbound(out_migration_id, - workload_topic.name, producer) - - admin.execute_data_migration_action(out_migration_id, - MigrationAction.execute) - if params.cancellation == CancellationStage('out', 'executing'): - self.wait_for_migration_states(out_migration_id, - ['executing', 'executed']) - return self.cancel_outbound(out_migration_id, - workload_topic.name, producer) - - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=False, - produce_blocked=True) - - self.wait_for_migration_states(out_migration_id, ['executed']) - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=False, - produce_blocked=True) - if params.cancellation == CancellationStage('out', 'executed'): - return self.cancel_outbound(out_migration_id, - workload_topic.name, producer) - - admin.execute_data_migration_action(out_migration_id, - MigrationAction.finish) - - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=True, - produce_blocked=True, - assert_topic_present=False) - - self.wait_for_migration_states(out_migration_id, - ['cut_over', 'finished']) - - producer.stop_if_running() - - self.assert_no_topics() - - self.wait_for_migration_states(out_migration_id, ['finished']) - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=True, - produce_blocked=True, - assert_topic_present=False) - - admin.delete_data_migration(out_migration_id) - - # attach topic back - inbound_topic_name = "aliased-workload-topic" if params.use_alias else workload_topic.name - alias = None - if params.use_alias: - alias = make_namespaced_topic(topic=inbound_topic_name) - - in_tl_thread = TransferLeadersBackgroundThread( - self.redpanda, - inbound_topic_name) if transfer_leadership else nullcontext() - with in_tl_thread: - remounted = False - # two cycles max: to cancel halfway and to complete + check e2e - while not remounted: - in_migration = InboundDataMigration(topics=[ - InboundTopic(src_topic=workload_ns_topic, alias=alias) - ], - consumer_groups=[]) - in_migration_id = self.create_and_wait(in_migration) - - # check if topic that is being migrated can not be created even if - # migration has not yet been prepared - self._do_validate_topic_operation( - inbound_topic_name, - "creation", - expected_to_pass=False, - operation=lambda topic: rpk.create_topic(topic=topic, - replicas=3)) - admin.execute_data_migration_action(in_migration_id, + admin.execute_data_migration_action(out_migration_id, MigrationAction.prepare) - - if params.cancellation == CancellationStage('in', 'preparing'): - cancellation = None - self.wait_for_migration_states(in_migration_id, + if params.cancellation == CancellationStage( + 'out', 'preparing'): + self.wait_for_migration_states(out_migration_id, ['preparing', 'prepared']) - self.cancel_inbound(in_migration_id, inbound_topic_name) - continue + return self.cancel_outbound(out_migration_id, + workload_topic.name, producer) - self.validate_topic_access(topic=inbound_topic_name, + self.validate_topic_access(topic=workload_topic.name, metadata_locked=True, - read_blocked=True, - produce_blocked=True, - assert_topic_present=False) + read_blocked=False, + produce_blocked=False) - self.wait_for_migration_states(in_migration_id, ['prepared']) + self.wait_for_migration_states(out_migration_id, ['prepared']) - self.validate_topic_access(topic=inbound_topic_name, + self.validate_topic_access(topic=workload_topic.name, metadata_locked=True, - read_blocked=True, - produce_blocked=True) - - if params.cancellation == CancellationStage('in', 'prepared'): - cancellation = None - self.cancel_inbound(in_migration_id, inbound_topic_name) - continue - - topics = list(rpk.list_topics()) - self.logger.info( - f"topic list after inbound migration is prepared: {topics}" - ) - assert inbound_topic_name in topics, "workload topic should be present after the inbound migration is prepared" + read_blocked=False, + produce_blocked=False) + if params.cancellation == CancellationStage('out', 'prepared'): + return self.cancel_outbound(out_migration_id, + workload_topic.name, producer) - admin.execute_data_migration_action(in_migration_id, + admin.execute_data_migration_action(out_migration_id, MigrationAction.execute) - if params.cancellation == CancellationStage('in', 'executing'): - cancellation = None - self.wait_for_migration_states(in_migration_id, + if params.cancellation == CancellationStage( + 'out', 'executing'): + self.wait_for_migration_states(out_migration_id, ['executing', 'executed']) - self.cancel_inbound(in_migration_id, inbound_topic_name) - continue + return self.cancel_outbound(out_migration_id, + workload_topic.name, producer) - self.validate_topic_access(topic=inbound_topic_name, + self.validate_topic_access(topic=workload_topic.name, metadata_locked=True, - read_blocked=True, + read_blocked=False, produce_blocked=True) - self.wait_for_migration_states(in_migration_id, ['executed']) - - self.validate_topic_access(topic=inbound_topic_name, + self.wait_for_migration_states(out_migration_id, ['executed']) + self.validate_topic_access(topic=workload_topic.name, metadata_locked=True, - read_blocked=True, + read_blocked=False, produce_blocked=True) - if params.cancellation == CancellationStage('in', 'executed'): - cancellation = None - self.cancel_inbound(in_migration_id, inbound_topic_name) - continue + if params.cancellation == CancellationStage('out', 'executed'): + return self.cancel_outbound(out_migration_id, + workload_topic.name, producer) - admin.execute_data_migration_action(in_migration_id, + admin.execute_data_migration_action(out_migration_id, MigrationAction.finish) - self.wait_for_migration_states(in_migration_id, ['finished']) - admin.delete_data_migration(in_migration_id) - # now the topic should be fully operational - self.consume_and_validate(inbound_topic_name, - producer.acked_records) - self.validate_topic_access(topic=inbound_topic_name, - metadata_locked=False, - read_blocked=False, - produce_blocked=False) - remounted = True + self.validate_topic_access(topic=workload_topic.name, + metadata_locked=True, + read_blocked=True, + produce_blocked=True, + assert_topic_present=False) + + self.wait_for_migration_states(out_migration_id, + ['cut_over', 'finished']) + + producer.stop_if_running() + + self.assert_no_topics() + + self.wait_for_migration_states(out_migration_id, ['finished']) + self.validate_topic_access(topic=workload_topic.name, + metadata_locked=True, + read_blocked=True, + produce_blocked=True, + assert_topic_present=False) + + admin.delete_data_migration(out_migration_id) + + # attach topic back + inbound_topic_name = "aliased-workload-topic" if params.use_alias else workload_topic.name + alias = None + if params.use_alias: + alias = make_namespaced_topic(topic=inbound_topic_name) + + in_tl_thread = TransferLeadersBackgroundThread( + self.redpanda, + inbound_topic_name) if transfer_leadership else nullcontext() + with in_tl_thread: + remounted = False + # two cycles max: to cancel halfway and to complete + check e2e + while not remounted: + in_migration = InboundDataMigration(topics=[ + InboundTopic(src_topic=workload_ns_topic, alias=alias) + ], + consumer_groups=[]) + in_migration_id = self.create_and_wait(in_migration) + + # check if topic that is being migrated can not be created even if + # migration has not yet been prepared + self._do_validate_topic_operation( + inbound_topic_name, + "creation", + expected_to_pass=False, + operation=lambda topic: rpk.create_topic(topic=topic, + replicas=3)) + admin.execute_data_migration_action( + in_migration_id, MigrationAction.prepare) + + if params.cancellation == CancellationStage( + 'in', 'preparing'): + cancellation = None + self.wait_for_migration_states( + in_migration_id, ['preparing', 'prepared']) + self.cancel_inbound(in_migration_id, + inbound_topic_name) + continue + + self.validate_topic_access(topic=inbound_topic_name, + metadata_locked=True, + read_blocked=True, + produce_blocked=True, + assert_topic_present=False) + + self.wait_for_migration_states(in_migration_id, + ['prepared']) + + self.validate_topic_access(topic=inbound_topic_name, + metadata_locked=True, + read_blocked=True, + produce_blocked=True) + + if params.cancellation == CancellationStage( + 'in', 'prepared'): + cancellation = None + self.cancel_inbound(in_migration_id, + inbound_topic_name) + continue + + topics = list(rpk.list_topics()) + self.logger.info( + f"topic list after inbound migration is prepared: {topics}" + ) + assert inbound_topic_name in topics, "workload topic should be present after the inbound migration is prepared" + + admin.execute_data_migration_action( + in_migration_id, MigrationAction.execute) + if params.cancellation == CancellationStage( + 'in', 'executing'): + cancellation = None + self.wait_for_migration_states( + in_migration_id, ['executing', 'executed']) + self.cancel_inbound(in_migration_id, + inbound_topic_name) + continue + + self.validate_topic_access(topic=inbound_topic_name, + metadata_locked=True, + read_blocked=True, + produce_blocked=True) + + self.wait_for_migration_states(in_migration_id, + ['executed']) + + self.validate_topic_access(topic=inbound_topic_name, + metadata_locked=True, + read_blocked=True, + produce_blocked=True) + if params.cancellation == CancellationStage( + 'in', 'executed'): + cancellation = None + self.cancel_inbound(in_migration_id, + inbound_topic_name) + continue + + admin.execute_data_migration_action( + in_migration_id, MigrationAction.finish) + + self.wait_for_migration_states(in_migration_id, + ['finished']) + admin.delete_data_migration(in_migration_id) + # now the topic should be fully operational + self.consume_and_validate(inbound_topic_name, + producer.acked_records) + self.validate_topic_access(topic=inbound_topic_name, + metadata_locked=False, + read_blocked=False, + produce_blocked=False) + remounted = True