From 7153b4660410ca700925686109a2ef5f8ece3a34 Mon Sep 17 00:00:00 2001
From: alexey bashtanov
Date: Tue, 3 Sep 2024 17:14:44 +0100
Subject: [PATCH] tests/migrations: test migration cancelation

---
 .../rptest/tests/data_migrations_api_test.py | 428 +++++++++++-------
 1 file changed, 256 insertions(+), 172 deletions(-)

diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py
index d5b410c59a6f7..dd391b5900a96 100644
--- a/tests/rptest/tests/data_migrations_api_test.py
+++ b/tests/rptest/tests/data_migrations_api_test.py
@@ -27,6 +27,7 @@
 from rptest.tests.e2e_finjector import Finjector
 from rptest.clients.rpk import RpkTool
 from ducktape.mark import matrix
+from contextlib import nullcontext
 
 import requests
 
@@ -40,10 +41,10 @@ def __init__(self, redpanda: RedpandaServiceBase, topic: str):
         self.thread = threading.Thread(target=lambda: self._loop())
         self.thread.daemon = True
 
-    def start(self):
+    def __enter__(self):
         self.thread.start()
 
-    def stop(self):
+    def __exit__(self, exc_type, exc_value, traceback):
         self.stop_ev.set()
 
     def _loop(self):
@@ -248,10 +249,6 @@ def test_creating_and_listing_migrations(self):
                     f'topic {t.name} is {self.client().describe_topic(t.name)}'
                 )
 
-        # TODO: check unhappy scenarios like this
-        # admin.execute_data_migration_action(out_migration_id,
-        #                                     MigrationAction.cancel)
-        # self.wait_for_migration_state(out_migration_id, 'canceling')
         # todo: fix rp_storage_tool to use overridden topic names
         self.redpanda.si_settings.set_expected_damage(
             {"ntr_no_topic_manifest", "missing_segments"})
@@ -269,24 +266,34 @@ def producer_throughput(self):
         return 1024 if self.debug_mode else 1024 * 1024
 
     def start_producer(self, topic):
+        class ProducerWrapper:
+            def __init__(self, *args, msg_count, **kwargs):
+                self.producer = KgoVerifierProducer(*args, **kwargs)
+                self.producer.start(clean=False)
+                wait_until( \
+                    lambda: self.producer.produce_status.acked > msg_count,
+                    timeout_sec=120,
+                    backoff_sec=1)
+
+            def stop_if_running(self):
+                if self.producer:
+                    self.producer.stop()
+                    self.acked_records = self.producer.produce_status.acked
+                    self.producer.free()
+                    self.producer = None
+
         self.logger.info(
             f"starting kgo-verifier producer with {self.msg_count} messages of size {self.msg_size} and throughput: {self.producer_throughput} bps"
         )
