From d45c5b189aea86f0670f1edea748bdf9aa1f03fa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Wed, 5 Jun 2024 14:36:24 +0000
Subject: [PATCH 1/7] tests: added a very basic test validating adding offsets
 to a transaction
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Michał Maślanka
---
 .../tx_atomic_produce_consume_test.py         | 370 ++++++++++++++++++
 1 file changed, 370 insertions(+)
 create mode 100644 tests/rptest/transactions/tx_atomic_produce_consume_test.py

diff --git a/tests/rptest/transactions/tx_atomic_produce_consume_test.py b/tests/rptest/transactions/tx_atomic_produce_consume_test.py
new file mode 100644
index 0000000000000..45280b67eec47
--- /dev/null
+++ b/tests/rptest/transactions/tx_atomic_produce_consume_test.py
@@ -0,0 +1,370 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+import threading
+from concurrent import futures
+import time
+from rptest.services.cluster import cluster
+from rptest.clients.types import TopicSpec
+from rptest.services.admin import Admin
+import confluent_kafka as ck
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.clients.rpk import RpkTool
+from ducktape.utils.util import wait_until
+
+
+def deserialize_str(value: bytes, ctx) -> str:
+    return value.decode('utf-8')
+
+
+class ExactlyOnceVerifier():
+    """
+    A class that populates a source topic with data and then executes a
+    transformation of that data from the source topic to the destination
+    topic. The transformation uses transactions to achieve exactly-once
+    semantics.
+    """
+    def __init__(self,
+                 redpanda,
+                 src_topic: str,
+                 dst_topic: str,
+                 transform_func,
+                 logger,
+                 max_messages: int | None = None,
+                 tx_id: str = "transformer-tx",
+                 group_id: str = "transformer-group",
+                 commit_every: int = 50,
+                 timeout_sec: int = 60):
+        self._src_topic = src_topic
+        self._dst_topic = dst_topic
+        self._transform_func = transform_func
+        self._logger = logger
+        self._redpanda = redpanda
+        self._stop_ev = threading.Event()
+        self._max_messages = max_messages
+        self._tx_id = tx_id
+        self._group_id = group_id
+        self._commit_every = commit_every
+        self._produced_to_src = 0
+        self._lock = threading.Lock()
+        self._tasks = []
+        self._timeout_sec = timeout_sec
+        self._error = None
+        self._finished_partitions = set()
+        self._consumer_cnt = 0
+
+    def start_src_producer(self):
+        produced_per_partition = defaultdict(int)
+        self.last_report = time.time()
+
+        def delivery_report(err, msg):
+            if err is None:
+                produced_per_partition[msg.partition()] += 1
+                with self._lock:
+                    self._produced_to_src += 1
+                now = time.time()
+                if now - self.last_report > 5:
+                    self.last_report = now
+                    self._logger.info(
+                        f"Produced {self._produced_to_src} messages to source topic",
+                    )
+
+        producer = ck.Producer({'bootstrap.servers': self._redpanda.brokers()})
+
+        self._logger.info(
+            f"Starting source topic {self._src_topic} producer. 
Max messages to produce: {self._max_messages}" + ) + i = 0 + while not self._stop_ev.is_set(): + producer.produce(self._src_topic, + value=str(i), + key=str(i), + on_delivery=delivery_report) + i += 1 + if self._max_messages is not None and i >= self._max_messages: + self._stop_ev.set() + + producer.flush() + self._logger.info(f"Messages per partition: {produced_per_partition}") + + def produced_messages(self, to_wait: int): + with self._lock: + return self._produced_to_src >= to_wait + + def request_stop(self): + self._stop_ev.set() + + def wait_for_finish(self, timeout_sec: int = 30): + futures.wait(self._tasks, + timeout=timeout_sec, + return_when=futures.ALL_COMPLETED) + + def start_workload(self, workers: int): + with ThreadPoolExecutor(max_workers=workers + 1) as executor: + self._tasks.append( + executor.submit(lambda: self.start_src_producer())) + wait_until(lambda: self.produced_messages(10), 30, 0.5) + + def transform(id: str): + try: + self.tx_transform(tx_id=id) + except ck.KafkaError as e: + self._logger.error( + f"Client error reported: {e.error} - {e.reason}, retryable: {e.retryable}" + ) + except BaseException as e: + self._logger.error( + f"Error transforming {id} - {e} - {type(e)}") + + for i in range(workers): + id = f"{self._tx_id}-{i}" + self._tasks.append(executor.submit(transform, id)) + + def get_high_watermarks(self, topic): + rpk = RpkTool(self._redpanda) + + return {p.id: p.high_watermark for p in rpk.describe_topic(topic)} + + def tx_transform(self, tx_id): + start = time.time() + self._logger.info(f"Starting tx-transform with tx_id: {tx_id}") + producer = ck.Producer({ + 'bootstrap.servers': self._redpanda.brokers(), + 'transactional.id': tx_id, + }) + + consumer = ck.DeserializingConsumer({ + 'bootstrap.servers': + self._redpanda.brokers(), + 'group.id': + self._group_id, + 'auto.offset.reset': + 'earliest', + 'enable.auto.commit': + False, + 'isolation.level': + 'read_committed', + 'value.deserializer': + deserialize_str, + 'key.deserializer': + deserialize_str, + }) + try: + with self._lock: + self._consumer_cnt += 1 + + def reached_end(): + high_watermarks = self.get_high_watermarks(self._src_topic) + + assignments = {tp.partition for tp in consumer.assignment()} + if len(assignments) == 0: + return False + + positions = consumer.position([ + ck.TopicPartition(self._src_topic, p) for p in assignments + ]) + end_for = { + p.partition + for p in positions if p.partition in high_watermarks + and high_watermarks[p.partition] <= p.offset + } + self._logger.info( + f"[{tx_id}] Topic {self._src_topic} partitions high watermarks" + f"{high_watermarks}, assignment: {assignments} positions: {positions}, end_for: {end_for}" + ) + + consumers = 0 + with self._lock: + self._finished_partitions |= end_for + consumers = self._consumer_cnt + if len(self._finished_partitions) == len(high_watermarks): + return True + + return len(end_for) == len(assignments) and consumers > 1 + + in_transaction = False + + def on_assign(consumer, partitions): + self._logger.info(f"[{tx_id}] Assigned {partitions}") + + def on_revoke(consumer, partitions): + nonlocal in_transaction + self._logger.info(f"[{tx_id}] Revoked {partitions}") + if in_transaction: + self._logger.info(f"[{tx_id}] abort transaction") + producer.abort_transaction() + in_transaction = False + + consumer.subscribe([self._src_topic], + on_assign=on_assign, + on_revoke=on_revoke) + producer.init_transactions() + + msg_cnt = 0 + while True: + msg = consumer.poll(timeout=1) + if msg: + if not in_transaction: + 
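+                        # A transaction is opened lazily on the first polled
+                        # record; the consumed offsets are later attached to
+                        # it via send_offsets_to_transaction() so consumed
+                        # positions and produced records commit atomically.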
self._logger.info(f"[{tx_id}] begin transaction") + producer.begin_transaction() + in_transaction = True + msg_cnt += 1 + t_key, t_value = self._transform_func( + msg.key(), msg.value()) + + producer.produce(self._dst_topic, + value=t_value, + key=t_key, + partition=msg.partition()) + + if msg_cnt % self._commit_every == 0: + self._logger.info( + f"[{tx_id}] Committing transaction after {msg_cnt} messages. " + f"Current consumer positions: {consumer.position(consumer.assignment())}" + ) + producer.send_offsets_to_transaction( + consumer.position(consumer.assignment()), + consumer.consumer_group_metadata()) + producer.commit_transaction() + in_transaction = False + + if reached_end(): + self._logger.info(f"{tx_id} reached end") + break + + if time.time() - start > self._timeout_sec: + self._logger.error(f"timeout waiting for {tx_id} producer") + self._error = TimeoutError( + "timeout waiting for {tx_id} producer") + self._stop_ev.set() + break + + producer.send_offsets_to_transaction( + consumer.position(consumer.assignment()), + consumer.consumer_group_metadata()) + + producer.commit_transaction() + in_transaction = False + except ck.KafkaError as e: + self._logger.error( + f"Client error reported: {e.error} - {e.reason}, retryable: {e.retryable}" + ) + raise e + finally: + consumer.close() + producer.flush() + with self._lock: + self._consumer_cnt -= 1 + + def validate_transform(self): + src_high_watermarks = self.get_high_watermarks(self._src_topic) + dst_high_watermarks = self.get_high_watermarks(self._dst_topic) + for p, hw in src_high_watermarks.items(): + assert dst_high_watermarks[ + p] >= hw, "Transformed partitions must the same or larger watermark than source" + src_msgs = defaultdict(list) + dst_msgs = defaultdict(list) + consumer = ck.DeserializingConsumer({ + 'bootstrap.servers': + self._redpanda.brokers(), + 'group.id': + f'validator-group-{self._group_id}', + 'auto.offset.reset': + 'earliest', + 'enable.auto.commit': + True, + 'value.deserializer': + deserialize_str, + 'key.deserializer': + deserialize_str, + }) + consumer.subscribe([self._src_topic, self._dst_topic]) + + def is_finished(): + positions = consumer.position(consumer.assignment()) + if len(positions + ) != len(src_high_watermarks) + len(dst_high_watermarks): + return False + + for tp in positions: + if tp.topic == self._src_topic: + if tp.offset < src_high_watermarks[tp.partition]: + return False + else: + if tp.offset < dst_high_watermarks[tp.partition]: + return False + + return True + + while True: + if is_finished(): + break + msg = consumer.poll(1) + if msg is None or msg.error(): + continue + p = msg.partition() + if msg.topic() == self._src_topic: + src_msgs[p].append(msg) + else: + dst_msgs[p].append(msg) + + if len(src_msgs[p]) > 0 and len(dst_msgs[p]) > 0: + s_msg = src_msgs[p][0] + d_msg = dst_msgs[p][0] + self._logger.debug( + f"src: {s_msg.topic()}/{s_msg.partition()}@{s_msg.offset()} - {s_msg.key()}={s_msg.value()}" + ) + self._logger.debug( + f"dst: {d_msg.topic()}/{d_msg.partition()}@{d_msg.offset()} - {d_msg.key()}={d_msg.value()}" + ) + t_key, t_val = self._transform_func(s_msg.key(), s_msg.value()) + assert d_msg.value() == t_val + assert d_msg.key() == t_key + src_msgs[p].pop(0) + dst_msgs[p].pop(0) + + +class TxAtomicProduceConsumeTest(RedpandaTest): + def __init__(self, test_context): + super(TxAtomicProduceConsumeTest, + self).__init__(test_context=test_context, num_brokers=3) + + self.admin = Admin(self.redpanda) + + @cluster(num_nodes=3) + def 
test_basic_tx_consumer_transform_produce(self): + partition_count = 5 + topic = TopicSpec(name='src-test-exactly-once', + replication_factor=3, + partition_count=partition_count) + + dst_topic = TopicSpec(name='dst-test-exactly-once', + replication_factor=3, + partition_count=partition_count) + self.redpanda.set_cluster_config( + {"group_new_member_join_timeout": "3000"}) + self.client().create_topic(topic) + self.client().create_topic(dst_topic) + + def simple_transform(k, v): + return f"{k}-t", f"{v}-tv" + + transformer = ExactlyOnceVerifier(self.redpanda, + topic.name, + dst_topic.name, + simple_transform, + self.logger, + 2000, + timeout_sec=180) + + transformer.start_workload(2) + transformer.wait_for_finish() + transformer.validate_transform() From 3202c989d13acb550ae1555aa15305b584d9aa22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Thu, 6 Jun 2024 13:48:08 +0000 Subject: [PATCH 2/7] k/group: made group tx metadata printable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/kafka/server/group_metadata.cc | 47 ++++++++++++++++++++++++++++ src/v/kafka/server/group_metadata.h | 5 +++ 2 files changed, 52 insertions(+) diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc index 5b452e3373e7a..51fe5bd6519e0 100644 --- a/src/v/kafka/server/group_metadata.cc +++ b/src/v/kafka/server/group_metadata.cc @@ -442,4 +442,51 @@ std::ostream& operator<<(std::ostream& o, const offset_metadata_value& v) { v.expiry_timestamp); return o; } +namespace group_tx { +std::ostream& operator<<(std::ostream& o, const offsets_metadata& md) { + fmt::print( + o, + "{{group_id: {}, pid: {}, tx_seq: {}, offsets: {}}}", + md.group_id, + md.pid, + md.tx_seq, + fmt::join(md.offsets, ", ")); + return o; +} + +std::ostream& operator<<(std::ostream& o, const partition_offset& po) { + fmt::print( + o, + "{{partition: {}, offset: {}, leader_epoch: {}, metadata: {}}}", + po.tp, + po.offset, + po.leader_epoch, + po.metadata); + return o; +} +std::ostream& operator<<(std::ostream& o, const fence_metadata_v0& fence) { + fmt::print(o, "{{group_id: {}}}", fence.group_id); + return o; +} +std::ostream& operator<<(std::ostream& o, const fence_metadata_v1& fence) { + fmt::print( + o, + "{{group_id: {}, tx_seq: {}, tx_timeout: {} ms}}", + fence.group_id, + fence.tx_seq, + fence.transaction_timeout_ms.count()); + return o; +} + +std::ostream& operator<<(std::ostream& o, const fence_metadata& fence) { + fmt::print( + o, + "{{tm_partition: {}, group_id: {}, tx_seq: {}, tx_timeout: {} ms}}", + fence.tm_partition, + fence.group_id, + fence.tx_seq, + fence.transaction_timeout_ms); + return o; +} +} // namespace group_tx } // namespace kafka diff --git a/src/v/kafka/server/group_metadata.h b/src/v/kafka/server/group_metadata.h index fbb9d465e50f3..13159ec3b42c2 100644 --- a/src/v/kafka/server/group_metadata.h +++ b/src/v/kafka/server/group_metadata.h @@ -228,12 +228,14 @@ using group_metadata_serializer_factory namespace group_tx { struct fence_metadata_v0 { kafka::group_id group_id; + friend std::ostream& operator<<(std::ostream&, const fence_metadata_v0&); }; struct fence_metadata_v1 { kafka::group_id group_id; model::tx_seq tx_seq; model::timeout_clock::duration transaction_timeout_ms; + friend std::ostream& operator<<(std::ostream&, const fence_metadata_v1&); }; /** * Fence is set by the transaction manager when consumer adds an offset to @@ -244,6 +246,7 @@ struct fence_metadata { 
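     // (the current-version fence records the tx sequence, the transaction
     // timeout and the coordinator partition of the transaction it guards)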
model::tx_seq tx_seq; model::timeout_clock::duration transaction_timeout_ms; model::partition_id tm_partition; + friend std::ostream& operator<<(std::ostream&, const fence_metadata&); }; /** * Single partition committed offset @@ -253,6 +256,7 @@ struct partition_offset { model::offset offset; int32_t leader_epoch; std::optional metadata; + friend std::ostream& operator<<(std::ostream&, const partition_offset&); }; /** * Consumer offsets commited as a part of transaction @@ -262,6 +266,7 @@ struct offsets_metadata { model::producer_identity pid; model::tx_seq tx_seq; std::vector offsets; + friend std::ostream& operator<<(std::ostream&, const offsets_metadata&); }; /** From 886bd37f1887581f446ee551073c2b03f04a0fd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Thu, 6 Jun 2024 08:48:05 +0000 Subject: [PATCH 3/7] k/group: improved logging in group recovery consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/kafka/server/group_recovery_consumer.cc | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index e251bcf0abc25..ed7e6fdba8b17 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -76,6 +76,7 @@ group_recovery_consumer::operator()(model::record_batch batch) { co_return ss::stop_iteration::yes; } _state.last_read_offset = batch.last_offset(); + if (batch.header().type == model::record_batch_type::raft_data) { _batch_base_offset = batch.base_offset(); co_await model::for_each_record(batch, [this](model::record& r) { @@ -90,6 +91,11 @@ group_recovery_consumer::operator()(model::record_batch batch) { .cmd; auto [group_it, _] = _state.groups.try_emplace(val.group_id); + vlog( + klog.trace, + "[group: {}] recovered update tx offsets: {}", + val.group_id, + val); group_it->second.update_prepared(batch.last_offset(), val); co_return ss::stop_iteration::no; @@ -97,7 +103,7 @@ group_recovery_consumer::operator()(model::record_batch batch) { batch.header().type == model::record_batch_type::group_commit_tx) { auto cmd = parse_tx_batch( batch, group::commit_tx_record_version); - + vlog(klog.trace, "[group: {}] recovered commit tx", cmd.cmd.group_id); auto [group_it, _] = _state.groups.try_emplace(cmd.cmd.group_id); group_it->second.commit(cmd.pid); @@ -106,6 +112,11 @@ group_recovery_consumer::operator()(model::record_batch batch) { batch.header().type == model::record_batch_type::group_abort_tx) { auto cmd = parse_tx_batch( batch, group::aborted_tx_record_version); + vlog( + klog.trace, + "[group: {}] recovered abort tx_seq: {}", + cmd.cmd.group_id, + cmd.cmd.tx_seq); auto [group_it, _] = _state.groups.try_emplace(cmd.cmd.group_id); group_it->second.abort(cmd.pid, cmd.cmd.tx_seq); @@ -117,6 +128,7 @@ group_recovery_consumer::operator()(model::record_batch batch) { } else if (batch.header().type == model::record_batch_type::version_fence) { auto fence = features::feature_table::decode_version_fence( std::move(batch)); + vlog(klog.trace, "recovered version fence"); if (fence.active_version >= cluster::cluster_version{9}) { _state.has_offset_retention_feature_fence = true; } @@ -159,10 +171,23 @@ void group_recovery_consumer::apply_tx_fence(model::record_batch&& batch) { auto cmd = reflection::adl{}.from( val_reader); auto [group_it, _] = _state.groups.try_emplace(cmd.group_id); + vlog( + klog.trace, + 
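+          // (a v0 fence carries only the group id; the fencing epoch comes
+          // from the producer identity in the batch header)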
"[group: {}] recovered tx fence version: {} for producer: {}", + cmd.group_id, + fence_version, + bid.pid); group_it->second.try_set_fence(bid.pid.get_id(), bid.pid.get_epoch()); } else if (fence_version == group::fence_control_record_v1_version) { auto cmd = reflection::adl{}.from( val_reader); + vlog( + klog.trace, + "[group: {}] recovered tx fence version: {} for producer: {} - {}", + cmd.group_id, + fence_version, + bid.pid, + cmd); auto [group_it, _] = _state.groups.try_emplace(cmd.group_id); group_it->second.try_set_fence( bid.pid.get_id(), @@ -172,6 +197,13 @@ void group_recovery_consumer::apply_tx_fence(model::record_batch&& batch) { model::partition_id(0)); } else if (fence_version == group::fence_control_record_version) { auto cmd = reflection::adl{}.from(val_reader); + vlog( + klog.trace, + "[group: {}] recovered tx fence version: {} for producer: {} - {}", + cmd.group_id, + fence_version, + bid.pid, + cmd); auto [group_it, _] = _state.groups.try_emplace(cmd.group_id); group_it->second.try_set_fence( bid.pid.get_id(), @@ -214,7 +246,7 @@ void group_recovery_consumer::handle_record(model::record r) { } void group_recovery_consumer::handle_group_metadata(group_metadata_kv md) { - vlog(klog.trace, "Recovering group metadata {}", md.key.group_id); + vlog(klog.trace, "[group: {}] recovered group metadata", md.key.group_id); if (md.value) { // until we switch over to a compacted topic or use raft snapshots, @@ -234,7 +266,8 @@ void group_recovery_consumer::handle_offset_metadata(offset_metadata_kv md) { if (md.value) { vlog( klog.trace, - "Recovering offset {}/{} with metadata {}", + "[group: {}] recovered {}/{} committed offset: {}", + md.key.group_id, md.key.topic, md.key.partition, *md.value); @@ -248,6 +281,12 @@ void group_recovery_consumer::handle_offset_metadata(offset_metadata_kv md) { group_it->second.update_offset( tp, _batch_base_offset, std::move(*md.value)); } else { + vlog( + klog.trace, + "[group: {}] recovered {}/{} committed offset tombstone", + md.key.group_id, + md.key.topic, + md.key.partition); // tombstone auto group_it = _state.groups.find(md.key.group_id); if (group_it != _state.groups.end()) { From 2fa552dc02c41575427e4c42905fd309cf6bef8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Mon, 10 Jun 2024 15:32:56 +0000 Subject: [PATCH 4/7] tests/tx: added a timeout to tx verifier tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- tests/rptest/transactions/tx_verifier_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/rptest/transactions/tx_verifier_test.py b/tests/rptest/transactions/tx_verifier_test.py index d8c92e2d27da5..9f4d464cd7647 100644 --- a/tests/rptest/transactions/tx_verifier_test.py +++ b/tests/rptest/transactions/tx_verifier_test.py @@ -59,7 +59,8 @@ def verify(self, tests): test=test, brokers=self.redpanda.brokers()) subprocess.check_output(["/bin/sh", "-c", cmd], - stderr=subprocess.STDOUT) + stderr=subprocess.STDOUT, + timeout=240) self.redpanda.logger.info( "txn test \"{test}\" passed".format(test=test)) except subprocess.CalledProcessError as e: From b1a956857a825ca046d33a76306d72d17e39acbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 11 Jun 2024 09:04:10 +0000 Subject: [PATCH 5/7] k/group: simplified txs handing in group state machine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously transaction state was kept in 
several map data structures. It made it harder to understand the transaction flow in consumer group state machine. Refactored the transaction handling to use a single source of truth about the transaction state. The group state machine is now using a `_transactions` map to store the information about all ongoing transactions. The map is populated when the transaction starts and the entry in the map is updated throughout the transaction lifecycle. Finally the entry is removed when transaction resolves and it is either committed or aborted. The `group::ongoing_transaction` structure contains all information about transaction state, pending offset commits and expiration. Signed-off-by: Michał Maślanka --- src/v/kafka/server/group.cc | 578 ++++++++---------- src/v/kafka/server/group.h | 116 ++-- src/v/kafka/server/group_manager.cc | 25 +- src/v/kafka/server/group_recovery_consumer.cc | 2 +- src/v/kafka/server/group_stm.cc | 86 +-- src/v/kafka/server/group_stm.h | 50 +- .../tests/consumer_group_recovery_test.cc | 19 +- 7 files changed, 370 insertions(+), 506 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 6967279c04155..c1bc6fb16be1d 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -169,6 +169,16 @@ bool group::valid_previous_state(group_state s) const { __builtin_unreachable(); } + +group::ongoing_transaction::ongoing_transaction( + model::tx_seq tx_seq, + model::partition_id coordinator_partition, + model::timeout_clock::duration tx_timeout) + : tx_seq(tx_seq) + , coordinator_partition(coordinator_partition) + , timeout(tx_timeout) + , last_update(model::timeout_clock::now()) {} + namespace { template model::record_batch make_tx_batch( @@ -1662,64 +1672,59 @@ void group::fail_offset_commit( void group::reset_tx_state(model::term_id term) { _term = term; - _ongoing_tx_offsets.clear(); - _expiration_info.clear(); - _tx_data.clear(); + _transactions.clear(); _fence_pid_epoch.clear(); } -void group::insert_ongoing_tx_offsets(ongoing_tx_offsets tx) { - auto pid = tx.pid; - - // TODO: warn when legacy support is removed and _tx_data doesn't contain - // pid - auto [txseq_it, inserted] = _tx_data.try_emplace( - pid.get_id(), tx_data{tx.tx_seq, model::legacy_tm_ntp.tp.partition}); +void group::insert_ongoing_tx( + model::producer_identity pid, ongoing_transaction tx) { + auto [_, inserted] = _transactions.try_emplace(pid, std::move(tx)); if (!inserted) { - if (txseq_it->second.tx_seq != tx.tx_seq) { - vlog( - _ctx_txlog.warn, - "ongoing tx of pid {} has tx_seq {} while {} expected", - tx.pid, - tx.tx_seq, - txseq_it->second.tx_seq); - } + vlog(_ctx_txlog.warn, "error adding ongoing transaction for {}", pid); } - - _ongoing_tx_offsets[pid] = std::move(tx); } ss::future group::commit_tx(cluster::commit_group_tx_request r) { + vlog(_ctx_txlog.trace, "processing commit_tx request: {}", r); if (_partition->term() != _term) { + vlog( + _ctx_txlog.warn, + "commit_tx request: {} failed - leadership_changed, expected term: " + "{}, current_term: {}", + r, + _term, + _partition->term()); co_return make_commit_tx_reply(cluster::tx::errc::stale); } auto fence_it = _fence_pid_epoch.find(r.pid.get_id()); if (fence_it == _fence_pid_epoch.end()) { vlog( - _ctx_txlog.warn, - "Can't commit tx: fence with pid {} isn't set", - r.pid); + _ctx_txlog.warn, "commit_tx request: {} failed - fence not found", r); co_return make_commit_tx_reply(cluster::tx::errc::request_rejected); } + if (r.pid.get_epoch() != fence_it->second) { vlog( - _ctx_txlog.trace, - 
"Can't commit tx with pid {} - the fence doesn't match {}", - r.pid, + _ctx_txlog.warn, + "commit_tx request: {} failed - fenced, stored producer epoch: {}", + r, fence_it->second); co_return make_commit_tx_reply(cluster::tx::errc::request_rejected); } - auto txseq_it = _tx_data.find(r.pid.get_id()); - if (txseq_it == _tx_data.end()) { + auto tx_it = _transactions.find(r.pid); + if (tx_it == _transactions.end()) { vlog( _ctx_txlog.trace, - "can't find a tx {}, probably already comitted", - r.pid); + "commit_tx request: {} - can not find ongoing transaction, it was " + "most likely already committed", + r); co_return make_commit_tx_reply(cluster::tx::errc::none); - } else if (txseq_it->second.tx_seq > r.tx_seq) { + } + + if (tx_it->second.tx_seq > r.tx_seq) { // rare situation: // * tm_stm begins (tx_seq+1) // * request on this group passes but then tm_stm fails and forgets @@ -1728,48 +1733,22 @@ group::commit_tx(cluster::commit_group_tx_request r) { // existence of {pid, tx_seq+1} implies {pid, tx_seq} is committed vlog( _ctx_txlog.trace, - "Already commited pid:{} tx_seq:{} - a higher tx_seq:{} was observed", + "Already commited pid: {} tx_seq: {} - a higher tx_seq: {} was " + "observed", r.pid, r.tx_seq, - txseq_it->second.tx_seq); + tx_it->second.tx_seq); co_return make_commit_tx_reply(cluster::tx::errc::none); - } else if (txseq_it->second.tx_seq != r.tx_seq) { - vlog( - _ctx_txlog.warn, - "Can't commit pid {}: passed txseq {} doesn't match ongoing {}", - r.pid, - r.tx_seq, - txseq_it->second.tx_seq); - co_return make_commit_tx_reply(cluster::tx::errc::request_rejected); } - - auto ongoing_it = _ongoing_tx_offsets.find(r.pid); - if (ongoing_it == _ongoing_tx_offsets.end()) { + if (tx_it->second.tx_seq != r.tx_seq) { vlog( - _ctx_txlog.trace, - "can't find a tx {}, probably already comitted", - r.pid); - co_return make_commit_tx_reply(cluster::tx::errc::none); - } - if (ongoing_it->second.tx_seq > r.tx_seq) { - // rare situation: - // * tm_stm prepares (tx_seq+1) - // * prepare on this group passed but tm_stm failed to write to disk - // * during recovery tm_stm recommits (tx_seq) - // existence of {pid, tx_seq+1} implies {pid, tx_seq} is committed - vlog( - _ctx_txlog.trace, - "prepare for pid:{} has higher tx_seq:{} than given: {} => replaying " - "already comitted commit", - r.pid, - ongoing_it->second.tx_seq, - r.tx_seq); - co_return make_commit_tx_reply(cluster::tx::errc::none); - } else if (ongoing_it->second.tx_seq < r.tx_seq) { + _ctx_txlog.warn, + "commit_tx request: {} failed - tx_seq mismatch. Expected seq: {}", + r, + tx_it->second.tx_seq); co_return make_commit_tx_reply(cluster::tx::errc::request_rejected); } - // we commit only if a provided tx_seq matches prepared tx_seq co_return co_await do_commit(r.group_id, r.pid); } @@ -1793,85 +1772,79 @@ cluster::abort_group_tx_reply make_abort_tx_reply(cluster::tx::errc ec) { ss::future group::begin_tx(cluster::begin_group_tx_request r) { + vlog(_ctx_txlog.trace, "processing begin tx request: {}", r); if (_partition->term() != _term) { vlog( - _ctx_txlog.trace, - "processing name:begin_tx pid:{} tx_seq:{} timeout:{} coordinator:{} " - "=> stale leader", - r.pid, - r.tx_seq, - r.timeout, - r.tm_partition); + _ctx_txlog.debug, + "begin tx request {} failed - leadership changed. 
Expected term: {}, " + "current term: {}", + r, + _term, + _partition->term()); co_return make_begin_tx_reply(cluster::tx::errc::stale); } - vlog( - _ctx_txlog.trace, - "processing name:begin_tx pid:{} tx_seq:{} timeout:{} coordinator:{} in " - "term:{}", - r.pid, - r.tx_seq, - r.timeout, - r.tm_partition, - _term); auto fence_it = _fence_pid_epoch.find(r.pid.get_id()); - if (fence_it == _fence_pid_epoch.end()) { - // intentionally empty - } else if (r.pid.get_epoch() < fence_it->second) { - vlog( - _ctx_txlog.error, - "pid {} fenced out by epoch {}", - r.pid, - fence_it->second); - co_return make_begin_tx_reply(cluster::tx::errc::fenced); - } else if (r.pid.get_epoch() > fence_it->second) { - // there is a fence, it might be that tm_stm failed, forget about - // an ongoing transaction, assigned next pid for the same tx.id and - // started a new transaction without aborting the previous one. - // - // at the same time it's possible that it already aborted the old - // tx before starting this. do_abort_tx is idempotent so calling it - // just in case to proactively abort the tx instead of waiting for - // the timeout - - auto old_pid = model::producer_identity{ - r.pid.get_id(), fence_it->second}; - auto ar = co_await do_try_abort_old_tx(old_pid); - if (ar != cluster::tx::errc::none) { + + if (fence_it != _fence_pid_epoch.end()) { + if (r.pid.get_epoch() < fence_it->second) { vlog( - _ctx_txlog.trace, - "can't begin tx {} because abort of a prev tx {} failed with {}; " - "retrying", + _ctx_txlog.warn, + "begin tx request failed. Producer {} epoch is lower than " + "current fence epoch: {}", r.pid, - old_pid, - ar); - co_return make_begin_tx_reply(cluster::tx::errc::stale); + fence_it->second); + co_return make_begin_tx_reply(cluster::tx::errc::fenced); + } else if (r.pid.get_epoch() > fence_it->second) { + // there is a fence, it might be that tm_stm failed, forget about + // an ongoing transaction, assigned next pid for the same tx.id and + // started a new transaction without aborting the previous one. + // + // at the same time it's possible that it already aborted the old + // tx before starting this. 
do_abort_tx is idempotent so calling it
+            // just in case to proactively abort the tx instead of waiting for
+            // the timeout
+
+            auto old_pid = model::producer_identity{
+              r.pid.get_id(), fence_it->second};
+            auto ar = co_await do_try_abort_old_tx(old_pid);
+            if (ar != cluster::tx::errc::none) {
                 vlog(
+                  _ctx_txlog.warn,
+                  "begin tx request {} failed, cannot abort old transaction: "
+                  "{} - {}",
+                  r,
+                  old_pid,
+                  ar);
+                co_return make_begin_tx_reply(cluster::tx::errc::stale);
+            }
         }
     }
-
-    auto txseq_it = _tx_data.find(r.pid.get_id());
-    if (txseq_it != _tx_data.end()) {
-        if (r.tx_seq != txseq_it->second.tx_seq) {
+    // Now we know the current producer epoch is valid
+    auto it = _transactions.find(r.pid);
+    if (it != _transactions.end()) {
+        if (r.tx_seq != it->second.tx_seq) {
             vlog(
               _ctx_txlog.warn,
-              "can't begin a tx {} with tx_seq {}: a producer id is already "
-              "involved in a tx with tx_seq {}",
-              r.pid,
-              r.tx_seq,
-              txseq_it->second.tx_seq);
+              "begin tx request {} failed - producer already has an ongoing "
+              "transaction with tx_seq: {}",
+              r,
+              it->second.tx_seq);
             co_return make_begin_tx_reply(
               cluster::tx::errc::unknown_server_error);
         }
-        if (_ongoing_tx_offsets.contains(r.pid)) {
+
+        if (!it->second.offsets.empty()) {
            vlog(
              _ctx_txlog.warn,
-              "can't begin a tx {} with tx_seq {}: it was already begun and it "
-              "accepted writes",
+              "begin tx request {} failed - transaction is already ongoing "
+              "and has accepted offset commits",
              r.pid,
              r.tx_seq);
            co_return make_begin_tx_reply(
              cluster::tx::errc::unknown_server_error);
        }
+        // begin_tx request is idempotent, return success
        co_return cluster::begin_group_tx_reply(_term, cluster::tx::errc::none);
    }
 
@@ -1880,6 +1853,7 @@ group::begin_tx(cluster::begin_group_tx_request r) {
       .tx_seq = r.tx_seq,
       .transaction_timeout_ms = r.timeout,
       .tm_partition = r.tm_partition};
+    // replicate fence batch - this is a transaction boundary
     model::record_batch batch = make_tx_fence_batch(r.pid, std::move(fence));
     auto reader = model::make_memory_record_batch_reader(std::move(batch));
     auto res = co_await _partition->raft()->replicate(
@@ -1890,9 +1864,9 @@ group::begin_tx(cluster::begin_group_tx_request r) {
     if (!res) {
         vlog(
           _ctx_txlog.warn,
-          "Error \"{}\" on replicating pid:{} fencing batch",
-          res.error(),
-          r.pid);
+          "begin tx request {} failed - error replicating fencing batch - {}",
+          r,
+          res.error().message());
         if (
           _partition->raft()->is_leader()
           && _partition->raft()->term() == _term) {
@@ -1900,41 +1874,29 @@ group::begin_tx(cluster::begin_group_tx_request r) {
         }
         co_return make_begin_tx_reply(cluster::tx::errc::leader_not_found);
     }
-
+    // set fence or update the one we already have
     _fence_pid_epoch[r.pid.get_id()] = r.pid.get_epoch();
-    _tx_data[r.pid.get_id()] = tx_data{r.tx_seq, r.tm_partition};
-
-    auto [it, _] = _expiration_info.insert_or_assign(
-      r.pid, expiration_info(r.timeout));
-    try_arm(it->second.deadline());
-
-    cluster::begin_group_tx_reply reply;
-    reply.etag = _term;
-    reply.ec = cluster::tx::errc::none;
-    co_return reply;
-}
+    auto [tx_it, _] = _transactions.try_emplace(
+      r.pid, r.tx_seq, r.tm_partition, r.timeout);
+    try_arm(tx_it->second.deadline());
 
-cluster::abort_origin group::get_abort_origin(
-  const model::producer_identity& pid, model::tx_seq tx_seq) const {
-    auto it = _tx_data.find(pid.get_id());
-    if (it != _tx_data.end()) {
-        if (tx_seq < it->second.tx_seq) {
-            return cluster::abort_origin::past;
-        }
-        if (it->second.tx_seq < tx_seq) {
-            return cluster::abort_origin::future;
-        }
-    }
+    co_return cluster::begin_group_tx_reply(_term, cluster::tx::errc::none);
+}
 
-    return 
cluster::abort_origin::present; + co_return cluster::begin_group_tx_reply(_term, cluster::tx::errc::none); } ss::future group::abort_tx(cluster::abort_group_tx_request r) { // doesn't make sense to fence off an abort because transaction // manager has already decided to abort and acked to a client - + vlog(_ctxlog.trace, "processing abort_tx request: {}", r); if (_partition->term() != _term) { + vlog( + _ctxlog.debug, + "abort_tx request: {} failed - leadership changed, expected term: " + "{}, current term: {}", + r, + _term, + _partition->term()); co_return make_abort_tx_reply(cluster::tx::errc::stale); } @@ -1942,27 +1904,29 @@ group::abort_tx(cluster::abort_group_tx_request r) { if (fence_it == _fence_pid_epoch.end()) { vlog( _ctx_txlog.warn, - "Can't abort tx: fence with pid {} isn't set", - r.pid); + "abort_tx request: {} failed - producer fence not found", + r); co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); } if (r.pid.get_epoch() != fence_it->second) { vlog( - _ctx_txlog.trace, - "Can't abort tx with pid {} - the fence doesn't match {}", + _ctx_txlog.warn, + "abort_tx request: {} failed - fence epoch mismatch. Fence epoch: {}", r.pid, fence_it->second); co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); } - auto txseq_it = _tx_data.find(r.pid.get_id()); - if (txseq_it == _tx_data.end()) { + auto tx_it = _transactions.find(r.pid); + if (tx_it == _transactions.end()) { vlog( _ctx_txlog.trace, - "can't find a tx {}, probably already aborted", + "unable to find transaction for {}, probably already aborted", r.pid); co_return make_abort_tx_reply(cluster::tx::errc::none); - } else if (txseq_it->second.tx_seq > r.tx_seq) { + } + + if (tx_it->second.tx_seq > r.tx_seq) { // rare situation: // * tm_stm begins (tx_seq+1) // * request on this group passes but then tm_stm fails and forgets @@ -1971,46 +1935,21 @@ group::abort_tx(cluster::abort_group_tx_request r) { // existence of {pid, tx_seq+1} implies {pid, tx_seq} is aborted vlog( _ctx_txlog.trace, - "Already aborted pid:{} tx_seq:{} - a higher tx_seq:{} was observed", + "producer transaction {} already aborted, ongoing tx sequence: {}, " + "request tx sequence: {}", r.pid, - r.tx_seq, - txseq_it->second.tx_seq); + tx_it->second.tx_seq, + r.tx_seq); co_return make_abort_tx_reply(cluster::tx::errc::none); - } else if (txseq_it->second.tx_seq != r.tx_seq) { - vlog( - _ctx_txlog.warn, - "Can't abort pid {}: passed txseq {} doesn't match ongoing {}", - r.pid, - r.tx_seq, - txseq_it->second.tx_seq); - co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); } - auto origin = get_abort_origin(r.pid, r.tx_seq); - if (origin == cluster::abort_origin::past) { - // rejecting a delayed abort command to prevent aborting - // a wrong transaction - auto it = _expiration_info.find(r.pid); - if (it != _expiration_info.end()) { - it->second.is_expiration_requested = true; - } else { - vlog( - _ctx_txlog.error, - "pid({}) should be inside _expiration_info", - r.pid); - } - co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); - } - if (origin == cluster::abort_origin::future) { - // impossible situation: before transactional coordinator may issue - // abort of the current transaction it should begin it and abort all - // previous transactions with the same pid + if (tx_it->second.tx_seq != r.tx_seq) { vlog( - _ctx_txlog.error, - "Rejecting abort (pid:{}, tx_seq: {}) because it isn't consistent " - "with the current ongoing transaction", + _ctx_txlog.warn, + "abort_tx request: {} failed - tx 
sequence mismatch. Ongoing tx " + "sequence: {}, request tx sequence: {}", r.pid, - r.tx_seq); + fence_it->second); co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); } @@ -2053,8 +1992,8 @@ group::store_txn_offsets(txn_offset_commit_request r) { r, error_code::invalid_producer_epoch); } - auto txseq_it = _tx_data.find(pid.get_id()); - if (txseq_it == _tx_data.end()) { + auto tx_it = _transactions.find(pid); + if (tx_it == _transactions.end()) { vlog( _ctx_txlog.warn, "Can't store txn offsets: current tx with pid {} isn't ongoing", @@ -2062,42 +2001,28 @@ group::store_txn_offsets(txn_offset_commit_request r) { co_return txn_offset_commit_response( r, error_code::invalid_producer_epoch); } - auto tx_seq = txseq_it->second.tx_seq; - absl::node_hash_map - offsets; + const auto tx_seq = tx_it->second.tx_seq; - auto ongoing_it = _ongoing_tx_offsets.find(pid); - if (ongoing_it != _ongoing_tx_offsets.end()) { - for (const auto& [tp, offset] : ongoing_it->second.offsets) { - group_tx::partition_offset md{ - .tp = tp, - .offset = offset.offset, - .leader_epoch = offset.committed_leader_epoch, - .metadata = offset.metadata}; - offsets[tp] = md; - } - } + chunked_vector offsets; - for (const auto& t : r.data.topics) { + for (auto& t : r.data.topics) { for (const auto& p : t.partitions) { - model::topic_partition tp(t.name, p.partition_index); - group_tx::partition_offset md{ - .tp = tp, + offsets.push_back(group_tx::partition_offset{ + .tp = model::topic_partition(t.name, p.partition_index), .offset = p.committed_offset, .leader_epoch = p.committed_leader_epoch, - .metadata = p.committed_metadata}; - offsets[tp] = md; + .metadata = p.committed_metadata, + }); } } - auto tx_entry = group_tx::offsets_metadata{ - .group_id = r.data.group_id, .pid = pid, .tx_seq = tx_seq}; - tx_entry.offsets.reserve(offsets.size()); - - for (const auto& [tp, offset] : offsets) { - tx_entry.offsets.push_back(offset); - } + group_tx::offsets_metadata tx_entry{ + .group_id = r.data.group_id, + .pid = pid, + .tx_seq = tx_seq, + .offsets = {offsets.begin(), offsets.end()}, + }; auto batch = make_tx_batch( model::record_batch_type::group_prepare_tx, @@ -2106,12 +2031,12 @@ group::store_txn_offsets(txn_offset_commit_request r) { std::move(tx_entry)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->raft()->replicate( + auto result = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); - if (!e) { + if (!result) { if ( _partition->raft()->is_leader() && _partition->raft()->term() == _term) { @@ -2121,30 +2046,22 @@ group::store_txn_offsets(txn_offset_commit_request r) { co_return txn_offset_commit_response( r, error_code::unknown_server_error); } - - ongoing_tx_offsets ptx; - ptx.tx_seq = tx_seq; - ptx.pid = pid; - const auto now = model::timestamp::now(); - for (const auto& [tp, offset] : offsets) { - offset_metadata md{ - .log_offset = e.value().last_offset, - .offset = offset.offset, - .metadata = offset.metadata.value_or(""), - .committed_leader_epoch = kafka::leader_epoch(offset.leader_epoch), - .commit_timestamp = now, - .expiry_timestamp = std::nullopt, - }; - ptx.offsets[tp] = md; + tx_it = _transactions.find(pid); + if (tx_it == _transactions.end()) { + vlog( + _ctx_txlog.warn, + "Can't store txn offsets: current tx with pid {} isn't ongoing", + pid); + co_return txn_offset_commit_response( + r, error_code::invalid_producer_epoch); } - _ongoing_tx_offsets[pid] = ptx; - - 
auto it = _expiration_info.find(pid); - if (it != _expiration_info.end()) { - it->second.update_last_update_time(); - } else { - vlog(_ctx_txlog.warn, "pid {} should be in _expiration_info", pid); + for (auto& o : offsets) { + tx_it->second.offsets[o.tp] = pending_tx_offset{ + .offset_metadata = o, + .log_offset = result.value().last_offset, + }; } + tx_it->second.update_last_update_time(); co_return txn_offset_commit_response(r, error_code::none); } @@ -2919,19 +2836,20 @@ ss::future group::do_abort( co_return make_abort_tx_reply(cluster::tx::errc::timeout); } - _ongoing_tx_offsets.erase(pid); - _tx_data.erase(pid.get_id()); - _expiration_info.erase(pid); + _transactions.erase(pid); co_return make_abort_tx_reply(cluster::tx::errc::none); } ss::future group::do_commit(kafka::group_id group_id, model::producer_identity pid) { - auto ongoing_it = _ongoing_tx_offsets.find(pid); - if (ongoing_it == _ongoing_tx_offsets.end()) { + auto ongoing_it = _transactions.find(pid); + if (ongoing_it == _transactions.end()) { // Impossible situation - vlog(_ctx_txlog.error, "Can not find prepared tx for pid: {}", pid); + vlog( + _ctx_txlog.error, + "Unable to find an ongoing transaction for producer: {}", + pid); co_return make_commit_tx_reply(cluster::tx::errc::unknown_server_error); } @@ -2946,21 +2864,22 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) { // problem, because client got ok for commit_request (see // tx_gateway_frontend). So redpanda will eventually finish commit and // complete write for both this events. + model::record_batch_reader::data_t batches; batches.reserve(2); cluster::simple_batch_builder store_offset_builder( model::record_batch_type::raft_data, model::offset(0)); - for (const auto& [tp, metadata] : ongoing_it->second.offsets) { + for (const auto& [tp, pending_offset] : ongoing_it->second.offsets) { update_store_offset_builder( store_offset_builder, tp.topic, tp.partition, - metadata.offset, - metadata.committed_leader_epoch, - metadata.metadata, - metadata.commit_timestamp, - metadata.expiry_timestamp); + pending_offset.offset_metadata.offset, + kafka::leader_epoch(pending_offset.offset_metadata.leader_epoch), + pending_offset.offset_metadata.metadata.value_or(""), + model::timestamp::now(), + std::nullopt); } batches.push_back(std::move(store_offset_builder).build()); @@ -2977,17 +2896,17 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) { auto reader = model::make_memory_record_batch_reader(std::move(batches)); - auto e = co_await _partition->raft()->replicate( + auto result = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); - if (!e) { + if (!result) { vlog( _ctx_txlog.warn, - "Error \"{}\" on replicating pid:{} commit batch", - e.error(), - pid); + "error replicating transaction commit batch for pid: {} - {}", + pid, + result.error().message()); if ( _partition->raft()->is_leader() && _partition->raft()->term() == _term) { @@ -2995,23 +2914,30 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) { } co_return make_commit_tx_reply(cluster::tx::errc::timeout); } + ongoing_it = _transactions.find(pid); - ongoing_it = _ongoing_tx_offsets.find(pid); - if (ongoing_it == _ongoing_tx_offsets.end()) { + if (ongoing_it == _transactions.end()) { vlog( _ctx_txlog.error, - "can't find already observed prepared tx pid:{}", + "unable to find ongoing transaction for producer: {}", pid); co_return 
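// (the transaction may have been removed while the commit batch was
+        // replicating - the co_await above is a scheduling point - hence
+        // the repeated lookup before this error return)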
make_commit_tx_reply(cluster::tx::errc::unknown_server_error); } for (const auto& [tp, md] : ongoing_it->second.offsets) { - try_upsert_offset(tp, md); + try_upsert_offset( + tp, + offset_metadata{ + .log_offset = md.log_offset, + .offset = md.offset_metadata.offset, + .metadata = md.offset_metadata.metadata.value_or(""), + .committed_leader_epoch = kafka::leader_epoch( + md.offset_metadata.leader_epoch), + .commit_timestamp = model::timestamp::now(), + }); } - _ongoing_tx_offsets.erase(ongoing_it); - _tx_data.erase(pid.get_id()); - _expiration_info.erase(pid); + _transactions.erase(ongoing_it); co_return make_commit_tx_reply(cluster::tx::errc::none); } @@ -3025,8 +2951,8 @@ void group::abort_old_txes() { void group::maybe_rearm_timer() { std::optional earliest_deadline; - for (auto& [pid, expiration] : _expiration_info) { - auto candidate = expiration.deadline(); + for (auto& [pid, ongoing_tx] : _transactions) { + auto candidate = ongoing_tx.deadline(); if (earliest_deadline) { earliest_deadline = std::min(earliest_deadline.value(), candidate); } else { @@ -3047,27 +2973,16 @@ ss::future<> group::do_abort_old_txes() { co_return; } - std::vector pids; - for (auto& [id, _] : _ongoing_tx_offsets) { - pids.push_back(id); - } + absl::btree_set expired; - for (auto& [id, _] : _tx_data) { - auto it = _fence_pid_epoch.find(id); - if (it != _fence_pid_epoch.end()) { - pids.emplace_back(id(), it->second); + for (auto& [pid, tx] : _transactions) { + if (!tx.is_expired()) { + continue; } - } - - absl::btree_set expired; - for (auto pid : pids) { - auto expiration_it = _expiration_info.find(pid); - if (expiration_it != _expiration_info.end()) { - if (!expiration_it->second.is_expired()) { - continue; - } + auto it = _fence_pid_epoch.find(pid.get_id()); + if (it != _fence_pid_epoch.end()) { + expired.insert(pid); } - expired.insert(pid); } for (auto pid : expired) { @@ -3079,12 +2994,15 @@ ss::future<> group::do_abort_old_txes() { ss::future<> group::try_abort_old_tx(model::producer_identity pid) { return get_tx_lock(pid.get_id())->with([this, pid]() { - vlog(_ctx_txlog.trace, "attempting to expire pid:{}", pid); + vlog(_ctx_txlog.info, "attempting expiration of producer: {}", pid); - auto expiration_it = _expiration_info.find(pid); - if (expiration_it != _expiration_info.end()) { - if (!expiration_it->second.is_expired()) { - vlog(_ctx_txlog.trace, "pid:{} isn't expired, skipping", pid); + auto tx_it = _transactions.find(pid); + if (tx_it != _transactions.end()) { + if (!tx_it->second.is_expired()) { + vlog( + _ctx_txlog.trace, + "producer {} transaction is not expired, skipping", + pid); return ss::now(); } } @@ -3095,46 +3013,56 @@ ss::future<> group::try_abort_old_tx(model::producer_identity pid) { ss::future group::do_try_abort_old_tx(model::producer_identity pid) { - vlog(_ctx_txlog.trace, "aborting pid:{}", pid); - - auto p_it = _ongoing_tx_offsets.find(pid); - if (p_it != _ongoing_tx_offsets.end()) { - auto tx_seq = p_it->second.tx_seq; - auto tx_data = _tx_data.find(pid.get_id()); - model::partition_id tm = model::legacy_tm_ntp.tp.partition; - if (tx_data != _tx_data.end()) { - tm = tx_data->second.tm_partition; - } else { - vlog( - _ctxlog.error, - "pid {} doesn't exist in current transactions data", - pid); - } + vlog(_ctx_txlog.trace, "aborting producer {} transaction", pid); + + auto tx_it = _transactions.find(pid); + if (tx_it != _transactions.end()) { + auto tx_seq = tx_it->second.tx_seq; + + model::partition_id coordinator_partition_id + = tx_it->second.coordinator_partition; + vlog( + 
_ctx_txlog.trace, + "sending abort tx request for producer {} with tx_seq: {} to " + "coordinator partition: {}", + pid, + tx_seq, + coordinator_partition_id); auto r = co_await _tx_frontend.local().route_globally( cluster::try_abort_request( - tm, + coordinator_partition_id, pid, tx_seq, config::shard_local_cfg().rm_sync_timeout_ms.value())); + if (r.ec != cluster::tx::errc::none) { co_return r.ec; } + vlog( + _ctx_txlog.trace, + "producer id {} abort request result: [committed: {}, aborted: {}]", + pid, + r.commited, + r.aborted); + if (r.commited) { auto res = co_await do_commit(_id, pid); if (res.ec != cluster::tx::errc::none) { vlog( _ctxlog.warn, - "commit of prepared tx pid:{} failed with ec:{}", + "committing producer {} transaction failed - {}", pid, res.ec); } co_return res.ec; - } else if (r.aborted) { + } + + if (r.aborted) { auto res = co_await do_abort(_id, pid, tx_seq); if (res.ec != cluster::tx::errc::none) { vlog( _ctxlog.warn, - "abort of prepared tx pid:{} failed with ec:{}", + "aborting producer {} transaction failed - {}", pid, res.ec); } @@ -3142,24 +3070,9 @@ group::do_try_abort_old_tx(model::producer_identity pid) { } co_return cluster::tx::errc::stale; - } else { - auto txseq_it = _tx_data.find(pid.get_id()); - if (txseq_it == _tx_data.end()) { - vlog(_ctx_txlog.trace, "skipping pid:{} (can't find tx_seq)", pid); - co_return cluster::tx::errc::none; - } - model::tx_seq tx_seq = txseq_it->second.tx_seq; - auto res = co_await do_abort(_id, pid, tx_seq); - if (res.ec != cluster::tx::errc::none) { - vlog( - _ctxlog.warn, - "abort of pid:{} tx_seq:{} failed with {}", - pid, - tx_seq, - res.ec); - } - co_return res.ec; } + // no active transaction for requested producer found + co_return cluster::tx::errc::none; } void group::try_arm(time_point_type deadline) { @@ -3175,7 +3088,8 @@ void group::try_arm(time_point_type deadline) { std::ostream& operator<<(std::ostream& o, const group::offset_metadata& md) { fmt::print( o, - "{{log_offset:{}, offset:{}, metadata:{}, committed_leader_epoch:{}}}", + "{{log_offset:{}, offset:{}, metadata:{}, " + "committed_leader_epoch:{}}}", md.log_offset, md.offset, md.metadata, @@ -3411,7 +3325,7 @@ group::get_expired_offsets(std::chrono::seconds retention_period) { bool group::has_offsets() const { return !_offsets.empty() || !_pending_offset_commits.empty() - || !_tx_data.empty(); + || !_transactions.empty(); } std::vector diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 6b16c6f5f5fc0..3ec2e5e0514ab 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -15,6 +15,7 @@ #include "cluster/simple_batch_builder.h" #include "cluster/tx_protocol_types.h" #include "cluster/tx_utils.h" +#include "container/chunked_hash_map.h" #include "container/fragmented_vector.h" #include "features/feature_table.h" #include "kafka/group_probe.h" @@ -149,10 +150,44 @@ class group final : public ss::enable_lw_shared_from_this { using offset_commit_stages = stages; using join_group_stages = stages; using sync_group_stages = stages; + /** + * represents an offset that is to be stored as a part of transaction + */ + struct pending_tx_offset { + group_tx::partition_offset offset_metadata; + model::offset log_offset; + }; + /** + * In memory representation of active transaction. The transaction is added + * when a state machine executes begin transaction request. The transaction + * is removed when the state machine executes commit or abort transaction + * request. The transaction holds all pending offset commits. 
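+     * Expiration is tracked on the transaction itself: the deadline is
+     * last_update + timeout, and last_update is refreshed whenever the
+     * producer adds more offsets to the transaction.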
+ */ + struct ongoing_transaction { + ongoing_transaction( + model::tx_seq, model::partition_id, model::timeout_clock::duration); - struct tx_data { model::tx_seq tx_seq; - model::partition_id tm_partition; + model::partition_id coordinator_partition; + + model::timeout_clock::duration timeout; + model::timeout_clock::time_point last_update; + + bool is_expiration_requested{false}; + + model::timeout_clock::time_point deadline() const { + return last_update + timeout; + } + + bool is_expired() const { + return is_expiration_requested || deadline() <= clock_type::now(); + } + + void update_last_update_time() { + last_update = model::timeout_clock::now(); + } + + chunked_hash_map offsets; }; struct offset_metadata { @@ -591,7 +626,8 @@ class group final : public ss::enable_lw_shared_from_this { } } - void insert_ongoing_tx_offsets(ongoing_tx_offsets); + void + insert_ongoing_tx(model::producer_identity pid, ongoing_transaction tx); void try_set_fence(model::producer_id id, model::producer_epoch epoch) { auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch); @@ -600,42 +636,6 @@ class group final : public ss::enable_lw_shared_from_this { } } - void try_set_tx_data( - model::producer_identity id, - model::tx_seq txseq, - model::partition_id tm_partition) { - auto fence_it = _fence_pid_epoch.find(id.get_id()); - if (fence_it == _fence_pid_epoch.end()) { - return; - } - if (fence_it->second != id.get_epoch()) { - return; - } - auto [ongoing_it, _] = _tx_data.try_emplace( - id.get_id(), tx_data{txseq, tm_partition}); - if (ongoing_it->second.tx_seq <= txseq) { - ongoing_it->second.tx_seq = txseq; - ongoing_it->second.tm_partition = tm_partition; - } - } - - void try_set_timeout( - model::producer_identity id, - model::timeout_clock::duration transaction_timeout_ms) { - auto fence_it = _fence_pid_epoch.find(id.get_id()); - if (fence_it == _fence_pid_epoch.end()) { - return; - } - if (fence_it->second != id.get_epoch()) { - return; - } - auto [info_it, inserted] = _expiration_info.try_emplace( - id, expiration_info(transaction_timeout_ms)); - if (inserted) { - try_arm(info_it->second.deadline()); - } - } - // helper for the kafka api: describe groups described_group describe() const; @@ -814,9 +814,6 @@ class group final : public ss::enable_lw_shared_from_this { error_code validate_expected_group(const txn_offset_commit_request& r) const; - cluster::abort_origin - get_abort_origin(const model::producer_identity&, model::tx_seq) const; - bool has_offsets() const; bool has_pending_transaction(const model::topic_partition& tp) { @@ -828,8 +825,8 @@ class group final : public ss::enable_lw_shared_from_this { } if (std::any_of( - _ongoing_tx_offsets.begin(), - _ongoing_tx_offsets.end(), + _transactions.begin(), + _transactions.end(), [&tp](const auto& tp_info) { return tp_info.second.offsets.contains(tp); })) { @@ -921,39 +918,12 @@ class group final : public ss::enable_lw_shared_from_this { model::term_id _term; absl::node_hash_map _fence_pid_epoch; - absl::node_hash_map _tx_data; + chunked_hash_map + _transactions; absl::node_hash_map _pending_offset_commits; enable_group_metrics _enable_group_metrics; - absl::node_hash_map - _ongoing_tx_offsets; - - struct expiration_info { - expiration_info(model::timeout_clock::duration timeout) - : timeout(timeout) - , last_update(model::timeout_clock::now()) {} - - model::timeout_clock::duration timeout; - model::timeout_clock::time_point last_update; - bool is_expiration_requested{false}; - - model::timeout_clock::time_point deadline() const { - return 
last_update + timeout;
-        }
-
-        bool is_expired() const {
-            return is_expiration_requested || deadline() <= clock_type::now();
-        }
-
-        void update_last_update_time() {
-            last_update = model::timeout_clock::now();
-        }
-    };
-
-    absl::node_hash_map<model::producer_identity, expiration_info>
-      _expiration_info;
-
     ss::gate _gate;
     ss::timer _auto_abort_timer;
     std::chrono::milliseconds _abort_interval_ms;
diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc
index 9aefaab8b3045..ad3f9641fb862 100644
--- a/src/v/kafka/server/group_manager.cc
+++ b/src/v/kafka/server/group_manager.cc
@@ -972,18 +972,25 @@ ss::future<> group_manager::do_recover_group(
               .non_reclaimable = meta.metadata.non_reclaimable,
             });
         }
-
-        for (const auto& [_, tx] : group_stm.ongoing_tx_offsets()) {
-            group->insert_ongoing_tx_offsets(tx);
-        }
         for (auto& [id, epoch] : group_stm.fences()) {
             group->try_set_fence(id, epoch);
         }
-        for (auto& [id, tx_data] : group_stm.tx_data()) {
-            group->try_set_tx_data(id, tx_data.tx_seq, tx_data.tm_partition);
-        }
-        for (auto& [id, timeout] : group_stm.timeouts()) {
-            group->try_set_timeout(id, timeout);
+
+        for (auto& [pid, tx] : group_stm.ongoing_transactions()) {
+            group::ongoing_transaction group_tx(
+              tx.tx_seq, tx.tm_partition, tx.timeout);
+            for (auto& [tp, o_md] : tx.offsets) {
+                group_tx.offsets[tp] = group::pending_tx_offset{
+                  .offset_metadata = group_tx::partition_offset{
+                    .tp = tp,
+                    .offset = o_md.offset,
+                    .leader_epoch = o_md.committed_leader_epoch,
+                    .metadata = o_md.metadata,
+                  },
+                  .log_offset = o_md.log_offset};
+            }
+
+            group->insert_ongoing_tx(pid, std::move(group_tx));
         }
 
         if (group_stm.is_removed()) {
diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc
index ed7e6fdba8b17..3fa15b535869e 100644
--- a/src/v/kafka/server/group_recovery_consumer.cc
+++ b/src/v/kafka/server/group_recovery_consumer.cc
@@ -96,7 +96,7 @@ group_recovery_consumer::operator()(model::record_batch batch) {
               "[group: {}] recovered update tx offsets: {}",
               val.group_id,
               val);
-            group_it->second.update_prepared(batch.last_offset(), val);
+            group_it->second.update_tx_offset(batch.last_offset(), val);
 
             co_return ss::stop_iteration::no;
         } else if (
diff --git a/src/v/kafka/server/group_stm.cc b/src/v/kafka/server/group_stm.cc
index fb31638282853..73a8b5ac593e1 100644
--- a/src/v/kafka/server/group_stm.cc
+++ b/src/v/kafka/server/group_stm.cc
@@ -3,6 +3,7 @@
 #include "cluster/logger.h"
 #include "kafka/server/group_metadata.h"
 #include "kafka/types.h"
+#include "model/record.h"
 
 namespace kafka {
 
@@ -23,31 +24,21 @@ void group_stm::update_offset(
       .log_offset = offset, .metadata = std::move(meta)};
 }
 
-void group_stm::update_prepared(
-  model::offset offset, group_tx::offsets_metadata val) {
-    auto tx = group::ongoing_tx_offsets{.pid = val.pid, .tx_seq = val.tx_seq};
-
-    auto [prepared_it, inserted] = _ongoing_tx_offsets.try_emplace(
-      tx.pid.get_id(), tx);
-    if (!inserted && prepared_it->second.pid.epoch > tx.pid.epoch) {
+void group_stm::update_tx_offset(
+  model::offset offset, group_tx::offsets_metadata offset_md) {
+    auto it = _ongoing_txs.find(offset_md.pid);
+    // if an ongoing transaction doesn't exist we ignore the update
+    if (it == _ongoing_txs.end()) {
         vlog(
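+          // (analogous to the missing-prepare case in commit(): the offsets
+          // batch can be seen without its fence when the consumer offsets
+          // log was truncated)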
cluster::txlog.warn, - "a logged tx {} overwrites prev logged tx {}", - val.pid, - prepared_it->second.pid); - prepared_it->second.pid = tx.pid; - prepared_it->second.offsets.clear(); } const auto now = model::timestamp::now(); - for (const auto& tx_offset : val.offsets) { + for (const auto& tx_offset : offset_md.offsets) { group::offset_metadata md{ .log_offset = offset, .offset = tx_offset.offset, @@ -56,22 +47,19 @@ void group_stm::update_prepared( .commit_timestamp = now, .expiry_timestamp = std::nullopt, }; - prepared_it->second.offsets[tx_offset.tp] = md; + it->second.offsets[tx_offset.tp] = md; } } void group_stm::commit(model::producer_identity pid) { - auto prepared_it = _ongoing_tx_offsets.find(pid.get_id()); - if (prepared_it == _ongoing_tx_offsets.end()) { + auto prepared_it = _ongoing_txs.find(pid); + if (prepared_it == _ongoing_txs.end()) { // missing prepare may happen when the consumer log gets truncated - vlog(cluster::txlog.trace, "can't find ongoing tx {}", pid); - return; - } else if (prepared_it->second.pid.epoch != pid.epoch) { vlog( cluster::txlog.warn, - "a comitting tx {} doesn't match ongoing tx {}", - pid, - prepared_it->second.pid); + "unable to find ongoing transaction for producer: {}, skipping " + "commit", + pid); return; } @@ -91,24 +79,40 @@ void group_stm::commit(model::producer_identity pid) { .log_offset = md.log_offset, .metadata = std::move(val)}; } - _ongoing_tx_offsets.erase(prepared_it); - _tx_data.erase(pid); - _timeouts.erase(pid); + _ongoing_txs.erase(prepared_it); } void group_stm::abort( model::producer_identity pid, [[maybe_unused]] model::tx_seq tx_seq) { - auto prepared_it = _ongoing_tx_offsets.find(pid.get_id()); - if ( - prepared_it - == _ongoing_tx_offsets.end()) { // NOLINT(bugprone-branch-clone) - return; - } else if (prepared_it->second.pid.epoch != pid.epoch) { - return; + _ongoing_txs.erase(pid); +} + +void group_stm::try_set_fence( + model::producer_id id, model::producer_epoch epoch) { + auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch); + if (fence_it->second < epoch) { + fence_it->second = epoch; + } +} + +void group_stm::try_set_fence( + model::producer_id id, + model::producer_epoch epoch, + model::tx_seq txseq, + model::timeout_clock::duration transaction_timeout_ms, + model::partition_id tm_partition) { + auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch); + if (fence_it->second <= epoch) { + fence_it->second = epoch; + _ongoing_txs.try_emplace( + model::producer_identity(id, epoch), + ongoing_tx{ + .tx_seq = txseq, + .tm_partition = tm_partition, + .timeout = transaction_timeout_ms, + .offsets = {}, + }); } - _ongoing_tx_offsets.erase(prepared_it); - _tx_data.erase(pid); - _timeouts.erase(pid); } } // namespace kafka diff --git a/src/v/kafka/server/group_stm.h b/src/v/kafka/server/group_stm.h index 8ec8f8a93b8cc..582f47c20f122 100644 --- a/src/v/kafka/server/group_stm.h +++ b/src/v/kafka/server/group_stm.h @@ -30,9 +30,12 @@ class group_stm { offset_metadata_value metadata; }; - struct tx_info { + struct ongoing_tx { model::tx_seq tx_seq; model::partition_id tm_partition; + model::timeout_clock::duration timeout; + absl::node_hash_map + offsets; }; void overwrite_metadata(group_metadata_value&&); @@ -40,37 +43,25 @@ class group_stm { void update_offset( const model::topic_partition&, model::offset, offset_metadata_value&&); void remove_offset(const model::topic_partition&); - void update_prepared(model::offset, group_tx::offsets_metadata); + void update_tx_offset(model::offset, 
group_tx::offsets_metadata); void commit(model::producer_identity); void abort(model::producer_identity, model::tx_seq); - void try_set_fence(model::producer_id id, model::producer_epoch epoch) { - auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch); - if (fence_it->second < epoch) { - fence_it->second = epoch; - } - } + void try_set_fence(model::producer_id id, model::producer_epoch epoch); void try_set_fence( model::producer_id id, model::producer_epoch epoch, model::tx_seq txseq, model::timeout_clock::duration transaction_timeout_ms, - model::partition_id tm_partition) { - auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch); - if (fence_it->second <= epoch) { - fence_it->second = epoch; - model::producer_identity pid(id(), epoch()); - _tx_data[pid] = tx_info{txseq, tm_partition}; - _timeouts[pid] = transaction_timeout_ms; - } - } + model::partition_id tm_partition); + bool has_data() const { return !_is_removed && (_is_loaded || _offsets.size() > 0); } bool is_removed() const { return _is_removed; } - const absl::node_hash_map& - ongoing_tx_offsets() const { - return _ongoing_tx_offsets; + const absl::node_hash_map& + ongoing_transactions() const { + return _ongoing_txs; } const absl::node_hash_map& @@ -83,31 +74,16 @@ class group_stm { return _fence_pid_epoch; } - const absl::node_hash_map& - tx_data() const { - return _tx_data; - } - - const absl:: - node_hash_map& - timeouts() const { - return _timeouts; - } - group_metadata_value& get_metadata() { return _metadata; } const group_metadata_value& get_metadata() const { return _metadata; } private: absl::node_hash_map _offsets; - absl::node_hash_map - _ongoing_tx_offsets; absl::node_hash_map _fence_pid_epoch; - absl::node_hash_map _tx_data; - absl:: - node_hash_map - _timeouts; + absl::node_hash_map _ongoing_txs; + group_metadata_value _metadata; bool _is_loaded{false}; bool _is_removed{false}; diff --git a/src/v/kafka/server/tests/consumer_group_recovery_test.cc b/src/v/kafka/server/tests/consumer_group_recovery_test.cc index fef14d835126a..965d60a0e152b 100644 --- a/src/v/kafka/server/tests/consumer_group_recovery_test.cc +++ b/src/v/kafka/server/tests/consumer_group_recovery_test.cc @@ -390,8 +390,7 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) { make_tx_offset("topic-3", 12, 1)}})); auto state = co_await recover_from_batches(copy_batches(batches)); - EXPECT_EQ(state.groups[gr_1].ongoing_tx_offsets().size(), 1); - EXPECT_EQ(state.groups[gr_1].tx_data().size(), 1); + EXPECT_EQ(state.groups[gr_1].ongoing_transactions().size(), 1); EXPECT_EQ(state.groups[gr_1].fences().size(), 1); // tx is ongoing offsets included in the transaction should not be // visible in state machine @@ -407,8 +406,7 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) { group_tx::commit_metadata{.group_id = gr_1})); state = co_await recover_from_batches(copy_batches(batches)); - EXPECT_TRUE(state.groups[gr_1].ongoing_tx_offsets().empty()); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); + EXPECT_TRUE(state.groups[gr_1].ongoing_transactions().empty()); EXPECT_EQ(state.groups[gr_1].fences().size(), 1); expect_committed_offsets( @@ -433,9 +431,8 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) { "test-2/10@256", "topic-3/12@1"); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); - EXPECT_TRUE(state.groups[gr_1].ongoing_tx_offsets().empty()); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); + EXPECT_TRUE(state.groups[gr_1].ongoing_transactions().empty()); + // 
+    // EXPECT_TRUE(state.groups[gr_1].fences().empty());
 }

@@ -476,9 +473,7 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_abort) {
       pid, group_tx::abort_metadata{.group_id = gr_1, .tx_seq = tx_seq}));

     auto state = co_await recover_from_batches(copy_batches(batches));
-    EXPECT_TRUE(state.groups[gr_1].tx_data().empty());
-    EXPECT_TRUE(state.groups[gr_1].ongoing_tx_offsets().empty());
-    EXPECT_TRUE(state.groups[gr_1].tx_data().empty());
+    EXPECT_TRUE(state.groups[gr_1].ongoing_transactions().empty());
     // EXPECT_TRUE(state.groups[gr_1].fences().empty());
     expect_committed_offsets(
       state.groups[gr_1].offsets(), "test-1/0@1024", "test-2/10@256");
@@ -490,9 +485,7 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_abort) {
       pid,
       group_tx::commit_metadata{.group_id = gr_1}));

-    EXPECT_TRUE(state.groups[gr_1].tx_data().empty());
-    EXPECT_TRUE(state.groups[gr_1].ongoing_tx_offsets().empty());
-    EXPECT_TRUE(state.groups[gr_1].tx_data().empty());
+    EXPECT_TRUE(state.groups[gr_1].ongoing_transactions().empty());
     // EXPECT_TRUE(state.groups[gr_1].fences().empty());
     expect_committed_offsets(
       state.groups[gr_1].offsets(), "test-1/0@1024", "test-2/10@256");

From 814b31513a695678c3c7a2052f8d3da525d7cc58 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Thu, 13 Jun 2024 07:56:10 +0000
Subject: [PATCH 6/7] k/group: replaced node_hash_map with chunked_hash_map in
 kafka::group
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Use a `chunked_hash_map` to avoid large contiguous allocations in the
consumer group implementation.
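For reviewers, a rough sketch of why the container choice matters. This is
illustrative only: the exact template parameters of `chunked_hash_map` are
not visible in these diffs, so the signature below is inferred from how the
map is used. `absl::node_hash_map` allocates each entry as a separate node,
but its bucket table is still one contiguous allocation that doubles on
rehash, so groups with many offsets can trigger large allocations; a chunked
map splits that storage into fragments while keeping the same call-site API:

    // illustration only; `tp` and `md` stand in for a topic partition and
    // its offset metadata as used elsewhere in this series
    chunked_hash_map<model::topic_partition, offset_metadata> offsets;
    offsets.try_emplace(tp, md); // same insert surface as the absl maps
    if (auto it = offsets.find(tp); it != offsets.end()) {
        it->second.commit_timestamp = model::timestamp::now();
    }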
Signed-off-by: Michał Maślanka
---
 src/v/kafka/group_probe.h                        |  5 +++--
 src/v/kafka/server/group.cc                      |  2 +-
 src/v/kafka/server/group.h                       |  8 ++++----
 src/v/kafka/server/group_stm.h                   | 15 +++++++--------
 .../server/tests/consumer_group_recovery_test.cc |  5 +++--
 5 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/src/v/kafka/group_probe.h b/src/v/kafka/group_probe.h
index c1db5c6261855..0ba5da9272f7c 100644
--- a/src/v/kafka/group_probe.h
+++ b/src/v/kafka/group_probe.h
@@ -12,6 +12,7 @@
 #pragma once

 #include "config/configuration.h"
+#include "container/chunked_hash_map.h"
 #include "kafka/server/member.h"
 #include "kafka/types.h"
 #include "metrics/metrics.h"
@@ -94,8 +95,8 @@ template class group_probe {
     using member_map = absl::node_hash_map;
     using static_member_map
-      = absl::node_hash_map;
-    using offsets_map = absl::node_hash_map;
+      = chunked_hash_map;
+    using offsets_map = chunked_hash_map;

 public:
     explicit group_probe(
diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc
index c1bc6fb16be1d..f5aa25c5adf16 100644
--- a/src/v/kafka/server/group.cc
+++ b/src/v/kafka/server/group.cc
@@ -2598,7 +2598,7 @@ ss::future<> group::remove_topic_partitions(
         _pending_offset_commits.erase(tp);
         if (auto offset = _offsets.extract(tp); offset) {
             removed.emplace_back(
-              std::move(offset.key()), std::move(offset.mapped()->metadata));
+              std::move(offset->first), std::move(offset->second->metadata));
         }
     }
diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h
index 3ec2e5e0514ab..bf0290a67413c 100644
--- a/src/v/kafka/server/group.h
+++ b/src/v/kafka/server/group.h
@@ -883,7 +883,7 @@ class group final : public ss::enable_lw_shared_from_this {
     kafka::generation_id _generation;
     protocol_support _supported_protocols;
     member_map _members;
-    absl::node_hash_map _static_members;
+    chunked_hash_map _static_members;
     int _num_members_joining;
     absl::node_hash_map> _pending_members;
@@ -895,7 +895,7 @@ class group final : public ss::enable_lw_shared_from_this {
     config::configuration& _conf;
     ss::lw_shared_ptr _catchup_lock;
     ss::lw_shared_ptr _partition;
-    absl::node_hash_map<
+    chunked_hash_map<
       model::topic_partition,
       std::unique_ptr>
       _offsets;
@@ -916,11 +916,11 @@ class group final : public ss::enable_lw_shared_from_this {
     absl::flat_hash_map> _tx_locks;

     model::term_id _term;
-    absl::node_hash_map
+    chunked_hash_map
      _fence_pid_epoch;
     chunked_hash_map
      _transactions;
-    absl::node_hash_map
+    chunked_hash_map
      _pending_offset_commits;
     enable_group_metrics _enable_group_metrics;
diff --git a/src/v/kafka/server/group_stm.h b/src/v/kafka/server/group_stm.h
index 582f47c20f122..80afcf7c5785a 100644
--- a/src/v/kafka/server/group_stm.h
+++ b/src/v/kafka/server/group_stm.h
@@ -18,7 +18,6 @@

 #include
 #include
-#include
 #include

 namespace kafka {
@@ -34,7 +33,7 @@ class group_stm {
         model::tx_seq tx_seq;
         model::partition_id tm_partition;
         model::timeout_clock::duration timeout;
-        absl::node_hash_map
+        chunked_hash_map
          offsets;
     };
@@ -59,17 +58,17 @@ class group_stm {
     }
     bool is_removed() const { return _is_removed; }

-    const absl::node_hash_map&
+    const chunked_hash_map&
     ongoing_transactions() const {
         return _ongoing_txs;
     }

-    const absl::node_hash_map&
+    const chunked_hash_map&
     offsets() const {
         return _offsets;
     }

-    const absl::node_hash_map&
+    const chunked_hash_map&
     fences() const {
         return _fence_pid_epoch;
     }
@@ -79,10 +78,10 @@ class group_stm {
     const group_metadata_value& get_metadata() const { return _metadata; }

 private:
-    absl::node_hash_map _offsets;
-    absl::node_hash_map
+    chunked_hash_map _offsets;
+    chunked_hash_map
      _fence_pid_epoch;
-    absl::node_hash_map _ongoing_txs;
+    chunked_hash_map _ongoing_txs;

     group_metadata_value _metadata;
     bool _is_loaded{false};
diff --git a/src/v/kafka/server/tests/consumer_group_recovery_test.cc b/src/v/kafka/server/tests/consumer_group_recovery_test.cc
index 965d60a0e152b..860fd8efe296e 100644
--- a/src/v/kafka/server/tests/consumer_group_recovery_test.cc
+++ b/src/v/kafka/server/tests/consumer_group_recovery_test.cc
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0

+#include "container/chunked_hash_map.h"
 #include "kafka/protocol/types.h"
 #include "kafka/server/group_metadata.h"
 #include "kafka/server/group_recovery_consumer.h"
@@ -170,9 +171,9 @@ struct cg_recovery_test_fixture : seastar_test {
      */
     template
     void expect_committed_offsets(
-      const absl::node_hash_map<
+      const chunked_hash_map<
         model::topic_partition,
-        group_stm::logged_metadata> metadata,
+        group_stm::logged_metadata>& metadata,
       Args... offset_desc) {
         absl::node_hash_map expected_set;
         (expected_set.emplace(parse_offset(offset_desc)), ...);

From f2946485ac60c0cb624e05af95b286a046f36db1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Tue, 18 Jun 2024 09:38:10 +0000
Subject: [PATCH 7/7] k/group: unified producer session handling
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Unified the handling of a producer's session fence and its ongoing
transaction into a single per-producer map.
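The shape of the change as a sketch (the map's template arguments are
inferred from how it is used in the diffs below; error handling elided):
each producer id maps to one session record that owns both the fence epoch
and the optional ongoing transaction, so the two can no longer drift apart
across separate maps:

    struct tx_producer {
        explicit tx_producer(model::producer_epoch);

        model::producer_epoch epoch;                      // the fence
        std::unique_ptr<ongoing_transaction> transaction; // null: no open tx
    };
    using producers_map = chunked_hash_map<model::producer_id, tx_producer>;
    producers_map _producers;

    // a single lookup now serves both the fence check and the tx lookup:
    auto it = _producers.find(pid.get_id());
    if (it == _producers.end() || it->second.epoch != pid.get_epoch()) {
        // unknown or fenced producer - reject the request
    } else if (it->second.transaction == nullptr) {
        // valid session, but no transaction in flight
    }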
Signed-off-by: Michał Maślanka
---
 src/v/kafka/server/group.cc                      | 331 +++++++++---------
 src/v/kafka/server/group.h                       |  28 +-
 src/v/kafka/server/group_manager.cc              |  24 +-
 src/v/kafka/server/group_stm.cc                  |  54 +--
 src/v/kafka/server/group_stm.h                   |  21 +-
 .../tests/consumer_group_recovery_test.cc        |  22 +-
 6 files changed, 245 insertions(+), 235 deletions(-)

diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc
index f5aa25c5adf16..f7cf8f7882660 100644
--- a/src/v/kafka/server/group.cc
+++ b/src/v/kafka/server/group.cc
@@ -179,6 +179,9 @@ group::ongoing_transaction::ongoing_transaction(
   , timeout(tx_timeout)
   , last_update(model::timeout_clock::now()) {}

+group::tx_producer::tx_producer(model::producer_epoch epoch)
+  : epoch(epoch) {}
+
 namespace {
 template model::record_batch make_tx_batch(
@@ -1672,16 +1675,15 @@ void group::fail_offset_commit(

 void group::reset_tx_state(model::term_id term) {
     _term = term;
-    _transactions.clear();
-    _fence_pid_epoch.clear();
+    _producers.clear();
 }

 void group::insert_ongoing_tx(
   model::producer_identity pid, ongoing_transaction tx) {
-    auto [_, inserted] = _transactions.try_emplace(pid, std::move(tx));
-    if (!inserted) {
-        vlog(_ctx_txlog.warn, "error adding ongoing transaction for {}", pid);
-    }
+    auto [it, inserted] = _producers.try_emplace(pid.get_id(), pid.get_epoch());
+    it->second.epoch = pid.get_epoch();
+    it->second.transaction = std::make_unique<ongoing_transaction>(
+      std::move(tx));
 }

 ss::future
 group::commit_tx(cluster::commit_group_tx_request r) {
@@ -1698,24 +1700,25 @@ group::commit_tx(cluster::commit_group_tx_request r) {
         co_return make_commit_tx_reply(cluster::tx::errc::stale);
     }

-    auto fence_it = _fence_pid_epoch.find(r.pid.get_id());
-    if (fence_it == _fence_pid_epoch.end()) {
+    auto it = _producers.find(r.pid.get_id());
+    if (it == _producers.end()) {
         vlog(
-          _ctx_txlog.warn, "commit_tx request: {} failed - fence not found", r);
+          _ctx_txlog.warn,
+          "commit_tx request: {} failed - producer not found",
+          r);
         co_return make_commit_tx_reply(cluster::tx::errc::request_rejected);
     }
-
-    if (r.pid.get_epoch() != fence_it->second) {
+    auto& producer = it->second;
+    if (r.pid.get_epoch() != producer.epoch) {
         vlog(
           _ctx_txlog.warn,
           "commit_tx request: {} failed - fenced, stored producer epoch: {}",
           r,
-          fence_it->second);
+          producer.epoch);
         co_return make_commit_tx_reply(cluster::tx::errc::request_rejected);
     }

-    auto tx_it = _transactions.find(r.pid);
-    if (tx_it == _transactions.end()) {
+    if (producer.transaction == nullptr) {
         vlog(
           _ctx_txlog.trace,
           "commit_tx request: {} - can not find ongoing transaction, it was "
           r);
         co_return make_commit_tx_reply(cluster::tx::errc::none);
     }
-
-    if (tx_it->second.tx_seq > r.tx_seq) {
+    auto& producer_tx = *producer.transaction;
+    if (producer_tx.tx_seq > r.tx_seq) {
         // rare situation:
         // * tm_stm begins (tx_seq+1)
         // * request on this group passes but then tm_stm fails and forgets
@@ -1737,15 +1740,15 @@ group::commit_tx(cluster::commit_group_tx_request r) {
           "observed",
           r.pid,
           r.tx_seq,
-          tx_it->second.tx_seq);
+          producer_tx.tx_seq);
         co_return make_commit_tx_reply(cluster::tx::errc::none);
     }

-    if (tx_it->second.tx_seq != r.tx_seq) {
+    if (producer_tx.tx_seq != r.tx_seq) {
         vlog(
           _ctx_txlog.warn,
           "commit_tx request: {} failed - tx_seq mismatch. Expected seq: {}",
           r,
-          tx_it->second.tx_seq);
+          producer_tx.tx_seq);
         co_return make_commit_tx_reply(cluster::tx::errc::request_rejected);
     }

@@ -1784,18 +1787,19 @@ group::begin_tx(cluster::begin_group_tx_request r) {
         co_return make_begin_tx_reply(cluster::tx::errc::stale);
     }

-    auto fence_it = _fence_pid_epoch.find(r.pid.get_id());
+    auto it = _producers.find(r.pid.get_id());

-    if (fence_it != _fence_pid_epoch.end()) {
-        if (r.pid.get_epoch() < fence_it->second) {
+    if (it != _producers.end()) {
+        auto& producer = it->second;
+        if (r.pid.get_epoch() < producer.epoch) {
             vlog(
               _ctx_txlog.warn,
               "begin tx request failed. Producer {} epoch is lower than "
               "current fence epoch: {}",
               r.pid,
-              fence_it->second);
+              producer.epoch);
             co_return make_begin_tx_reply(cluster::tx::errc::fenced);
-        } else if (r.pid.get_epoch() > fence_it->second) {
+        } else if (r.pid.get_epoch() > producer.epoch) {
             // there is a fence, it might be that tm_stm failed, forget about
             // an ongoing transaction, assigned next pid for the same tx.id and
             // started a new transaction without aborting the previous one.
@@ -1806,7 +1810,7 @@ group::begin_tx(cluster::begin_group_tx_request r) {
             // the timeout

             auto old_pid = model::producer_identity{
-              r.pid.get_id(), fence_it->second};
+              r.pid.get_id(), producer.epoch};
             auto ar = co_await do_try_abort_old_tx(old_pid);
             if (ar != cluster::tx::errc::none) {
                 vlog(
@@ -1819,33 +1823,32 @@ group::begin_tx(cluster::begin_group_tx_request r) {
                 co_return make_begin_tx_reply(cluster::tx::errc::stale);
             }
         }
-    }

-    // Now we know the current producer epoch is valid
-    auto it = _transactions.find(r.pid);
-    if (it != _transactions.end()) {
-        if (r.tx_seq != it->second.tx_seq) {
-            vlog(
-              _ctx_txlog.warn,
-              "begin tx request {} failed - produced has already ongoing "
-              "transaction with highest sequence number: {}",
-              r,
-              it->second.tx_seq);
-            co_return make_begin_tx_reply(
-              cluster::tx::errc::unknown_server_error);
-        }
+        if (producer.transaction) {
+            auto& producer_tx = *producer.transaction;
+            if (r.tx_seq != producer_tx.tx_seq) {
+                vlog(
+                  _ctx_txlog.warn,
+                  "begin tx request {} failed - producer already has an "
+                  "ongoing transaction with a different sequence number: {}",
+                  r,
+                  producer_tx.tx_seq);
+                co_return make_begin_tx_reply(
+                  cluster::tx::errc::unknown_server_error);
+            }

-        if (!it->second.offsets.empty()) {
-            vlog(
-              _ctx_txlog.warn,
-              "begin tx request {} failed - transaction is already ongoing and "
-              "accepted offset commits",
-              r.pid,
-              r.tx_seq);
-            co_return make_begin_tx_reply(
-              cluster::tx::errc::unknown_server_error);
+            if (!producer_tx.offsets.empty()) {
+                vlog(
+                  _ctx_txlog.warn,
+                  "begin tx request {} failed - transaction is already ongoing "
+                  "and accepted offset commits",
+                  r);
+                co_return make_begin_tx_reply(
+                  cluster::tx::errc::unknown_server_error);
+            }
+            // begin_tx request is idempotent, return success
+            co_return cluster::begin_group_tx_reply(
+              _term, cluster::tx::errc::none);
         }
-        // begin_tx request is idempotent, return success
-        co_return cluster::begin_group_tx_reply(_term, cluster::tx::errc::none);
     }

     group_tx::fence_metadata fence{
@@ -1874,12 +1877,13 @@ group::begin_tx(cluster::begin_group_tx_request r) {
         }
         co_return make_begin_tx_reply(cluster::tx::errc::leader_not_found);
     }
-    // set fence or update the one we already have
-    _fence_pid_epoch[r.pid.get_id()] = r.pid.get_epoch();
+    auto [producer_it, _] = _producers.try_emplace(
+      r.pid.get_id(), r.pid.get_epoch());
+    producer_it->second.epoch = r.pid.get_epoch();
+    producer_it->second.transaction = std::make_unique<ongoing_transaction>(
+      ongoing_transaction(r.tx_seq, r.tm_partition, r.timeout));

-    auto [tx_it, _] = _transactions.try_emplace(
-      r.pid, r.tx_seq, r.tm_partition, r.timeout);
-    try_arm(tx_it->second.deadline());
+    try_arm(producer_it->second.transaction->deadline());

     co_return cluster::begin_group_tx_reply(_term, cluster::tx::errc::none);
 }

@@ -1900,33 +1904,33 @@ group::abort_tx(cluster::abort_group_tx_request r) {
         co_return make_abort_tx_reply(cluster::tx::errc::stale);
     }

-    auto fence_it = _fence_pid_epoch.find(r.pid.get_id());
-    if (fence_it == _fence_pid_epoch.end()) {
+    auto it = _producers.find(r.pid.get_id());
+    if (it == _producers.end()) {
         vlog(
           _ctx_txlog.warn,
-          "abort_tx request: {} failed - producer fence not found",
+          "abort_tx request: {} failed - producer not found",
           r);
         co_return make_abort_tx_reply(cluster::tx::errc::request_rejected);
     }

-    if (r.pid.get_epoch() != fence_it->second) {
+    auto& producer = it->second;
+    if (r.pid.get_epoch() != producer.epoch) {
         vlog(
           _ctx_txlog.warn,
           "abort_tx request: {} failed - fence epoch mismatch. Fence epoch: {}",
           r.pid,
-          fence_it->second);
+          producer.epoch);
         co_return make_abort_tx_reply(cluster::tx::errc::request_rejected);
     }

-    auto tx_it = _transactions.find(r.pid);
-    if (tx_it == _transactions.end()) {
+    if (producer.transaction == nullptr) {
         vlog(
           _ctx_txlog.trace,
           "unable to find transaction for {}, probably already aborted",
           r.pid);
         co_return make_abort_tx_reply(cluster::tx::errc::none);
     }
-
-    if (tx_it->second.tx_seq > r.tx_seq) {
+    auto& producer_tx = *producer.transaction;
+    if (producer_tx.tx_seq > r.tx_seq) {
         // rare situation:
         // * tm_stm begins (tx_seq+1)
         // * request on this group passes but then tm_stm fails and forgets
@@ -1938,18 +1942,19 @@ group::abort_tx(cluster::abort_group_tx_request r) {
           "producer transaction {} already aborted, ongoing tx sequence: {}, "
           "request tx sequence: {}",
           r.pid,
-          tx_it->second.tx_seq,
+          producer_tx.tx_seq,
           r.tx_seq);
         co_return make_abort_tx_reply(cluster::tx::errc::none);
     }

-    if (tx_it->second.tx_seq != r.tx_seq) {
+    if (producer_tx.tx_seq != r.tx_seq) {
         vlog(
           _ctx_txlog.warn,
           "abort_tx request: {} failed - tx sequence mismatch. Ongoing tx "
           "sequence: {}, request tx sequence: {}",
           r.pid,
-          fence_it->second);
+          producer_tx.tx_seq,
+          r.tx_seq);
         co_return make_abort_tx_reply(cluster::tx::errc::request_rejected);
     }

@@ -1973,8 +1978,8 @@ group::store_txn_offsets(txn_offset_commit_request r) {
     model::producer_identity pid{r.data.producer_id, r.data.producer_epoch};

     // checking fencing
-    auto fence_it = _fence_pid_epoch.find(pid.get_id());
-    if (fence_it == _fence_pid_epoch.end()) {
+    auto it = _producers.find(pid.get_id());
+    if (it == _producers.end()) {
         vlog(
           _ctx_txlog.warn,
           "Can't store txn offsets: fence with pid {} isn't set",
           pid);
         co_return txn_offset_commit_response(
           r, error_code::invalid_producer_epoch);
     }
-    if (r.data.producer_epoch != fence_it->second) {
+    auto& producer = it->second;
+    if (r.data.producer_epoch != producer.epoch) {
         vlog(
           _ctx_txlog.trace,
           "Can't store txn offsets with pid {} - the fence doesn't match {}",
           pid,
-          fence_it->second);
+          producer.epoch);
         co_return txn_offset_commit_response(
           r, error_code::invalid_producer_epoch);
     }

-    auto tx_it = _transactions.find(pid);
-    if (tx_it == _transactions.end()) {
+    if (producer.transaction == nullptr) {
         vlog(
           _ctx_txlog.warn,
           "Can't store txn offsets: current tx with pid {} isn't ongoing",
@@ -2002,7 +2007,7 @@ group::store_txn_offsets(txn_offset_commit_request r) {
           r, error_code::invalid_producer_epoch);
     }

-    const auto tx_seq = tx_it->second.tx_seq;
+    auto& producer_tx = *producer.transaction;

     chunked_vector offsets;
@@ -2020,7 +2025,7 @@ group::store_txn_offsets(txn_offset_commit_request r) {
     group_tx::offsets_metadata tx_entry{
       .group_id = r.data.group_id,
       .pid = pid,
-      .tx_seq = tx_seq,
+      .tx_seq = producer_tx.tx_seq,
       .offsets = {offsets.begin(), offsets.end()},
     };
@@ -2046,8 +2051,9 @@ group::store_txn_offsets(txn_offset_commit_request r) {
         co_return txn_offset_commit_response(
           r, error_code::unknown_server_error);
     }
-    tx_it = _transactions.find(pid);
-    if (tx_it == _transactions.end()) {
+
+    it = _producers.find(pid.get_id());
+    if (it == _producers.end() || it->second.transaction == nullptr) {
         vlog(
           _ctx_txlog.warn,
           "Can't store txn offsets: current tx with pid {} isn't ongoing",
@@ -2055,13 +2061,14 @@ group::store_txn_offsets(txn_offset_commit_request r) {
         co_return txn_offset_commit_response(
           r, error_code::invalid_producer_epoch);
     }
+    auto& ongoing_tx = *it->second.transaction;
     for (auto& o : offsets) {
-        tx_it->second.offsets[o.tp] = pending_tx_offset{
+        ongoing_tx.offsets[o.tp] = pending_tx_offset{
           .offset_metadata = o,
           .log_offset = result.value().last_offset,
         };
     }
-    tx_it->second.update_last_update_time();
+    ongoing_tx.update_last_update_time();
     co_return txn_offset_commit_response(r, error_code::none);
 }

@@ -2835,16 +2842,17 @@ ss::future group::do_abort(
         }
         co_return make_abort_tx_reply(cluster::tx::errc::timeout);
     }
-
-    _transactions.erase(pid);
-
+    auto it = _producers.find(pid.get_id());
+    if (it != _producers.end()) {
+        it->second.transaction.reset();
+    }
     co_return make_abort_tx_reply(cluster::tx::errc::none);
 }

 ss::future
 group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
-    auto ongoing_it = _transactions.find(pid);
-    if (ongoing_it == _transactions.end()) {
+    auto it = _producers.find(pid.get_id());
+    if (it == _producers.end() || it->second.transaction == nullptr) {
         // Impossible situation
         vlog(
           _ctx_txlog.error,
           pid);
         co_return make_commit_tx_reply(cluster::tx::errc::unknown_server_error);
     }
-
+    auto& ongoing_tx = *it->second.transaction;
     // It is fix for https://github.com/redpanda-data/redpanda/issues/5163.
     // Problem is group_*_tx contains only producer_id in key, so compaction
     // save only last records for this events. We need only save in logs
@@ -2870,7 +2878,7 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
     cluster::simple_batch_builder store_offset_builder(
       model::record_batch_type::raft_data, model::offset(0));
-    for (const auto& [tp, pending_offset] : ongoing_it->second.offsets) {
+    for (const auto& [tp, pending_offset] : ongoing_tx.offsets) {
         update_store_offset_builder(
           store_offset_builder,
           tp.topic,
@@ -2914,9 +2922,9 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
         }
         co_return make_commit_tx_reply(cluster::tx::errc::timeout);
     }
-    ongoing_it = _transactions.find(pid);

-    if (ongoing_it == _transactions.end()) {
+    it = _producers.find(pid.get_id());
+    if (it == _producers.end() || it->second.transaction == nullptr) {
         vlog(
           _ctx_txlog.error,
           "unable to find ongoing transaction for producer: {}",
           pid);
         co_return make_commit_tx_reply(cluster::tx::errc::unknown_server_error);
     }

-    for (const auto& [tp, md] : ongoing_it->second.offsets) {
+    for (const auto& [tp, md] : it->second.transaction->offsets) {
         try_upsert_offset(
           tp,
           offset_metadata{
@@ -2937,7 +2945,7 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) {
           });
     }

-    _transactions.erase(ongoing_it);
+    it->second.transaction.reset();

     co_return make_commit_tx_reply(cluster::tx::errc::none);
 }

@@ -2951,8 +2959,11 @@ void group::abort_old_txes() {

 void group::maybe_rearm_timer() {
     std::optional earliest_deadline;
-    for (auto& [pid, ongoing_tx] : _transactions) {
-        auto candidate = ongoing_tx.deadline();
+    for (auto& [pid, producer] : _producers) {
+        if (producer.transaction == nullptr) {
+            continue;
+        }
+        auto candidate = producer.transaction->deadline();
         if (earliest_deadline) {
             earliest_deadline = std::min(earliest_deadline.value(), candidate);
         } else {
@@ -2975,14 +2986,14 @@ ss::future<> group::do_abort_old_txes() {

     absl::btree_set expired;

-    for (auto& [pid, tx] : _transactions) {
-        if (!tx.is_expired()) {
+    for (auto& [pid, producer] : _producers) {
+        if (
+          producer.transaction == nullptr
+          || !producer.transaction->is_expired()) {
             continue;
         }
-        auto it = _fence_pid_epoch.find(pid.get_id());
-        if (it != _fence_pid_epoch.end()) {
-            expired.insert(pid);
-        }
+
+        expired.insert(model::producer_identity{pid, producer.epoch});
     }

     for (auto pid : expired) {
@@ -2994,18 +3005,10 @@ ss::future<> group::do_abort_old_txes() {

 ss::future<> group::try_abort_old_tx(model::producer_identity pid) {
     return get_tx_lock(pid.get_id())->with([this, pid]() {
-        vlog(_ctx_txlog.info, "attempting expiration of producer: {}", pid);
-
-        auto tx_it = _transactions.find(pid);
-        if (tx_it != _transactions.end()) {
-            if (!tx_it->second.is_expired()) {
-                vlog(
-                  _ctx_txlog.trace,
-                  "producer {} transaction is not expired, skipping",
-                  pid);
-                return ss::now();
-            }
-        }
+        vlog(
+          _ctx_txlog.info,
+          "attempting expiration of producer: {} transaction",
+          pid);

         return do_try_abort_old_tx(pid).discard_result();
     });
@@ -3015,64 +3018,61 @@ ss::future
 group::do_try_abort_old_tx(model::producer_identity pid) {
     vlog(_ctx_txlog.trace, "aborting producer {} transaction", pid);

-    auto tx_it = _transactions.find(pid);
-    if (tx_it != _transactions.end()) {
-        auto tx_seq = tx_it->second.tx_seq;
+    auto it = _producers.find(pid.get_id());
+    if (it == _producers.end() || it->second.transaction == nullptr) {
+        co_return cluster::tx::errc::none;
+    }
+    auto& producer_tx = *it->second.transaction;

-        model::partition_id coordinator_partition_id
-          = tx_it->second.coordinator_partition;
-        vlog(
-          _ctx_txlog.trace,
-          "sending abort tx request for producer {} with tx_seq: {} to "
-          "coordinator partition: {}",
-          pid,
-          tx_seq,
-          coordinator_partition_id);
-        auto r = co_await _tx_frontend.local().route_globally(
-          cluster::try_abort_request(
-            coordinator_partition_id,
-            pid,
-            tx_seq,
-            config::shard_local_cfg().rm_sync_timeout_ms.value()));
-
-        if (r.ec != cluster::tx::errc::none) {
-            co_return r.ec;
-        }
-        vlog(
-          _ctx_txlog.trace,
-          "producer id {} abort request result: [committed: {}, aborted: {}]",
-          pid,
-          r.commited,
-          r.aborted);
+    vlog(
+      _ctx_txlog.trace,
+      "sending abort tx request for producer {} with tx_seq: {} to "
+      "coordinator partition: {}",
+      pid,
+      producer_tx.tx_seq,
+      producer_tx.coordinator_partition);
+    auto r = co_await _tx_frontend.local().route_globally(
+      cluster::try_abort_request(
+        producer_tx.coordinator_partition,
+        pid,
+        producer_tx.tx_seq,
+        config::shard_local_cfg().rm_sync_timeout_ms.value()));
+
+    if (r.ec != cluster::tx::errc::none) {
+        co_return r.ec;
+    }
+    vlog(
+      _ctx_txlog.trace,
+      "producer id {} abort request result: [committed: {}, aborted: {}]",
+      pid,
+      r.commited,
+      r.aborted);

-        if (r.commited) {
-            auto res = co_await do_commit(_id, pid);
-            if (res.ec != cluster::tx::errc::none) {
-                vlog(
-                  _ctxlog.warn,
-                  "committing producer {} transaction failed - {}",
-                  pid,
-                  res.ec);
-            }
-            co_return res.ec;
+    if (r.commited) {
+        auto res = co_await do_commit(_id, pid);
+        if (res.ec != cluster::tx::errc::none) {
+            vlog(
+              _ctxlog.warn,
+              "committing producer {} transaction failed - {}",
+              pid,
+              res.ec);
         }
+        co_return res.ec;
+    }

-        if (r.aborted) {
-            auto res = co_await do_abort(_id, pid, tx_seq);
-            if (res.ec != cluster::tx::errc::none) {
-                vlog(
-                  _ctxlog.warn,
-                  "aborting producer {} transaction failed - {}",
-                  pid,
-                  res.ec);
-            }
-            co_return res.ec;
+    if (r.aborted) {
+        auto res = co_await do_abort(_id, pid, producer_tx.tx_seq);
+        if (res.ec != cluster::tx::errc::none) {
+            vlog(
+              _ctxlog.warn,
+              "aborting producer {} transaction failed - {}",
+              pid,
+              res.ec);
         }
-
-        co_return cluster::tx::errc::stale;
+        co_return res.ec;
     }
-    // no active transaction for requested producer found
-    co_return cluster::tx::errc::none;
+
+    co_return cluster::tx::errc::stale;
 }

 void group::try_arm(time_point_type deadline) {
@@ -3325,7 +3325,12 @@ group::get_expired_offsets(std::chrono::seconds retention_period) {
 bool group::has_offsets() const {
     return !_offsets.empty() || !_pending_offset_commits.empty()
-           || !_transactions.empty();
+           || std::any_of(
+             _producers.begin(),
+             _producers.end(),
+             [](const producers_map::value_type& p) {
+                 return p.second.transaction != nullptr;
+             });
 }

 std::vector
diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h
index bf0290a67413c..3627e3dbf6526 100644
--- a/src/v/kafka/server/group.h
+++ b/src/v/kafka/server/group.h
@@ -190,6 +190,13 @@ class group final : public ss::enable_lw_shared_from_this {
         chunked_hash_map offsets;
     };

+    struct tx_producer {
+        explicit tx_producer(model::producer_epoch);
+
+        model::producer_epoch epoch;
+        std::unique_ptr<ongoing_transaction> transaction;
+    };
+
     struct offset_metadata {
         model::offset log_offset;
         model::offset offset;
@@ -628,11 +635,11 @@ class group final : public ss::enable_lw_shared_from_this {
     void
     insert_ongoing_tx(model::producer_identity pid, ongoing_transaction tx);

     void try_set_fence(model::producer_id id, model::producer_epoch epoch) {
-        auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch);
-        if (fence_it->second <= epoch) {
-            fence_it->second = epoch;
+        auto [it, _] = _producers.try_emplace(id, epoch);
+        if (it->second.epoch < epoch) {
+            it->second.epoch = epoch;
+            it->second.transaction.reset();
         }
     }

@@ -692,6 +699,7 @@ class group final : public ss::enable_lw_shared_from_this {
 private:
     using member_map = absl::node_hash_map;
     using protocol_support = absl::node_hash_map;
+    using producers_map = chunked_hash_map;

     friend std::ostream& operator<<(std::ostream&, const group&);

@@ -825,10 +833,9 @@ class group final : public ss::enable_lw_shared_from_this {
         }

         if (std::any_of(
-              _transactions.begin(),
-              _transactions.end(),
-              [&tp](const auto& tp_info) {
-                  return tp_info.second.offsets.contains(tp);
+              _producers.begin(), _producers.end(), [&tp](const auto& p) {
+                  return p.second.transaction
+                         && p.second.transaction->offsets.contains(tp);
              })) {
             return true;
         }
@@ -916,10 +923,7 @@ class group final : public ss::enable_lw_shared_from_this {
     absl::flat_hash_map> _tx_locks;

     model::term_id _term;
-    chunked_hash_map
-      _fence_pid_epoch;
-    chunked_hash_map
-      _transactions;
+    producers_map _producers;
     chunked_hash_map
      _pending_offset_commits;
     enable_group_metrics _enable_group_metrics;
diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc
index ad3f9641fb862..d9955abae413d 100644
--- a/src/v/kafka/server/group_manager.cc
+++ b/src/v/kafka/server/group_manager.cc
@@ -972,15 +972,14 @@ ss::future<> group_manager::do_recover_group(
               .non_reclaimable = meta.metadata.non_reclaimable,
             });
     }
-    for (auto& [id, epoch] : group_stm.fences()) {
-        group->try_set_fence(id, epoch);
-    }
-
-    for (auto& [pid, tx] : group_stm.ongoing_transactions()) {
-        group::ongoing_transaction group_tx(
-          tx.tx_seq, tx.tm_partition, tx.timeout);
-        for (auto& [tp, o_md] : tx.offsets) {
-            group_tx.offsets[tp] = group::pending_tx_offset{
+    for (auto& [id, session] : group_stm.producers()) {
+        group->try_set_fence(id, session.epoch);
+        if (session.tx) {
+            auto& tx = *session.tx;
+            group::ongoing_transaction group_tx(
+              tx.tx_seq, tx.tm_partition, tx.timeout);
+            for (auto& [tp, o_md] : tx.offsets) {
+                group_tx.offsets[tp] = group::pending_tx_offset{
                   .offset_metadata = group_tx::partition_offset{
                     .tp = tp,
                     .offset = o_md.offset,
                     .leader_epoch = o_md.committed_leader_epoch,
                     .metadata = o_md.metadata,
                   },
                   .log_offset = o_md.log_offset};
-        }
+            }

-        group->insert_ongoing_tx(pid, std::move(group_tx));
+            group->insert_ongoing_tx(
+              model::producer_identity(id, session.epoch),
+              std::move(group_tx));
+        }
     }

     if (group_stm.is_removed()) {
diff --git a/src/v/kafka/server/group_stm.cc b/src/v/kafka/server/group_stm.cc
index 73a8b5ac593e1..983974a56bf71 100644
--- a/src/v/kafka/server/group_stm.cc
+++ b/src/v/kafka/server/group_stm.cc
@@ -26,13 +26,13 @@ void group_stm::update_offset(

 void group_stm::update_tx_offset(
   model::offset offset, group_tx::offsets_metadata offset_md) {
-    auto it = _ongoing_txs.find(offset_md.pid);
-    // if an ongoing transaction doesn't exist we ignore the update
-    if (it == _ongoing_txs.end()) {
+    auto it = _producers.find(offset_md.pid.get_id());
+    if (
+      it == _producers.end() || it->second.tx == nullptr
+      || offset_md.pid.epoch != it->second.epoch) {
         vlog(
           cluster::txlog.warn,
-          "ongoing transaction for producer {} not found, skipping offsets "
-          "update",
+          "producer {} not found, skipping offsets update",
           offset_md.pid);
         return;
     }
@@ -47,13 +47,15 @@ void group_stm::update_tx_offset(
           .commit_timestamp = now,
           .expiry_timestamp = std::nullopt,
         };
-        it->second.offsets[tx_offset.tp] = md;
+        it->second.tx->offsets[tx_offset.tp] = md;
     }
 }

 void group_stm::commit(model::producer_identity pid) {
-    auto prepared_it = _ongoing_txs.find(pid);
-    if (prepared_it == _ongoing_txs.end()) {
+    auto it = _producers.find(pid.get_id());
+    if (
+      it == _producers.end() || it->second.tx == nullptr
+      || pid.epoch != it->second.epoch) {
         // missing prepare may happen when the consumer log gets truncated
         vlog(
           cluster::txlog.warn,
@@ -63,7 +65,7 @@ void group_stm::commit(model::producer_identity pid) {
         return;
     }

-    for (const auto& [tp, md] : prepared_it->second.offsets) {
+    for (const auto& [tp, md] : it->second.tx->offsets) {
         offset_metadata_value val{
           .offset = md.offset,
           .leader_epoch
@@ -78,20 +80,22 @@ void group_stm::commit(model::producer_identity pid) {
         _offsets[tp] = logged_metadata{
           .log_offset = md.log_offset, .metadata = std::move(val)};
     }
-
-    _ongoing_txs.erase(prepared_it);
+    it->second.tx.reset();
 }

 void group_stm::abort(
   model::producer_identity pid, [[maybe_unused]] model::tx_seq tx_seq) {
-    _ongoing_txs.erase(pid);
+    auto it = _producers.find(pid.get_id());
+    if (it != _producers.end() && it->second.epoch == pid.get_epoch()) {
+        it->second.tx.reset();
+    }
 }

 void group_stm::try_set_fence(
   model::producer_id id, model::producer_epoch epoch) {
-    auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch);
-    if (fence_it->second < epoch) {
-        fence_it->second = epoch;
+    auto [it, _] = _producers.try_emplace(id, epoch);
+    if (it->second.epoch < epoch) {
+        it->second.epoch = epoch;
     }
 }

@@ -101,17 +105,15 @@ void group_stm::try_set_fence(
   model::tx_seq txseq,
   model::timeout_clock::duration transaction_timeout_ms,
   model::partition_id tm_partition) {
-    auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch);
-    if (fence_it->second <= epoch) {
-        fence_it->second = epoch;
-        _ongoing_txs.try_emplace(
-          model::producer_identity(id, epoch),
-          ongoing_tx{
-            .tx_seq = txseq,
-            .tm_partition = tm_partition,
-            .timeout = transaction_timeout_ms,
-            .offsets = {},
-          });
+    auto [it, _] = _producers.try_emplace(id, epoch);
+    if (it->second.epoch <= epoch) {
+        it->second.epoch = epoch;
+        it->second.tx = std::make_unique<ongoing_tx>(ongoing_tx{
+          .tx_seq = txseq,
+          .tm_partition = tm_partition,
+          .timeout = transaction_timeout_ms,
+          .offsets = {},
+        });
     }
 }
diff --git a/src/v/kafka/server/group_stm.h b/src/v/kafka/server/group_stm.h
index 80afcf7c5785a..d9a99c136c952 100644
--- a/src/v/kafka/server/group_stm.h
+++ b/src/v/kafka/server/group_stm.h
@@ -20,6 +20,8 @@

 #include

+#include
+
 namespace kafka {

 class group_stm {
@@ -37,6 +39,10 @@ class group_stm {
           offsets;
     };

+    struct producer {
+        model::producer_epoch epoch;
+        std::unique_ptr<ongoing_tx> tx;
+    };
     void overwrite_metadata(group_metadata_value&&);

     void update_offset(
@@ -57,10 +63,8 @@ class group_stm {
         return !_is_removed && (_is_loaded || _offsets.size() > 0);
     }
     bool is_removed() const { return _is_removed; }
-
-    const chunked_hash_map&
-    ongoing_transactions() const {
-        return _ongoing_txs;
+    const chunked_hash_map& producers() const {
+        return _producers;
     }

     const chunked_hash_map&
@@ -68,20 +72,13 @@ class group_stm {
         return _offsets;
     }

-    const chunked_hash_map&
-    fences() const {
-        return _fence_pid_epoch;
-    }
-
     group_metadata_value& get_metadata() { return _metadata; }

     const group_metadata_value& get_metadata() const { return _metadata; }

 private:
     chunked_hash_map _offsets;
-    chunked_hash_map
-      _fence_pid_epoch;
-    chunked_hash_map _ongoing_txs;
+    chunked_hash_map _producers;

     group_metadata_value _metadata;
     bool _is_loaded{false};
diff --git a/src/v/kafka/server/tests/consumer_group_recovery_test.cc b/src/v/kafka/server/tests/consumer_group_recovery_test.cc
index 860fd8efe296e..a934381ec884e 100644
--- a/src/v/kafka/server/tests/consumer_group_recovery_test.cc
+++ b/src/v/kafka/server/tests/consumer_group_recovery_test.cc
@@ -391,8 +391,7 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) {
         make_tx_offset("topic-3", 12, 1)}}));

     auto state = co_await recover_from_batches(copy_batches(batches));
-    EXPECT_EQ(state.groups[gr_1].ongoing_transactions().size(), 1);
-    EXPECT_EQ(state.groups[gr_1].fences().size(), 1);
+    EXPECT_EQ(state.groups[gr_1].producers().size(), 1);
     // tx is ongoing offsets included in the transaction should not be
     // visible in state machine
@@ -407,8 +406,8 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) {
       group_tx::commit_metadata{.group_id = gr_1}));

     state = co_await recover_from_batches(copy_batches(batches));
-    EXPECT_TRUE(state.groups[gr_1].ongoing_transactions().empty());
-    EXPECT_EQ(state.groups[gr_1].fences().size(), 1);
+
+    EXPECT_EQ(state.groups[gr_1].producers().size(), 1);

     expect_committed_offsets(
       state.groups[gr_1].offsets(),
@@ -432,9 +431,8 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) {
       "test-2/10@256",
       "topic-3/12@1");

-    EXPECT_TRUE(state.groups[gr_1].ongoing_transactions().empty());
-
-    // EXPECT_TRUE(state.groups[gr_1].fences().empty());
+    EXPECT_EQ(state.groups[gr_1].producers().size(), 1);
+    EXPECT_EQ(state.groups[gr_1].producers().begin()->second.tx, nullptr);
 }

 TEST_F_CORO(cg_recovery_test_fixture, test_tx_abort) {
@@ -474,8 +472,9 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_abort) {
       pid, group_tx::abort_metadata{.group_id = gr_1, .tx_seq = tx_seq}));

     auto state = co_await recover_from_batches(copy_batches(batches));
-    EXPECT_TRUE(state.groups[gr_1].ongoing_transactions().empty());
-    // EXPECT_TRUE(state.groups[gr_1].fences().empty());
+    EXPECT_EQ(state.groups[gr_1].producers().size(), 1);
+    EXPECT_EQ(state.groups[gr_1].producers().begin()->second.tx, nullptr);
+
     expect_committed_offsets(
       state.groups[gr_1].offsets(), "test-1/0@1024", "test-2/10@256");
@@ -486,8 +485,9 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_abort) {
       pid,
       group_tx::commit_metadata{.group_id = gr_1}));

-    EXPECT_TRUE(state.groups[gr_1].ongoing_transactions().empty());
-    // EXPECT_TRUE(state.groups[gr_1].fences().empty());
+    EXPECT_EQ(state.groups[gr_1].producers().size(), 1);
+    EXPECT_EQ(state.groups[gr_1].producers().begin()->second.tx, nullptr);
+
     expect_committed_offsets(
       state.groups[gr_1].offsets(), "test-1/0@1024", "test-2/10@256");
 }
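A closing note for reviewers on the test pattern above: with the unified map,
"the transaction finished" is no longer expressed as absence from
ongoing_transactions() but as a producer entry whose tx is null. A minimal
sketch of an additional assertion in the same style (hypothetical test body;
`state`, `gr_1` and `pid` are the fixture names used in the diffs above):

    // after recovering a commit for `pid` the session survives, the tx is gone
    const auto& producers = state.groups[gr_1].producers();
    ASSERT_EQ(producers.size(), 1);
    const auto& session = producers.begin()->second;
    EXPECT_EQ(session.epoch, pid.get_epoch()); // fence epoch is retained
    EXPECT_EQ(session.tx, nullptr);            // no ongoing transaction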