-        producer = KgoVerifierProducer(
+        return ProducerWrapper(
             self.test_context,
             self.redpanda,
             topic,
             self.msg_size,
             100000000,  #we do not want to limit number of messages
+            msg_count=self.msg_count,
             rate_limit_bps=self.producer_throughput)
-        producer.start(clean=False)
-
-        wait_until(lambda: producer.produce_status.acked > 10,
-                   timeout_sec=120,
-                   backoff_sec=1)
-        return producer
-
     def start_consumer(self, topic):
         consumer = KgoVerifierConsumerGroupConsumer(self.test_context,
                                                     self.redpanda,
@@ -312,8 +319,8 @@ def _do_validate_topic_operation(self, topic: str, op_name: str,
             )
             success = False
 
-        assert expected_to_pass == success, f"Operation {op_name} outcome is not \
-            expected. Expected to pass: {expected_to_pass}, succeeded: {success}"
+        assert expected_to_pass == success, f"Operation {op_name} outcome is not " \
+            f"expected. Expected to pass: {expected_to_pass}, succeeded: {success}"
 
     def validate_topic_access(
         self,
@@ -324,11 +331,10 @@ def validate_topic_access(
         test_add_partitions: bool = True,
         assert_topic_present: bool = True,
     ):
         rpk = RpkTool(self.redpanda)
-
         if assert_topic_present:
-            assert topic in rpk.list_topics(
-            ), f"validated topic {topic} must be present"
+            assert topic in rpk.list_topics(), \
+                f"validated topic {topic} must be present"
 
         if test_add_partitions:
             self._do_validate_topic_operation(
@@ -346,14 +352,11 @@ def _alter_cfg(topic):
             expected_to_pass=not metadata_locked,
             operation=_alter_cfg)
 
-        #TODO: actually validate read blocking after it is added
-        if not read_blocked:
-            self._do_validate_topic_operation(
-                topic=topic,
-                op_name="read",
-                expected_to_pass=not read_blocked,
-                operation=lambda topic: rpk.consume(topic=topic, n=1, offset=0
-                                                    ))
+        self._do_validate_topic_operation(
+            topic=topic,
+            op_name="read",
+            expected_to_pass=not read_blocked,
+            operation=lambda topic: rpk.consume(topic=topic, n=1, offset=0))
 
         # check if topic is writable only if it is expected to be blocked not to disturb the verifiers.
         if produce_blocked:
             self._do_validate_topic_operation(
@@ -365,177 +368,258 @@ def _alter_cfg(topic):
                 operation=lambda topic: rpk.produce(
                     topic=topic, key="test-key", msg='test-msg'))
 
-    @cluster(num_nodes=4)
-    @matrix(use_alias=[True, False], transfer_leadership=[True, False])
-    def test_migrated_topic_data_integrity(self, use_alias,
-                                           transfer_leadership):
-        workload_topic = TopicSpec(partition_count=32)
-
-        self.client().create_topic(workload_topic)
-
-        producer = self.start_producer(workload_topic.name)
+    def consume_and_validate(self, topic_name, expected_records):
+        consumer = self.start_consumer(topic=topic_name)
+        wait_until(
+            lambda: \
+            consumer._status.validator.valid_reads >= expected_records,
+            timeout_sec=180,
+            backoff_sec=0.5,
+            err_msg=
+            f"Error waiting for consumer to see all {expected_records} produced messages",
+        )
+        consumer.wait()
+        consumer.stop()
+        #self.redpanda.si_settings.set_expected_damage(
+        #    {"ntr_no_topic_manifest", "missing_segments"})
 
-        wait_until(lambda: producer.produce_status.acked > self.msg_count,
-                   timeout_sec=60,
-                   backoff_sec=1)
-        if transfer_leadership:
-            tl_thread = TransferLeadersBackgroundThread(
-                self.redpanda, workload_topic.name)
-            tl_thread.start()
+    def cancel(self, migration_id, topic_name):
         admin = Admin(self.redpanda)
-        workload_ns_topic = NamespacedTopic(workload_topic.name)
-        out_migration = OutboundDataMigration(topics=[workload_ns_topic],
-                                              consumer_groups=[])
-        out_migration_id = self.create_and_wait(out_migration)
-        admin.execute_data_migration_action(out_migration_id,
-                                            MigrationAction.prepare)
-
-        self.validate_topic_access(topic=workload_topic.name,
-                                   metadata_locked=True,
-                                   read_blocked=False,
-                                   produce_blocked=False)
-
-        self.wait_for_migration_states(out_migration_id, ['prepared'])
+        admin.execute_data_migration_action(migration_id,
+                                            MigrationAction.cancel)
+        self.logger.info('waiting for cancelled')
+        self.wait_for_migration_states(migration_id, ['cancelled'])
+        admin.delete_data_migration(migration_id)
 
-        self.validate_topic_access(topic=workload_topic.name,
-                                   metadata_locked=True,
-                                   read_blocked=False,
-                                   produce_blocked=False)
+    def assert_no_topics(self):
+        rpk = RpkTool(self.redpanda)
+        topics = list(rpk.list_topics())
+        self.logger.info(f"topic list: {topics}")
 
-        admin.execute_data_migration_action(out_migration_id,
-                                            MigrationAction.execute)
+        assert len(topics) == 0, \
+            "outbound migration complete, inbound migration not complete " \
"and not in progress, so the topic should be removed" - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, + def cancel_outbound(self, migration_id, topic_name, producer): + self.cancel(migration_id, topic_name) + producer.stop_if_running() + self.consume_and_validate(topic_name, producer.acked_records) + self.validate_topic_access(topic=topic_name, + metadata_locked=False, read_blocked=False, - produce_blocked=True) + produce_blocked=False, + assert_topic_present=False, + test_add_partitions=True) - self.wait_for_migration_states(out_migration_id, ['executed']) + def cancel_inbound(self, migration_id, topic_name): + self.cancel(migration_id, topic_name) + self.assert_no_topics() - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=False, - produce_blocked=True) + @cluster(num_nodes=4) + @matrix(use_alias=[True, False], + transfer_leadership=[True, False], + cancellation=[None] + + [(dir, stage) for dir in ('in', 'out') + for stage in ('planned', 'preparing', 'prepared', 'executing', + 'executed')]) + def test_migrated_topic_data_integrity(self, use_alias, + transfer_leadership, cancellation): + rpk = RpkTool(self.redpanda) + self.redpanda.si_settings.set_expected_damage( + {"ntr_no_topic_manifest", "missing_segments"}) - admin.execute_data_migration_action(out_migration_id, - MigrationAction.finish) + workload_topic = TopicSpec(partition_count=32) - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=True, - produce_blocked=True, - assert_topic_present=False) + self.client().create_topic(workload_topic) - self.wait_for_migration_states(out_migration_id, - ['cut_over', 'finished']) + producer = self.start_producer(workload_topic.name) - producer.stop() - producer_acked_messages = producer.produce_status.acked - producer.free() + out_tl_thread = TransferLeadersBackgroundThread( + self.redpanda, + workload_topic.name) if transfer_leadership else nullcontext() + with out_tl_thread: + admin = Admin(self.redpanda) + workload_ns_topic = NamespacedTopic(workload_topic.name) + out_migration = OutboundDataMigration(topics=[workload_ns_topic], + consumer_groups=[]) + out_migration_id = self.create_and_wait(out_migration) - rpk = RpkTool(self.redpanda) + if cancellation == ('out', 'planned'): + return self.cancel_outbound(out_migration_id, + workload_topic.name, producer) - topics = list(rpk.list_topics()) - self.logger.info(f"topic list after migration finished: {topics}") - - assert len( - topics - ) == 0, "workload topic should be removed after migration finished" - - self.wait_for_migration_states(out_migration_id, ['finished']) - self.validate_topic_access(topic=workload_topic.name, - metadata_locked=True, - read_blocked=True, - produce_blocked=True, - assert_topic_present=False) - - admin.delete_data_migration(out_migration_id) - if transfer_leadership: - tl_thread.stop() - # attach topic back - inbound_topic_name = "aliased-workload-topic" if use_alias else workload_topic.name - alias = None - if use_alias: - alias = NamespacedTopic(topic=inbound_topic_name) + admin.execute_data_migration_action(out_migration_id, + MigrationAction.prepare) + if cancellation == ('out', 'preparing'): + self.wait_for_migration_states(out_migration_id, + ['preparing', 'prepared']) + return self.cancel_outbound(out_migration_id, + workload_topic.name, producer) - if transfer_leadership: - tl_thread = TransferLeadersBackgroundThread( - self.redpanda, inbound_topic_name) - tl_thread.start() + 
+            self.validate_topic_access(topic=workload_topic.name,
                                       metadata_locked=True,
                                       read_blocked=False,
                                       produce_blocked=False)
 
-        in_migration = InboundDataMigration(
-            topics=[InboundTopic(src_topic=workload_ns_topic, alias=alias)],
-            consumer_groups=[])
-        in_migration_id = self.create_and_wait(in_migration)
+            self.wait_for_migration_states(out_migration_id, ['prepared'])
 
-        # check if topic that is being migrated can not be created even if
-        # migration has not yet been prepared
-        self._do_validate_topic_operation(
-            inbound_topic_name,
-            "creation",
-            expected_to_pass=False,
-            operation=lambda topic: rpk.create_topic(topic=topic, replicas=3))
+            self.validate_topic_access(topic=workload_topic.name,
                                       metadata_locked=True,
                                       read_blocked=False,
                                       produce_blocked=False)
+            if cancellation == ('out', 'prepared'):
+                return self.cancel_outbound(out_migration_id,
                                            workload_topic.name, producer)
 
-        admin.execute_data_migration_action(in_migration_id,
-                                            MigrationAction.prepare)
+            admin.execute_data_migration_action(out_migration_id,
                                                MigrationAction.execute)
+            if cancellation == ('out', 'executing'):
+                self.wait_for_migration_states(out_migration_id,
                                               ['executing', 'executed'])
+                return self.cancel_outbound(out_migration_id,
                                            workload_topic.name, producer)
 
-        self.validate_topic_access(topic=inbound_topic_name,
-                                   metadata_locked=True,
-                                   read_blocked=True,
-                                   produce_blocked=True,
-                                   assert_topic_present=False)
+            self.validate_topic_access(topic=workload_topic.name,
                                       metadata_locked=True,
                                       read_blocked=False,
                                       produce_blocked=True)
 
-        self.wait_for_migration_states(in_migration_id, ['prepared'])
+            self.wait_for_migration_states(out_migration_id, ['executed'])
+            self.validate_topic_access(topic=workload_topic.name,
                                       metadata_locked=True,
                                       read_blocked=False,
                                       produce_blocked=True)
+            if cancellation == ('out', 'executed'):
+                return self.cancel_outbound(out_migration_id,
                                            workload_topic.name, producer)
 
-        self.validate_topic_access(topic=inbound_topic_name,
-                                   metadata_locked=True,
-                                   read_blocked=True,
-                                   produce_blocked=True)
+            admin.execute_data_migration_action(out_migration_id,
                                                MigrationAction.finish)
 
-        topics = list(rpk.list_topics())
-        self.logger.info(
-            f"topic list after inbound migration is prepared: {topics}")
-        assert inbound_topic_name in topics, "workload topic should be present after the inbound migration is prepared"
+            self.validate_topic_access(topic=workload_topic.name,
                                       metadata_locked=True,
                                       read_blocked=True,
                                       produce_blocked=True,
                                       assert_topic_present=False)
 
-        admin.execute_data_migration_action(in_migration_id,
-                                            MigrationAction.execute)
+            self.wait_for_migration_states(out_migration_id,
                                           ['cut_over', 'finished'])
 
-        self.validate_topic_access(topic=inbound_topic_name,
-                                   metadata_locked=True,
-                                   read_blocked=True,
-                                   produce_blocked=True)
+            producer.stop_if_running()
 
-        self.wait_for_migration_states(in_migration_id, ['executed'])
+            self.assert_no_topics()
 
-        self.validate_topic_access(topic=inbound_topic_name,
-                                   metadata_locked=True,
-                                   read_blocked=True,
-                                   produce_blocked=True)
+            self.wait_for_migration_states(out_migration_id, ['finished'])
+            self.validate_topic_access(topic=workload_topic.name,
                                       metadata_locked=True,
                                       read_blocked=True,
                                       produce_blocked=True,
                                       assert_topic_present=False)
 
-        admin.execute_data_migration_action(in_migration_id,
-                                            MigrationAction.finish)
+            admin.delete_data_migration(out_migration_id)
 
-        self.logger.info('waiting for finished')
-        self.wait_for_migration_states(in_migration_id, ['finished'])
-        # now the topic should be fully operational
-        self.validate_topic_access(topic=inbound_topic_name,
-                                   metadata_locked=False,
-                                   read_blocked=False,
-                                   produce_blocked=False,
-                                   test_add_partitions=False)
+        # attach topic back
+        inbound_topic_name = "aliased-workload-topic" if use_alias else workload_topic.name
+        alias = None
+        if use_alias:
+            alias = NamespacedTopic(topic=inbound_topic_name)
 
-        consumer = self.start_consumer(topic=inbound_topic_name)
-        wait_until(
-            lambda: consumer._status.validator.valid_reads >=
-            producer_acked_messages,
-            timeout_sec=180,
-            backoff_sec=0.5,
-            err_msg=
-            "Error waiting for consumer to see all the messages that were produced",
-        )
-        consumer.wait()
-        consumer.stop()
-        if transfer_leadership:
-            tl_thread.stop()
-        self.redpanda.si_settings.set_expected_damage(
-            {"ntr_no_topic_manifest", "missing_segments"})
+        in_tl_thread = TransferLeadersBackgroundThread(
+            self.redpanda,
+            inbound_topic_name) if transfer_leadership else nullcontext()
+        with in_tl_thread:
+            remounted = False
+            # two cycles max: to cancel halfway and to complete + check e2e
+            while not remounted:
+                in_migration = InboundDataMigration(topics=[
+                    InboundTopic(src_topic=workload_ns_topic, alias=alias)
+                ],
+                                                    consumer_groups=[])
+                in_migration_id = self.create_and_wait(in_migration)
+
+                # check if topic that is being migrated can not be created even if
+                # migration has not yet been prepared
+                self._do_validate_topic_operation(
+                    inbound_topic_name,
+                    "creation",
+                    expected_to_pass=False,
+                    operation=lambda topic: rpk.create_topic(topic=topic,
+                                                             replicas=3))
+                if cancellation == ('in', 'planned'):
+                    cancellation = None
+                    self.cancel_inbound(in_migration_id, inbound_topic_name)
+                    continue
+
+                admin.execute_data_migration_action(in_migration_id,
+                                                    MigrationAction.prepare)
+
+                if cancellation == ('in', 'preparing'):
+                    cancellation = None
+                    self.wait_for_migration_states(in_migration_id,
+                                                   ['preparing', 'prepared'])
+                    self.cancel_inbound(in_migration_id, inbound_topic_name)
+                    continue
+
+                self.validate_topic_access(topic=inbound_topic_name,
+                                           metadata_locked=True,
+                                           read_blocked=True,
+                                           produce_blocked=True,
+                                           assert_topic_present=False)
+
+                self.wait_for_migration_states(in_migration_id, ['prepared'])
+
+                self.validate_topic_access(topic=inbound_topic_name,
+                                           metadata_locked=True,
+                                           read_blocked=True,
+                                           produce_blocked=True)
+
+                if cancellation == ('in', 'prepared'):
+                    cancellation = None
+                    self.cancel_inbound(in_migration_id, inbound_topic_name)
+                    continue
+
+                topics = list(rpk.list_topics())
+                self.logger.info(
+                    f"topic list after inbound migration is prepared: {topics}"
+                )
+                assert inbound_topic_name in topics, "workload topic should be present after the inbound migration is prepared"
+
+                admin.execute_data_migration_action(in_migration_id,
+                                                    MigrationAction.execute)
+                if cancellation == ('in', 'executing'):
+                    cancellation = None
+                    self.wait_for_migration_states(in_migration_id,
+                                                   ['executing', 'executed'])
+                    self.cancel_inbound(in_migration_id, inbound_topic_name)
+                    continue
+
+                self.validate_topic_access(topic=inbound_topic_name,
+                                           metadata_locked=True,
+                                           read_blocked=True,
+                                           produce_blocked=True)
+
+                self.wait_for_migration_states(in_migration_id, ['executed'])
+
+                self.validate_topic_access(topic=inbound_topic_name,
+                                           metadata_locked=True,
+                                           read_blocked=True,
+                                           produce_blocked=True)
+                if cancellation == ('in', 'executed'):
+                    cancellation = None
+                    self.cancel_inbound(in_migration_id, inbound_topic_name)
+                    continue
+
+                admin.execute_data_migration_action(in_migration_id,
+                                                    MigrationAction.finish)
+
+                self.logger.info('waiting for finished')
+                self.wait_for_migration_states(in_migration_id, ['finished'])
+                admin.delete_data_migration(in_migration_id)
+                # now the topic should be fully operational
+                self.consume_and_validate(inbound_topic_name,
+                                          producer.acked_records)
+                self.validate_topic_access(topic=inbound_topic_name,
+                                           metadata_locked=False,
+                                           read_blocked=False,
+                                           produce_blocked=False)
+                remounted = True