diff --git a/src/v/kafka/group_probe.h b/src/v/kafka/group_probe.h index c1db5c6261855..0ba5da9272f7c 100644 --- a/src/v/kafka/group_probe.h +++ b/src/v/kafka/group_probe.h @@ -12,6 +12,7 @@ #pragma once #include "config/configuration.h" +#include "container/chunked_hash_map.h" #include "kafka/server/member.h" #include "kafka/types.h" #include "metrics/metrics.h" @@ -94,8 +95,8 @@ template class group_probe { using member_map = absl::node_hash_map; using static_member_map - = absl::node_hash_map; - using offsets_map = absl::node_hash_map; + = chunked_hash_map; + using offsets_map = chunked_hash_map; public: explicit group_probe( diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 6967279c04155..f7cf8f7882660 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -169,6 +169,19 @@ bool group::valid_previous_state(group_state s) const { __builtin_unreachable(); } + +group::ongoing_transaction::ongoing_transaction( + model::tx_seq tx_seq, + model::partition_id coordinator_partition, + model::timeout_clock::duration tx_timeout) + : tx_seq(tx_seq) + , coordinator_partition(coordinator_partition) + , timeout(tx_timeout) + , last_update(model::timeout_clock::now()) {} + +group::tx_producer::tx_producer(model::producer_epoch epoch) + : epoch(epoch) {} + namespace { template model::record_batch make_tx_batch( @@ -1662,64 +1675,59 @@ void group::fail_offset_commit( void group::reset_tx_state(model::term_id term) { _term = term; - _ongoing_tx_offsets.clear(); - _expiration_info.clear(); - _tx_data.clear(); - _fence_pid_epoch.clear(); + _producers.clear(); } -void group::insert_ongoing_tx_offsets(ongoing_tx_offsets tx) { - auto pid = tx.pid; - - // TODO: warn when legacy support is removed and _tx_data doesn't contain - // pid - auto [txseq_it, inserted] = _tx_data.try_emplace( - pid.get_id(), tx_data{tx.tx_seq, model::legacy_tm_ntp.tp.partition}); - if (!inserted) { - if (txseq_it->second.tx_seq != tx.tx_seq) { - vlog( - _ctx_txlog.warn, - "ongoing tx of pid {} has tx_seq {} while {} expected", - tx.pid, - tx.tx_seq, - txseq_it->second.tx_seq); - } - } - - _ongoing_tx_offsets[pid] = std::move(tx); +void group::insert_ongoing_tx( + model::producer_identity pid, ongoing_transaction tx) { + auto [it, inserted] = _producers.try_emplace(pid.get_id(), pid.get_epoch()); + it->second.epoch = pid.get_epoch(); + it->second.transaction = std::make_unique( + std::move(tx)); } ss::future group::commit_tx(cluster::commit_group_tx_request r) { + vlog(_ctx_txlog.trace, "processing commit_tx request: {}", r); if (_partition->term() != _term) { + vlog( + _ctx_txlog.warn, + "commit_tx request: {} failed - leadership_changed, expected term: " + "{}, current_term: {}", + r, + _term, + _partition->term()); co_return make_commit_tx_reply(cluster::tx::errc::stale); } - auto fence_it = _fence_pid_epoch.find(r.pid.get_id()); - if (fence_it == _fence_pid_epoch.end()) { + auto it = _producers.find(r.pid.get_id()); + if (it == _producers.end()) { vlog( _ctx_txlog.warn, - "Can't commit tx: fence with pid {} isn't set", - r.pid); + "commit_tx request: {} failed - producer not found", + r); co_return make_commit_tx_reply(cluster::tx::errc::request_rejected); } - if (r.pid.get_epoch() != fence_it->second) { + auto& producer = it->second; + if (r.pid.get_epoch() != producer.epoch) { vlog( - _ctx_txlog.trace, - "Can't commit tx with pid {} - the fence doesn't match {}", - r.pid, - fence_it->second); + _ctx_txlog.warn, + "commit_tx request: {} failed - fenced, stored producer epoch: {}", 
+ r, + producer.epoch); co_return make_commit_tx_reply(cluster::tx::errc::request_rejected); } - auto txseq_it = _tx_data.find(r.pid.get_id()); - if (txseq_it == _tx_data.end()) { + if (producer.transaction == nullptr) { vlog( _ctx_txlog.trace, - "can't find a tx {}, probably already comitted", - r.pid); + "commit_tx request: {} - cannot find an ongoing transaction, it was " + "most likely already committed", + r); co_return make_commit_tx_reply(cluster::tx::errc::none); - } else if (txseq_it->second.tx_seq > r.tx_seq) { + } + auto& producer_tx = *producer.transaction; + if (producer_tx.tx_seq > r.tx_seq) { // rare situation: // * tm_stm begins (tx_seq+1) // * request on this group passes but then tm_stm fails and forgets // @@ -1728,48 +1736,22 @@ group::commit_tx(cluster::commit_group_tx_request r) { // existence of {pid, tx_seq+1} implies {pid, tx_seq} is committed vlog( _ctx_txlog.trace, - "Already commited pid:{} tx_seq:{} - a higher tx_seq:{} was observed", - r.pid, - r.tx_seq, - txseq_it->second.tx_seq); - co_return make_commit_tx_reply(cluster::tx::errc::none); - } else if (txseq_it->second.tx_seq != r.tx_seq) { - vlog( - _ctx_txlog.warn, - "Can't commit pid {}: passed txseq {} doesn't match ongoing {}", + "Already committed pid: {} tx_seq: {} - a higher tx_seq: {} was " + "observed", r.pid, r.tx_seq, - txseq_it->second.tx_seq); - co_return make_commit_tx_reply(cluster::tx::errc::request_rejected); - } - - auto ongoing_it = _ongoing_tx_offsets.find(r.pid); - if (ongoing_it == _ongoing_tx_offsets.end()) { - vlog( - _ctx_txlog.trace, - "can't find a tx {}, probably already comitted", - r.pid); + producer_tx.tx_seq); co_return make_commit_tx_reply(cluster::tx::errc::none); } - if (ongoing_it->second.tx_seq > r.tx_seq) { - // rare situation: - // * tm_stm prepares (tx_seq+1) - // * prepare on this group passed but tm_stm failed to write to disk - // * during recovery tm_stm recommits (tx_seq) - // existence of {pid, tx_seq+1} implies {pid, tx_seq} is committed + if (producer_tx.tx_seq != r.tx_seq) { vlog( - _ctx_txlog.trace, - "prepare for pid:{} has higher tx_seq:{} than given: {} => replaying " "already comitted commit", - r.pid, - ongoing_it->second.tx_seq, - r.tx_seq); - co_return make_commit_tx_reply(cluster::tx::errc::none); - } else if (ongoing_it->second.tx_seq < r.tx_seq) { + _ctx_txlog.warn, + "commit_tx request: {} failed - tx_seq mismatch. Expected seq: {}", + r, + producer_tx.tx_seq); co_return make_commit_tx_reply(cluster::tx::errc::request_rejected); } - // we commit only if a provided tx_seq matches prepared tx_seq co_return co_await do_commit(r.group_id, r.pid); } @@ -1793,86 +1775,80 @@ cluster::abort_group_tx_reply make_abort_tx_reply(cluster::tx::errc ec) { ss::future<cluster::begin_group_tx_reply> group::begin_tx(cluster::begin_group_tx_request r) { + vlog(_ctx_txlog.trace, "processing begin tx request: {}", r); if (_partition->term() != _term) { vlog( - _ctx_txlog.trace, - "processing name:begin_tx pid:{} tx_seq:{} timeout:{} coordinator:{} " "=> stale leader", - r.pid, - r.tx_seq, - r.timeout, - r.tm_partition); + _ctx_txlog.debug, + "begin tx request {} failed - leadership changed. 
Expected term: {}, " + "current term: {}", + r, + _term, + _partition->term()); co_return make_begin_tx_reply(cluster::tx::errc::stale); } - vlog( - _ctx_txlog.trace, - "processing name:begin_tx pid:{} tx_seq:{} timeout:{} coordinator:{} in " - "term:{}", - r.pid, - r.tx_seq, - r.timeout, - r.tm_partition, - _term); - auto fence_it = _fence_pid_epoch.find(r.pid.get_id()); - if (fence_it == _fence_pid_epoch.end()) { - // intentionally empty - } else if (r.pid.get_epoch() < fence_it->second) { - vlog( - _ctx_txlog.error, - "pid {} fenced out by epoch {}", - r.pid, - fence_it->second); - co_return make_begin_tx_reply(cluster::tx::errc::fenced); - } else if (r.pid.get_epoch() > fence_it->second) { - // there is a fence, it might be that tm_stm failed, forget about - // an ongoing transaction, assigned next pid for the same tx.id and - // started a new transaction without aborting the previous one. - // - // at the same time it's possible that it already aborted the old - // tx before starting this. do_abort_tx is idempotent so calling it - // just in case to proactively abort the tx instead of waiting for - // the timeout - - auto old_pid = model::producer_identity{ - r.pid.get_id(), fence_it->second}; - auto ar = co_await do_try_abort_old_tx(old_pid); - if (ar != cluster::tx::errc::none) { - vlog( - _ctx_txlog.trace, - "can't begin tx {} because abort of a prev tx {} failed with {}; " - "retrying", - r.pid, - old_pid, - ar); - co_return make_begin_tx_reply(cluster::tx::errc::stale); - } - } + auto it = _producers.find(r.pid.get_id()); - auto txseq_it = _tx_data.find(r.pid.get_id()); - if (txseq_it != _tx_data.end()) { - if (r.tx_seq != txseq_it->second.tx_seq) { + if (it != _producers.end()) { + auto& producer = it->second; + if (r.pid.get_epoch() < producer.epoch) { vlog( _ctx_txlog.warn, - "can't begin a tx {} with tx_seq {}: a producer id is already " - "involved in a tx with tx_seq {}", + "begin tx request failed. Producer {} epoch is lower than " + "current fence epoch: {}", r.pid, - r.tx_seq, - txseq_it->second.tx_seq); - co_return make_begin_tx_reply( - cluster::tx::errc::unknown_server_error); + producer.epoch); + co_return make_begin_tx_reply(cluster::tx::errc::fenced); + } else if (r.pid.get_epoch() > producer.epoch) { + // there is a fence, it might be that tm_stm failed, forget about + // an ongoing transaction, assigned next pid for the same tx.id and + // started a new transaction without aborting the previous one. + // + // at the same time it's possible that it already aborted the old + // tx before starting this. 
do_abort_tx is idempotent so calling it + // just in case to proactively abort the tx instead of waiting for + // the timeout + + auto old_pid = model::producer_identity{ + r.pid.get_id(), producer.epoch}; + auto ar = co_await do_try_abort_old_tx(old_pid); + if (ar != cluster::tx::errc::none) { + vlog( + _ctx_txlog.warn, + "begin tx request {} failed, cannot abort old transaction: " + "{} - {}", + r, + old_pid, + ar); + co_return make_begin_tx_reply(cluster::tx::errc::stale); + } } - if (_ongoing_tx_offsets.contains(r.pid)) { - vlog( - _ctx_txlog.warn, - "can't begin a tx {} with tx_seq {}: it was already begun and it " - "accepted writes", - r.pid, - r.tx_seq); - co_return make_begin_tx_reply( - cluster::tx::errc::unknown_server_error); + if (producer.transaction) { + auto& producer_tx = *producer.transaction; + if (r.tx_seq != producer_tx.tx_seq) { + vlog( + _ctx_txlog.warn, + "begin tx request {} failed - producer already has an ongoing " + "transaction with a different sequence number: {}", + r, + producer_tx.tx_seq); + co_return make_begin_tx_reply( + cluster::tx::errc::unknown_server_error); + } + + if (!producer_tx.offsets.empty()) { + vlog( + _ctx_txlog.warn, + "begin tx request {} failed - transaction is already ongoing " + "and has accepted offset commits", + r); + co_return make_begin_tx_reply( + cluster::tx::errc::unknown_server_error); + } + // begin_tx request is idempotent, return success + co_return cluster::begin_group_tx_reply( + _term, cluster::tx::errc::none); } - co_return cluster::begin_group_tx_reply(_term, cluster::tx::errc::none); } group_tx::fence_metadata fence{ @@ -1880,6 +1856,7 @@ group::begin_tx(cluster::begin_group_tx_request r) { .tx_seq = r.tx_seq, .transaction_timeout_ms = r.timeout, .tm_partition = r.tm_partition}; + // replicate fence batch - this is a transaction boundary model::record_batch batch = make_tx_fence_batch(r.pid, std::move(fence)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto res = co_await _partition->raft()->replicate( @@ -1890,9 +1867,9 @@ if (!res) { vlog( _ctx_txlog.warn, - "Error \"{}\" on replicating pid:{} fencing batch", - res.error(), - r.pid); + "begin tx request {} failed - error replicating fencing batch - {}", + r, + res.error().message()); if ( _partition->raft()->is_leader() && _partition->raft()->term() == _term) { @@ -1900,69 +1877,60 @@ } co_return make_begin_tx_reply(cluster::tx::errc::leader_not_found); } + auto [producer_it, _] = _producers.try_emplace( + r.pid.get_id(), r.pid.get_epoch()); + producer_it->second.epoch = r.pid.get_epoch(); + producer_it->second.transaction = std::make_unique<ongoing_transaction>( + ongoing_transaction(r.tx_seq, r.tm_partition, r.timeout)); - _fence_pid_epoch[r.pid.get_id()] = r.pid.get_epoch(); - _tx_data[r.pid.get_id()] = tx_data{r.tx_seq, r.tm_partition}; + try_arm(producer_it->second.transaction->deadline()); - auto [it, _] = _expiration_info.insert_or_assign( - r.pid, expiration_info(r.timeout)); - try_arm(it->second.deadline()); - - cluster::begin_group_tx_reply reply; - reply.etag = _term; - reply.ec = cluster::tx::errc::none; - co_return reply; -} - -cluster::abort_origin group::get_abort_origin( - const model::producer_identity& pid, model::tx_seq tx_seq) const { - auto it = _tx_data.find(pid.get_id()); - if (it != _tx_data.end()) { - if (tx_seq < it->second.tx_seq) { - return cluster::abort_origin::past; - } - if (it->second.tx_seq < tx_seq) { - return 
cluster::abort_origin::future; - } - } - - return cluster::abort_origin::present; + co_return cluster::begin_group_tx_reply(_term, cluster::tx::errc::none); } ss::future group::abort_tx(cluster::abort_group_tx_request r) { // doesn't make sense to fence off an abort because transaction // manager has already decided to abort and acked to a client - + vlog(_ctxlog.trace, "processing abort_tx request: {}", r); if (_partition->term() != _term) { + vlog( + _ctxlog.debug, + "abort_tx request: {} failed - leadership changed, expected term: " + "{}, current term: {}", + r, + _term, + _partition->term()); co_return make_abort_tx_reply(cluster::tx::errc::stale); } - auto fence_it = _fence_pid_epoch.find(r.pid.get_id()); - if (fence_it == _fence_pid_epoch.end()) { + auto it = _producers.find(r.pid.get_id()); + if (it == _producers.end()) { vlog( _ctx_txlog.warn, - "Can't abort tx: fence with pid {} isn't set", - r.pid); + "abort_tx request: {} failed - producer not found", + r); co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); } - if (r.pid.get_epoch() != fence_it->second) { + auto& producer = it->second; + if (r.pid.get_epoch() != producer.epoch) { vlog( - _ctx_txlog.trace, - "Can't abort tx with pid {} - the fence doesn't match {}", + _ctx_txlog.warn, + "abort_tx request: {} failed - fence epoch mismatch. Fence epoch: {}", r.pid, - fence_it->second); + producer.epoch); co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); } - auto txseq_it = _tx_data.find(r.pid.get_id()); - if (txseq_it == _tx_data.end()) { + if (producer.transaction == nullptr) { vlog( _ctx_txlog.trace, - "can't find a tx {}, probably already aborted", + "unable to find transaction for {}, probably already aborted", r.pid); co_return make_abort_tx_reply(cluster::tx::errc::none); - } else if (txseq_it->second.tx_seq > r.tx_seq) { + } + auto& producer_tx = *producer.transaction; + if (producer_tx.tx_seq > r.tx_seq) { // rare situation: // * tm_stm begins (tx_seq+1) // * request on this group passes but then tm_stm fails and forgets @@ -1971,45 +1939,21 @@ group::abort_tx(cluster::abort_group_tx_request r) { // existence of {pid, tx_seq+1} implies {pid, tx_seq} is aborted vlog( _ctx_txlog.trace, - "Already aborted pid:{} tx_seq:{} - a higher tx_seq:{} was observed", + "producer transaction {} already aborted, ongoing tx sequence: {}, " + "request tx sequence: {}", r.pid, - r.tx_seq, - txseq_it->second.tx_seq); + producer_tx.tx_seq, + r.tx_seq); co_return make_abort_tx_reply(cluster::tx::errc::none); - } else if (txseq_it->second.tx_seq != r.tx_seq) { - vlog( - _ctx_txlog.warn, - "Can't abort pid {}: passed txseq {} doesn't match ongoing {}", - r.pid, - r.tx_seq, - txseq_it->second.tx_seq); - co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); } - auto origin = get_abort_origin(r.pid, r.tx_seq); - if (origin == cluster::abort_origin::past) { - // rejecting a delayed abort command to prevent aborting - // a wrong transaction - auto it = _expiration_info.find(r.pid); - if (it != _expiration_info.end()) { - it->second.is_expiration_requested = true; - } else { - vlog( - _ctx_txlog.error, - "pid({}) should be inside _expiration_info", - r.pid); - } - co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); - } - if (origin == cluster::abort_origin::future) { - // impossible situation: before transactional coordinator may issue - // abort of the current transaction it should begin it and abort all - // previous transactions with the same pid + if (producer_tx.tx_seq != r.tx_seq) 
{ vlog( - _ctx_txlog.error, - "Rejecting abort (pid:{}, tx_seq: {}) because it isn't consistent " - "with the current ongoing transaction", + _ctx_txlog.warn, + "abort_tx request: {} failed - tx sequence mismatch. Ongoing tx " + "sequence: {}, request tx sequence: {}", r.pid, + producer_tx.tx_seq, r.tx_seq); co_return make_abort_tx_reply(cluster::tx::errc::request_rejected); } @@ -2034,8 +1978,8 @@ group::store_txn_offsets(txn_offset_commit_request r) { model::producer_identity pid{r.data.producer_id, r.data.producer_epoch}; // checking fencing - auto fence_it = _fence_pid_epoch.find(pid.get_id()); - if (fence_it == _fence_pid_epoch.end()) { + auto it = _producers.find(pid.get_id()); + if (it == _producers.end()) { vlog( _ctx_txlog.warn, "Can't store txn offsets: fence with pid {} isn't set", @@ -2043,18 +1987,18 @@ group::store_txn_offsets(txn_offset_commit_request r) { co_return txn_offset_commit_response( r, error_code::invalid_producer_epoch); } - if (r.data.producer_epoch != fence_it->second) { + auto& producer = it->second; + if (r.data.producer_epoch != producer.epoch) { vlog( _ctx_txlog.trace, "Can't store txn offsets with pid {} - the fence doesn't match {}", pid, - fence_it->second); + producer.epoch); co_return txn_offset_commit_response( r, error_code::invalid_producer_epoch); } - auto txseq_it = _tx_data.find(pid.get_id()); - if (txseq_it == _tx_data.end()) { + if (producer.transaction == nullptr) { vlog( _ctx_txlog.warn, "Can't store txn offsets: current tx with pid {} isn't ongoing", @@ -2062,42 +2006,28 @@ group::store_txn_offsets(txn_offset_commit_request r) { co_return txn_offset_commit_response( r, error_code::invalid_producer_epoch); } - auto tx_seq = txseq_it->second.tx_seq; - absl::node_hash_map - offsets; + auto& producer_tx = *producer.transaction; - auto ongoing_it = _ongoing_tx_offsets.find(pid); - if (ongoing_it != _ongoing_tx_offsets.end()) { - for (const auto& [tp, offset] : ongoing_it->second.offsets) { - group_tx::partition_offset md{ - .tp = tp, - .offset = offset.offset, - .leader_epoch = offset.committed_leader_epoch, - .metadata = offset.metadata}; - offsets[tp] = md; - } - } + chunked_vector offsets; - for (const auto& t : r.data.topics) { + for (auto& t : r.data.topics) { for (const auto& p : t.partitions) { - model::topic_partition tp(t.name, p.partition_index); - group_tx::partition_offset md{ - .tp = tp, + offsets.push_back(group_tx::partition_offset{ + .tp = model::topic_partition(t.name, p.partition_index), .offset = p.committed_offset, .leader_epoch = p.committed_leader_epoch, - .metadata = p.committed_metadata}; - offsets[tp] = md; + .metadata = p.committed_metadata, + }); } } - auto tx_entry = group_tx::offsets_metadata{ - .group_id = r.data.group_id, .pid = pid, .tx_seq = tx_seq}; - tx_entry.offsets.reserve(offsets.size()); - - for (const auto& [tp, offset] : offsets) { - tx_entry.offsets.push_back(offset); - } + group_tx::offsets_metadata tx_entry{ + .group_id = r.data.group_id, + .pid = pid, + .tx_seq = producer_tx.tx_seq, + .offsets = {offsets.begin(), offsets.end()}, + }; auto batch = make_tx_batch( model::record_batch_type::group_prepare_tx, @@ -2106,12 +2036,12 @@ group::store_txn_offsets(txn_offset_commit_request r) { std::move(tx_entry)); auto reader = model::make_memory_record_batch_reader(std::move(batch)); - auto e = co_await _partition->raft()->replicate( + auto result = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); - if (!e) { + if (!result) { if ( 
_partition->raft()->is_leader() && _partition->raft()->term() == _term) { @@ -2122,29 +2052,23 @@ group::store_txn_offsets(txn_offset_commit_request r) { r, error_code::unknown_server_error); } - ongoing_tx_offsets ptx; - ptx.tx_seq = tx_seq; - ptx.pid = pid; - const auto now = model::timestamp::now(); - for (const auto& [tp, offset] : offsets) { - offset_metadata md{ - .log_offset = e.value().last_offset, - .offset = offset.offset, - .metadata = offset.metadata.value_or(""), - .committed_leader_epoch = kafka::leader_epoch(offset.leader_epoch), - .commit_timestamp = now, - .expiry_timestamp = std::nullopt, - }; - ptx.offsets[tp] = md; + it = _producers.find(pid.get_id()); + if (it == _producers.end() || it->second.transaction == nullptr) { + vlog( + _ctx_txlog.warn, + "Can't store txn offsets: current tx with pid {} isn't ongoing", + pid); + co_return txn_offset_commit_response( + r, error_code::invalid_producer_epoch); } - _ongoing_tx_offsets[pid] = ptx; - - auto it = _expiration_info.find(pid); - if (it != _expiration_info.end()) { - it->second.update_last_update_time(); - } else { - vlog(_ctx_txlog.warn, "pid {} should be in _expiration_info", pid); + auto& ongoing_tx = *it->second.transaction; + for (auto& o : offsets) { + ongoing_tx.offsets[o.tp] = pending_tx_offset{ + .offset_metadata = o, + .log_offset = result.value().last_offset, + }; } + ongoing_tx.update_last_update_time(); co_return txn_offset_commit_response(r, error_code::none); } @@ -2681,7 +2605,7 @@ ss::future<> group::remove_topic_partitions( _pending_offset_commits.erase(tp); if (auto offset = _offsets.extract(tp); offset) { removed.emplace_back( - std::move(offset.key()), std::move(offset.mapped()->metadata)); + std::move(offset->first), std::move(offset->second->metadata)); } } @@ -2918,23 +2842,25 @@ ss::future group::do_abort( } co_return make_abort_tx_reply(cluster::tx::errc::timeout); } - - _ongoing_tx_offsets.erase(pid); - _tx_data.erase(pid.get_id()); - _expiration_info.erase(pid); - + auto it = _producers.find(pid.get_id()); + if (it != _producers.end()) { + it->second.transaction.reset(); + } co_return make_abort_tx_reply(cluster::tx::errc::none); } ss::future group::do_commit(kafka::group_id group_id, model::producer_identity pid) { - auto ongoing_it = _ongoing_tx_offsets.find(pid); - if (ongoing_it == _ongoing_tx_offsets.end()) { + auto it = _producers.find(pid.get_id()); + if (it == _producers.end() || it->second.transaction == nullptr) { // Impossible situation - vlog(_ctx_txlog.error, "Can not find prepared tx for pid: {}", pid); + vlog( + _ctx_txlog.error, + "Unable to find an ongoing transaction for producer: {}", + pid); co_return make_commit_tx_reply(cluster::tx::errc::unknown_server_error); } - + auto& ongoing_tx = *it->second.transaction; // It is fix for https://github.com/redpanda-data/redpanda/issues/5163. // Problem is group_*_tx contains only producer_id in key, so compaction // save only last records for this events. We need only save in logs @@ -2946,21 +2872,22 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) { // problem, because client got ok for commit_request (see // tx_gateway_frontend). So redpanda will eventually finish commit and // complete write for both this events. 
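+    // Both batches built below - the raft_data batch carrying the committed
+    // offsets and the group_commit_tx marker - are appended to a single
+    // record_batch_reader and replicated in one quorum_ack call, so they
+    // land in the log atomically and compaction cannot keep the commit
+    // marker while dropping the offsets it refers to.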
+ model::record_batch_reader::data_t batches; batches.reserve(2); cluster::simple_batch_builder store_offset_builder( model::record_batch_type::raft_data, model::offset(0)); - for (const auto& [tp, metadata] : ongoing_it->second.offsets) { + for (const auto& [tp, pending_offset] : ongoing_tx.offsets) { update_store_offset_builder( store_offset_builder, tp.topic, tp.partition, - metadata.offset, - metadata.committed_leader_epoch, - metadata.metadata, - metadata.commit_timestamp, - metadata.expiry_timestamp); + pending_offset.offset_metadata.offset, + kafka::leader_epoch(pending_offset.offset_metadata.leader_epoch), + pending_offset.offset_metadata.metadata.value_or(""), + model::timestamp::now(), + std::nullopt); } batches.push_back(std::move(store_offset_builder).build()); @@ -2977,17 +2904,17 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) { auto reader = model::make_memory_record_batch_reader(std::move(batches)); - auto e = co_await _partition->raft()->replicate( + auto result = co_await _partition->raft()->replicate( _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); - if (!e) { + if (!result) { vlog( _ctx_txlog.warn, - "Error \"{}\" on replicating pid:{} commit batch", - e.error(), - pid); + "error replicating transaction commit batch for pid: {} - {}", + pid, + result.error().message()); if ( _partition->raft()->is_leader() && _partition->raft()->term() == _term) { @@ -2996,22 +2923,29 @@ group::do_commit(kafka::group_id group_id, model::producer_identity pid) { co_return make_commit_tx_reply(cluster::tx::errc::timeout); } - ongoing_it = _ongoing_tx_offsets.find(pid); - if (ongoing_it == _ongoing_tx_offsets.end()) { + it = _producers.find(pid.get_id()); + if (it == _producers.end() || it->second.transaction == nullptr) { vlog( _ctx_txlog.error, - "can't find already observed prepared tx pid:{}", + "unable to find ongoing transaction for producer: {}", pid); co_return make_commit_tx_reply(cluster::tx::errc::unknown_server_error); } - for (const auto& [tp, md] : ongoing_it->second.offsets) { - try_upsert_offset(tp, md); + for (const auto& [tp, md] : it->second.transaction->offsets) { + try_upsert_offset( + tp, + offset_metadata{ + .log_offset = md.log_offset, + .offset = md.offset_metadata.offset, + .metadata = md.offset_metadata.metadata.value_or(""), + .committed_leader_epoch = kafka::leader_epoch( + md.offset_metadata.leader_epoch), + .commit_timestamp = model::timestamp::now(), + }); } - _ongoing_tx_offsets.erase(ongoing_it); - _tx_data.erase(pid.get_id()); - _expiration_info.erase(pid); + it->second.transaction.reset(); co_return make_commit_tx_reply(cluster::tx::errc::none); } @@ -3025,8 +2959,11 @@ void group::abort_old_txes() { void group::maybe_rearm_timer() { std::optional earliest_deadline; - for (auto& [pid, expiration] : _expiration_info) { - auto candidate = expiration.deadline(); + for (auto& [pid, producer] : _producers) { + if (producer.transaction == nullptr) { + continue; + } + auto candidate = producer.transaction->deadline(); if (earliest_deadline) { earliest_deadline = std::min(earliest_deadline.value(), candidate); } else { @@ -3047,27 +2984,16 @@ ss::future<> group::do_abort_old_txes() { co_return; } - std::vector pids; - for (auto& [id, _] : _ongoing_tx_offsets) { - pids.push_back(id); - } + absl::btree_set expired; - for (auto& [id, _] : _tx_data) { - auto it = _fence_pid_epoch.find(id); - if (it != _fence_pid_epoch.end()) { - pids.emplace_back(id(), it->second); + for (auto& [pid, producer] : 
_producers) { + if ( + producer.transaction == nullptr + || !producer.transaction->is_expired()) { + continue; } - } - absl::btree_set expired; - for (auto pid : pids) { - auto expiration_it = _expiration_info.find(pid); - if (expiration_it != _expiration_info.end()) { - if (!expiration_it->second.is_expired()) { - continue; - } - } - expired.insert(pid); + expired.insert(model::producer_identity{pid, producer.epoch}); } for (auto pid : expired) { @@ -3079,15 +3005,10 @@ ss::future<> group::do_abort_old_txes() { ss::future<> group::try_abort_old_tx(model::producer_identity pid) { return get_tx_lock(pid.get_id())->with([this, pid]() { - vlog(_ctx_txlog.trace, "attempting to expire pid:{}", pid); - - auto expiration_it = _expiration_info.find(pid); - if (expiration_it != _expiration_info.end()) { - if (!expiration_it->second.is_expired()) { - vlog(_ctx_txlog.trace, "pid:{} isn't expired, skipping", pid); - return ss::now(); - } - } + vlog( + _ctx_txlog.info, + "attempting expiration of producer: {} transaction", + pid); return do_try_abort_old_tx(pid).discard_result(); }); @@ -3095,71 +3016,63 @@ ss::future<> group::try_abort_old_tx(model::producer_identity pid) { ss::future group::do_try_abort_old_tx(model::producer_identity pid) { - vlog(_ctx_txlog.trace, "aborting pid:{}", pid); - - auto p_it = _ongoing_tx_offsets.find(pid); - if (p_it != _ongoing_tx_offsets.end()) { - auto tx_seq = p_it->second.tx_seq; - auto tx_data = _tx_data.find(pid.get_id()); - model::partition_id tm = model::legacy_tm_ntp.tp.partition; - if (tx_data != _tx_data.end()) { - tm = tx_data->second.tm_partition; - } else { + vlog(_ctx_txlog.trace, "aborting producer {} transaction", pid); + + auto it = _producers.find(pid.get_id()); + if (it == _producers.end() || it->second.transaction == nullptr) { + co_return cluster::tx::errc::none; + } + auto& producer_tx = *it->second.transaction; + + vlog( + _ctx_txlog.trace, + "sending abort tx request for producer {} with tx_seq: {} to " + "coordinator partition: {}", + pid, + producer_tx.tx_seq, + producer_tx.coordinator_partition); + auto r = co_await _tx_frontend.local().route_globally( + cluster::try_abort_request( + producer_tx.coordinator_partition, + pid, + producer_tx.tx_seq, + config::shard_local_cfg().rm_sync_timeout_ms.value())); + + if (r.ec != cluster::tx::errc::none) { + co_return r.ec; + } + vlog( + _ctx_txlog.trace, + "producer id {} abort request result: [committed: {}, aborted: {}]", + pid, + r.commited, + r.aborted); + + if (r.commited) { + auto res = co_await do_commit(_id, pid); + if (res.ec != cluster::tx::errc::none) { vlog( - _ctxlog.error, - "pid {} doesn't exist in current transactions data", - pid); - } - auto r = co_await _tx_frontend.local().route_globally( - cluster::try_abort_request( - tm, - pid, - tx_seq, - config::shard_local_cfg().rm_sync_timeout_ms.value())); - if (r.ec != cluster::tx::errc::none) { - co_return r.ec; - } - if (r.commited) { - auto res = co_await do_commit(_id, pid); - if (res.ec != cluster::tx::errc::none) { - vlog( - _ctxlog.warn, - "commit of prepared tx pid:{} failed with ec:{}", - pid, - res.ec); - } - co_return res.ec; - } else if (r.aborted) { - auto res = co_await do_abort(_id, pid, tx_seq); - if (res.ec != cluster::tx::errc::none) { - vlog( - _ctxlog.warn, - "abort of prepared tx pid:{} failed with ec:{}", - pid, - res.ec); - } - co_return res.ec; + _ctxlog.warn, + "committing producer {} transaction failed - {}", + pid, + res.ec); } + co_return res.ec; + } - co_return cluster::tx::errc::stale; - } else { - auto 
txseq_it = _tx_data.find(pid.get_id()); - if (txseq_it == _tx_data.end()) { - vlog(_ctx_txlog.trace, "skipping pid:{} (can't find tx_seq)", pid); - co_return cluster::tx::errc::none; - } - model::tx_seq tx_seq = txseq_it->second.tx_seq; - auto res = co_await do_abort(_id, pid, tx_seq); + if (r.aborted) { + auto res = co_await do_abort(_id, pid, producer_tx.tx_seq); if (res.ec != cluster::tx::errc::none) { vlog( _ctxlog.warn, - "abort of pid:{} tx_seq:{} failed with {}", + "aborting producer {} transaction failed - {}", pid, - tx_seq, res.ec); } co_return res.ec; } + + co_return cluster::tx::errc::stale; } void group::try_arm(time_point_type deadline) { @@ -3175,7 +3088,8 @@ void group::try_arm(time_point_type deadline) { std::ostream& operator<<(std::ostream& o, const group::offset_metadata& md) { fmt::print( o, - "{{log_offset:{}, offset:{}, metadata:{}, committed_leader_epoch:{}}}", + "{{log_offset:{}, offset:{}, metadata:{}, " + "committed_leader_epoch:{}}}", md.log_offset, md.offset, md.metadata, @@ -3411,7 +3325,12 @@ group::get_expired_offsets(std::chrono::seconds retention_period) { bool group::has_offsets() const { return !_offsets.empty() || !_pending_offset_commits.empty() - || !_tx_data.empty(); + || std::any_of( + _producers.begin(), + _producers.end(), + [](const producers_map::value_type& p) { + return p.second.transaction != nullptr; + }); } std::vector diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 6b16c6f5f5fc0..3627e3dbf6526 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -15,6 +15,7 @@ #include "cluster/simple_batch_builder.h" #include "cluster/tx_protocol_types.h" #include "cluster/tx_utils.h" +#include "container/chunked_hash_map.h" #include "container/fragmented_vector.h" #include "features/feature_table.h" #include "kafka/group_probe.h" @@ -149,10 +150,51 @@ class group final : public ss::enable_lw_shared_from_this { using offset_commit_stages = stages; using join_group_stages = stages; using sync_group_stages = stages; + /** + * represents an offset that is to be stored as a part of transaction + */ + struct pending_tx_offset { + group_tx::partition_offset offset_metadata; + model::offset log_offset; + }; + /** + * In memory representation of active transaction. The transaction is added + * when a state machine executes begin transaction request. The transaction + * is removed when the state machine executes commit or abort transaction + * request. The transaction holds all pending offset commits. 
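+ *
+ * Lifecycle sketch: begin_tx() creates the transaction and arms the
+ * expiration timer via try_arm(deadline()), store_txn_offsets() fills
+ * `offsets` with pending_tx_offset entries, and commit_tx/abort_tx (or
+ * expiry through do_abort_old_txes()) reset the owning producer's
+ * transaction pointer.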
+ */ + struct ongoing_transaction { + ongoing_transaction( + model::tx_seq, model::partition_id, model::timeout_clock::duration); - struct tx_data { model::tx_seq tx_seq; - model::partition_id tm_partition; + model::partition_id coordinator_partition; + + model::timeout_clock::duration timeout; + model::timeout_clock::time_point last_update; + + bool is_expiration_requested{false}; + + model::timeout_clock::time_point deadline() const { + return last_update + timeout; + } + + bool is_expired() const { + return is_expiration_requested || deadline() <= clock_type::now(); + } + + void update_last_update_time() { + last_update = model::timeout_clock::now(); + } + + chunked_hash_map offsets; + }; + + struct tx_producer { + explicit tx_producer(model::producer_epoch); + + model::producer_epoch epoch; + std::unique_ptr transaction; }; struct offset_metadata { @@ -591,48 +633,13 @@ class group final : public ss::enable_lw_shared_from_this { } } - void insert_ongoing_tx_offsets(ongoing_tx_offsets); - + void + insert_ongoing_tx(model::producer_identity pid, ongoing_transaction tx); void try_set_fence(model::producer_id id, model::producer_epoch epoch) { - auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch); - if (fence_it->second <= epoch) { - fence_it->second = epoch; - } - } - - void try_set_tx_data( - model::producer_identity id, - model::tx_seq txseq, - model::partition_id tm_partition) { - auto fence_it = _fence_pid_epoch.find(id.get_id()); - if (fence_it == _fence_pid_epoch.end()) { - return; - } - if (fence_it->second != id.get_epoch()) { - return; - } - auto [ongoing_it, _] = _tx_data.try_emplace( - id.get_id(), tx_data{txseq, tm_partition}); - if (ongoing_it->second.tx_seq <= txseq) { - ongoing_it->second.tx_seq = txseq; - ongoing_it->second.tm_partition = tm_partition; - } - } - - void try_set_timeout( - model::producer_identity id, - model::timeout_clock::duration transaction_timeout_ms) { - auto fence_it = _fence_pid_epoch.find(id.get_id()); - if (fence_it == _fence_pid_epoch.end()) { - return; - } - if (fence_it->second != id.get_epoch()) { - return; - } - auto [info_it, inserted] = _expiration_info.try_emplace( - id, expiration_info(transaction_timeout_ms)); - if (inserted) { - try_arm(info_it->second.deadline()); + auto [it, _] = _producers.try_emplace(id, epoch); + if (it->second.epoch < epoch) { + it->second.epoch = epoch; + it->second.transaction.reset(); } } @@ -692,6 +699,7 @@ class group final : public ss::enable_lw_shared_from_this { private: using member_map = absl::node_hash_map; using protocol_support = absl::node_hash_map; + using producers_map = chunked_hash_map; friend std::ostream& operator<<(std::ostream&, const group&); @@ -814,9 +822,6 @@ class group final : public ss::enable_lw_shared_from_this { error_code validate_expected_group(const txn_offset_commit_request& r) const; - cluster::abort_origin - get_abort_origin(const model::producer_identity&, model::tx_seq) const; - bool has_offsets() const; bool has_pending_transaction(const model::topic_partition& tp) { @@ -828,10 +833,9 @@ class group final : public ss::enable_lw_shared_from_this { } if (std::any_of( - _ongoing_tx_offsets.begin(), - _ongoing_tx_offsets.end(), - [&tp](const auto& tp_info) { - return tp_info.second.offsets.contains(tp); + _producers.begin(), _producers.end(), [&tp](const auto& p) { + return p.second.transaction + && p.second.transaction->offsets.contains(tp); })) { return true; } @@ -886,7 +890,7 @@ class group final : public ss::enable_lw_shared_from_this { kafka::generation_id 
_generation; protocol_support _supported_protocols; member_map _members; - absl::node_hash_map _static_members; + chunked_hash_map _static_members; int _num_members_joining; absl::node_hash_map> _pending_members; @@ -898,7 +902,7 @@ class group final : public ss::enable_lw_shared_from_this { config::configuration& _conf; ss::lw_shared_ptr _catchup_lock; ss::lw_shared_ptr _partition; - absl::node_hash_map< + chunked_hash_map< model::topic_partition, std::unique_ptr> _offsets; @@ -919,41 +923,11 @@ class group final : public ss::enable_lw_shared_from_this { absl::flat_hash_map> _tx_locks; model::term_id _term; - absl::node_hash_map - _fence_pid_epoch; - absl::node_hash_map _tx_data; - absl::node_hash_map + producers_map _producers; + chunked_hash_map _pending_offset_commits; enable_group_metrics _enable_group_metrics; - absl::node_hash_map - _ongoing_tx_offsets; - - struct expiration_info { - expiration_info(model::timeout_clock::duration timeout) - : timeout(timeout) - , last_update(model::timeout_clock::now()) {} - - model::timeout_clock::duration timeout; - model::timeout_clock::time_point last_update; - bool is_expiration_requested{false}; - - model::timeout_clock::time_point deadline() const { - return last_update + timeout; - } - - bool is_expired() const { - return is_expiration_requested || deadline() <= clock_type::now(); - } - - void update_last_update_time() { - last_update = model::timeout_clock::now(); - } - }; - - absl::node_hash_map - _expiration_info; - ss::gate _gate; ss::timer _auto_abort_timer; std::chrono::milliseconds _abort_interval_ms; diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 9aefaab8b3045..d9955abae413d 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -972,18 +972,27 @@ ss::future<> group_manager::do_recover_group( .non_reclaimable = meta.metadata.non_reclaimable, }); } + for (auto& [id, session] : group_stm.producers()) { + group->try_set_fence(id, session.epoch); + if (session.tx) { + auto& tx = *session.tx; + group::ongoing_transaction group_tx( + tx.tx_seq, tx.tm_partition, tx.timeout); + for (auto& [tp, o_md] : tx.offsets) { + group_tx.offsets[tp] = group::pending_tx_offset{ + .offset_metadata = group_tx::partition_offset{ + .tp = tp, + .offset = o_md.offset, + .leader_epoch = o_md.committed_leader_epoch, + .metadata = o_md.metadata, + }, + .log_offset = o_md.log_offset}; + } - for (const auto& [_, tx] : group_stm.ongoing_tx_offsets()) { - group->insert_ongoing_tx_offsets(tx); - } - for (auto& [id, epoch] : group_stm.fences()) { - group->try_set_fence(id, epoch); - } - for (auto& [id, tx_data] : group_stm.tx_data()) { - group->try_set_tx_data(id, tx_data.tx_seq, tx_data.tm_partition); - } - for (auto& [id, timeout] : group_stm.timeouts()) { - group->try_set_timeout(id, timeout); + group->insert_ongoing_tx( + model::producer_identity(id, session.epoch), + std::move(group_tx)); + } } if (group_stm.is_removed()) { diff --git a/src/v/kafka/server/group_metadata.cc b/src/v/kafka/server/group_metadata.cc index 5b452e3373e7a..51fe5bd6519e0 100644 --- a/src/v/kafka/server/group_metadata.cc +++ b/src/v/kafka/server/group_metadata.cc @@ -442,4 +442,51 @@ std::ostream& operator<<(std::ostream& o, const offset_metadata_value& v) { v.expiry_timestamp); return o; } +namespace group_tx { +std::ostream& operator<<(std::ostream& o, const offsets_metadata& md) { + fmt::print( + o, + "{{group_id: {}, pid: {}, tx_seq: {}, offsets: {}}}", + md.group_id, + md.pid, + md.tx_seq, + 
fmt::join(md.offsets, ", ")); + return o; +} + +std::ostream& operator<<(std::ostream& o, const partition_offset& po) { + fmt::print( + o, + "{{partition: {}, offset: {}, leader_epoch: {}, metadata: {}}}", + po.tp, + po.offset, + po.leader_epoch, + po.metadata); + return o; +} +std::ostream& operator<<(std::ostream& o, const fence_metadata_v0& fence) { + fmt::print(o, "{{group_id: {}}}", fence.group_id); + return o; +} +std::ostream& operator<<(std::ostream& o, const fence_metadata_v1& fence) { + fmt::print( + o, + "{{group_id: {}, tx_seq: {}, tx_timeout: {} ms}}", + fence.group_id, + fence.tx_seq, + fence.transaction_timeout_ms.count()); + return o; +} + +std::ostream& operator<<(std::ostream& o, const fence_metadata& fence) { + fmt::print( + o, + "{{tm_partition: {}, group_id: {}, tx_seq: {}, tx_timeout: {} ms}}", + fence.tm_partition, + fence.group_id, + fence.tx_seq, + fence.transaction_timeout_ms); + return o; +} +} // namespace group_tx } // namespace kafka diff --git a/src/v/kafka/server/group_metadata.h b/src/v/kafka/server/group_metadata.h index fbb9d465e50f3..13159ec3b42c2 100644 --- a/src/v/kafka/server/group_metadata.h +++ b/src/v/kafka/server/group_metadata.h @@ -228,12 +228,14 @@ using group_metadata_serializer_factory namespace group_tx { struct fence_metadata_v0 { kafka::group_id group_id; + friend std::ostream& operator<<(std::ostream&, const fence_metadata_v0&); }; struct fence_metadata_v1 { kafka::group_id group_id; model::tx_seq tx_seq; model::timeout_clock::duration transaction_timeout_ms; + friend std::ostream& operator<<(std::ostream&, const fence_metadata_v1&); }; /** * Fence is set by the transaction manager when consumer adds an offset to @@ -244,6 +246,7 @@ struct fence_metadata { model::tx_seq tx_seq; model::timeout_clock::duration transaction_timeout_ms; model::partition_id tm_partition; + friend std::ostream& operator<<(std::ostream&, const fence_metadata&); }; /** * Single partition committed offset @@ -253,6 +256,7 @@ struct partition_offset { model::offset offset; int32_t leader_epoch; std::optional metadata; + friend std::ostream& operator<<(std::ostream&, const partition_offset&); }; /** * Consumer offsets commited as a part of transaction @@ -262,6 +266,7 @@ struct offsets_metadata { model::producer_identity pid; model::tx_seq tx_seq; std::vector offsets; + friend std::ostream& operator<<(std::ostream&, const offsets_metadata&); }; /** diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index e251bcf0abc25..3fa15b535869e 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -76,6 +76,7 @@ group_recovery_consumer::operator()(model::record_batch batch) { co_return ss::stop_iteration::yes; } _state.last_read_offset = batch.last_offset(); + if (batch.header().type == model::record_batch_type::raft_data) { _batch_base_offset = batch.base_offset(); co_await model::for_each_record(batch, [this](model::record& r) { @@ -90,14 +91,19 @@ group_recovery_consumer::operator()(model::record_batch batch) { .cmd; auto [group_it, _] = _state.groups.try_emplace(val.group_id); - group_it->second.update_prepared(batch.last_offset(), val); + vlog( + klog.trace, + "[group: {}] recovered update tx offsets: {}", + val.group_id, + val); + group_it->second.update_tx_offset(batch.last_offset(), val); co_return ss::stop_iteration::no; } else if ( batch.header().type == model::record_batch_type::group_commit_tx) { auto cmd = parse_tx_batch( batch, 
group::commit_tx_record_version); - + vlog(klog.trace, "[group: {}] recovered commit tx", cmd.cmd.group_id); auto [group_it, _] = _state.groups.try_emplace(cmd.cmd.group_id); group_it->second.commit(cmd.pid); @@ -106,6 +112,11 @@ group_recovery_consumer::operator()(model::record_batch batch) { batch.header().type == model::record_batch_type::group_abort_tx) { auto cmd = parse_tx_batch( batch, group::aborted_tx_record_version); + vlog( + klog.trace, + "[group: {}] recovered abort tx_seq: {}", + cmd.cmd.group_id, + cmd.cmd.tx_seq); auto [group_it, _] = _state.groups.try_emplace(cmd.cmd.group_id); group_it->second.abort(cmd.pid, cmd.cmd.tx_seq); @@ -117,6 +128,7 @@ group_recovery_consumer::operator()(model::record_batch batch) { } else if (batch.header().type == model::record_batch_type::version_fence) { auto fence = features::feature_table::decode_version_fence( std::move(batch)); + vlog(klog.trace, "recovered version fence"); if (fence.active_version >= cluster::cluster_version{9}) { _state.has_offset_retention_feature_fence = true; } @@ -159,10 +171,23 @@ void group_recovery_consumer::apply_tx_fence(model::record_batch&& batch) { auto cmd = reflection::adl{}.from( val_reader); auto [group_it, _] = _state.groups.try_emplace(cmd.group_id); + vlog( + klog.trace, + "[group: {}] recovered tx fence version: {} for producer: {}", + cmd.group_id, + fence_version, + bid.pid); group_it->second.try_set_fence(bid.pid.get_id(), bid.pid.get_epoch()); } else if (fence_version == group::fence_control_record_v1_version) { auto cmd = reflection::adl{}.from( val_reader); + vlog( + klog.trace, + "[group: {}] recovered tx fence version: {} for producer: {} - {}", + cmd.group_id, + fence_version, + bid.pid, + cmd); auto [group_it, _] = _state.groups.try_emplace(cmd.group_id); group_it->second.try_set_fence( bid.pid.get_id(), @@ -172,6 +197,13 @@ void group_recovery_consumer::apply_tx_fence(model::record_batch&& batch) { model::partition_id(0)); } else if (fence_version == group::fence_control_record_version) { auto cmd = reflection::adl{}.from(val_reader); + vlog( + klog.trace, + "[group: {}] recovered tx fence version: {} for producer: {} - {}", + cmd.group_id, + fence_version, + bid.pid, + cmd); auto [group_it, _] = _state.groups.try_emplace(cmd.group_id); group_it->second.try_set_fence( bid.pid.get_id(), @@ -214,7 +246,7 @@ void group_recovery_consumer::handle_record(model::record r) { } void group_recovery_consumer::handle_group_metadata(group_metadata_kv md) { - vlog(klog.trace, "Recovering group metadata {}", md.key.group_id); + vlog(klog.trace, "[group: {}] recovered group metadata", md.key.group_id); if (md.value) { // until we switch over to a compacted topic or use raft snapshots, @@ -234,7 +266,8 @@ void group_recovery_consumer::handle_offset_metadata(offset_metadata_kv md) { if (md.value) { vlog( klog.trace, - "Recovering offset {}/{} with metadata {}", + "[group: {}] recovered {}/{} committed offset: {}", + md.key.group_id, md.key.topic, md.key.partition, *md.value); @@ -248,6 +281,12 @@ void group_recovery_consumer::handle_offset_metadata(offset_metadata_kv md) { group_it->second.update_offset( tp, _batch_base_offset, std::move(*md.value)); } else { + vlog( + klog.trace, + "[group: {}] recovered {}/{} committed offset tombstone", + md.key.group_id, + md.key.topic, + md.key.partition); // tombstone auto group_it = _state.groups.find(md.key.group_id); if (group_it != _state.groups.end()) { diff --git a/src/v/kafka/server/group_stm.cc b/src/v/kafka/server/group_stm.cc index 
fb31638282853..983974a56bf71 100644 --- a/src/v/kafka/server/group_stm.cc +++ b/src/v/kafka/server/group_stm.cc @@ -3,6 +3,7 @@ #include "cluster/logger.h" #include "kafka/server/group_metadata.h" #include "kafka/types.h" +#include "model/record.h" namespace kafka { @@ -23,31 +24,21 @@ void group_stm::update_offset( .log_offset = offset, .metadata = std::move(meta)}; } -void group_stm::update_prepared( - model::offset offset, group_tx::offsets_metadata val) { - auto tx = group::ongoing_tx_offsets{.pid = val.pid, .tx_seq = val.tx_seq}; - - auto [prepared_it, inserted] = _ongoing_tx_offsets.try_emplace( - tx.pid.get_id(), tx); - if (!inserted && prepared_it->second.pid.epoch > tx.pid.epoch) { +void group_stm::update_tx_offset( + model::offset offset, group_tx::offsets_metadata offset_md) { + auto it = _producers.find(offset_md.pid.get_id()); + if ( + it == _producers.end() || it->second.tx == nullptr + || offset_md.pid.epoch != it->second.epoch) { vlog( cluster::txlog.warn, - "a logged tx {} is fenced off by prev logged tx {}", - val.pid, - prepared_it->second.pid); + "producer {} not found, skipping offsets update", + offset_md.pid); return; - } else if (!inserted && prepared_it->second.pid.epoch < tx.pid.epoch) { - vlog( - cluster::txlog.warn, - "a logged tx {} overwrites prev logged tx {}", - val.pid, - prepared_it->second.pid); - prepared_it->second.pid = tx.pid; - prepared_it->second.offsets.clear(); } const auto now = model::timestamp::now(); - for (const auto& tx_offset : val.offsets) { + for (const auto& tx_offset : offset_md.offsets) { group::offset_metadata md{ .log_offset = offset, .offset = tx_offset.offset, @@ -56,26 +47,25 @@ void group_stm::update_prepared( .commit_timestamp = now, .expiry_timestamp = std::nullopt, }; - prepared_it->second.offsets[tx_offset.tp] = md; + it->second.tx->offsets[tx_offset.tp] = md; } } void group_stm::commit(model::producer_identity pid) { - auto prepared_it = _ongoing_tx_offsets.find(pid.get_id()); - if (prepared_it == _ongoing_tx_offsets.end()) { + auto it = _producers.find(pid.get_id()); + if ( + it == _producers.end() || it->second.tx == nullptr + || pid.epoch != it->second.epoch) { // missing prepare may happen when the consumer log gets truncated - vlog(cluster::txlog.trace, "can't find ongoing tx {}", pid); - return; - } else if (prepared_it->second.pid.epoch != pid.epoch) { vlog( cluster::txlog.warn, - "a comitting tx {} doesn't match ongoing tx {}", - pid, - prepared_it->second.pid); + "unable to find ongoing transaction for producer: {}, skipping " + "commit", + pid); return; } - for (const auto& [tp, md] : prepared_it->second.offsets) { + for (const auto& [tp, md] : it->second.tx->offsets) { offset_metadata_value val{ .offset = md.offset, .leader_epoch @@ -90,25 +80,41 @@ void group_stm::commit(model::producer_identity pid) { _offsets[tp] = logged_metadata{ .log_offset = md.log_offset, .metadata = std::move(val)}; } - - _ongoing_tx_offsets.erase(prepared_it); - _tx_data.erase(pid); - _timeouts.erase(pid); + it->second.tx.reset(); } void group_stm::abort( model::producer_identity pid, [[maybe_unused]] model::tx_seq tx_seq) { - auto prepared_it = _ongoing_tx_offsets.find(pid.get_id()); - if ( - prepared_it - == _ongoing_tx_offsets.end()) { // NOLINT(bugprone-branch-clone) - return; - } else if (prepared_it->second.pid.epoch != pid.epoch) { - return; + auto it = _producers.find(pid.get_id()); + if (it != _producers.end() && it->second.epoch == pid.get_epoch()) { + it->second.tx.reset(); + } +} + +void group_stm::try_set_fence( + 
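+// A note on the fencing logic below: try_emplace creates the producer entry
+// if it is absent; a higher epoch then replaces the stored one, fencing off
+// requests from older producer instances.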
model::producer_id id, model::producer_epoch epoch) { + auto [it, _] = _producers.try_emplace(id, epoch); + if (it->second.epoch < epoch) { + it->second.epoch = epoch; + } +} + +void group_stm::try_set_fence( + model::producer_id id, + model::producer_epoch epoch, + model::tx_seq txseq, + model::timeout_clock::duration transaction_timeout_ms, + model::partition_id tm_partition) { + auto [it, _] = _producers.try_emplace(id, epoch); + if (it->second.epoch <= epoch) { + it->second.epoch = epoch; + it->second.tx = std::make_unique(ongoing_tx{ + .tx_seq = txseq, + .tm_partition = tm_partition, + .timeout = transaction_timeout_ms, + .offsets = {}, + }); } - _ongoing_tx_offsets.erase(prepared_it); - _tx_data.erase(pid); - _timeouts.erase(pid); } } // namespace kafka diff --git a/src/v/kafka/server/group_stm.h b/src/v/kafka/server/group_stm.h index 8ec8f8a93b8cc..d9a99c136c952 100644 --- a/src/v/kafka/server/group_stm.h +++ b/src/v/kafka/server/group_stm.h @@ -18,9 +18,10 @@ #include #include -#include #include +#include + namespace kafka { class group_stm { @@ -30,84 +31,55 @@ class group_stm { offset_metadata_value metadata; }; - struct tx_info { + struct ongoing_tx { model::tx_seq tx_seq; model::partition_id tm_partition; + model::timeout_clock::duration timeout; + chunked_hash_map + offsets; }; + struct producer { + model::producer_epoch epoch; + std::unique_ptr tx; + }; void overwrite_metadata(group_metadata_value&&); void update_offset( const model::topic_partition&, model::offset, offset_metadata_value&&); void remove_offset(const model::topic_partition&); - void update_prepared(model::offset, group_tx::offsets_metadata); + void update_tx_offset(model::offset, group_tx::offsets_metadata); void commit(model::producer_identity); void abort(model::producer_identity, model::tx_seq); - void try_set_fence(model::producer_id id, model::producer_epoch epoch) { - auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch); - if (fence_it->second < epoch) { - fence_it->second = epoch; - } - } + void try_set_fence(model::producer_id id, model::producer_epoch epoch); void try_set_fence( model::producer_id id, model::producer_epoch epoch, model::tx_seq txseq, model::timeout_clock::duration transaction_timeout_ms, - model::partition_id tm_partition) { - auto [fence_it, _] = _fence_pid_epoch.try_emplace(id, epoch); - if (fence_it->second <= epoch) { - fence_it->second = epoch; - model::producer_identity pid(id(), epoch()); - _tx_data[pid] = tx_info{txseq, tm_partition}; - _timeouts[pid] = transaction_timeout_ms; - } - } + model::partition_id tm_partition); + bool has_data() const { return !_is_removed && (_is_loaded || _offsets.size() > 0); } bool is_removed() const { return _is_removed; } - - const absl::node_hash_map& - ongoing_tx_offsets() const { - return _ongoing_tx_offsets; + const chunked_hash_map& producers() const { + return _producers; } - const absl::node_hash_map& + const chunked_hash_map& offsets() const { return _offsets; } - const absl::node_hash_map& - fences() const { - return _fence_pid_epoch; - } - - const absl::node_hash_map& - tx_data() const { - return _tx_data; - } - - const absl:: - node_hash_map& - timeouts() const { - return _timeouts; - } - group_metadata_value& get_metadata() { return _metadata; } const group_metadata_value& get_metadata() const { return _metadata; } private: - absl::node_hash_map _offsets; - absl::node_hash_map - _ongoing_tx_offsets; - absl::node_hash_map - _fence_pid_epoch; - absl::node_hash_map _tx_data; - absl:: - node_hash_map - _timeouts; + 
chunked_hash_map _offsets; + chunked_hash_map _producers; + group_metadata_value _metadata; bool _is_loaded{false}; bool _is_removed{false}; diff --git a/src/v/kafka/server/tests/consumer_group_recovery_test.cc b/src/v/kafka/server/tests/consumer_group_recovery_test.cc index fef14d835126a..a934381ec884e 100644 --- a/src/v/kafka/server/tests/consumer_group_recovery_test.cc +++ b/src/v/kafka/server/tests/consumer_group_recovery_test.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "container/chunked_hash_map.h" #include "kafka/protocol/types.h" #include "kafka/server/group_metadata.h" #include "kafka/server/group_recovery_consumer.h" @@ -170,9 +171,9 @@ struct cg_recovery_test_fixture : seastar_test { */ template void expect_committed_offsets( - const absl::node_hash_map< + const chunked_hash_map< model::topic_partition, - group_stm::logged_metadata> metadata, + group_stm::logged_metadata>& metadata, Args... offset_desc) { absl::node_hash_map expected_set; (expected_set.emplace(parse_offset(offset_desc)), ...); @@ -390,9 +391,7 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) { make_tx_offset("topic-3", 12, 1)}})); auto state = co_await recover_from_batches(copy_batches(batches)); - EXPECT_EQ(state.groups[gr_1].ongoing_tx_offsets().size(), 1); - EXPECT_EQ(state.groups[gr_1].tx_data().size(), 1); - EXPECT_EQ(state.groups[gr_1].fences().size(), 1); + EXPECT_EQ(state.groups[gr_1].producers().size(), 1); // tx is ongoing offsets included in the transaction should not be // visible in state machine @@ -407,9 +406,8 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) { group_tx::commit_metadata{.group_id = gr_1})); state = co_await recover_from_batches(copy_batches(batches)); - EXPECT_TRUE(state.groups[gr_1].ongoing_tx_offsets().empty()); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); - EXPECT_EQ(state.groups[gr_1].fences().size(), 1); + + EXPECT_EQ(state.groups[gr_1].producers().size(), 1); expect_committed_offsets( state.groups[gr_1].offsets(), @@ -433,10 +431,8 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_happy_path) { "test-2/10@256", "topic-3/12@1"); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); - EXPECT_TRUE(state.groups[gr_1].ongoing_tx_offsets().empty()); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); - // EXPECT_TRUE(state.groups[gr_1].fences().empty()); + EXPECT_EQ(state.groups[gr_1].producers().size(), 1); + EXPECT_EQ(state.groups[gr_1].producers().begin()->second.tx, nullptr); } TEST_F_CORO(cg_recovery_test_fixture, test_tx_abort) { @@ -476,10 +472,9 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_abort) { pid, group_tx::abort_metadata{.group_id = gr_1, .tx_seq = tx_seq})); auto state = co_await recover_from_batches(copy_batches(batches)); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); - EXPECT_TRUE(state.groups[gr_1].ongoing_tx_offsets().empty()); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); - // EXPECT_TRUE(state.groups[gr_1].fences().empty()); + EXPECT_EQ(state.groups[gr_1].producers().size(), 1); + EXPECT_EQ(state.groups[gr_1].producers().begin()->second.tx, nullptr); + expect_committed_offsets( state.groups[gr_1].offsets(), "test-1/0@1024", "test-2/10@256"); @@ -490,10 +485,9 @@ TEST_F_CORO(cg_recovery_test_fixture, test_tx_abort) { pid, group_tx::commit_metadata{.group_id = gr_1})); - EXPECT_TRUE(state.groups[gr_1].tx_data().empty()); - EXPECT_TRUE(state.groups[gr_1].ongoing_tx_offsets().empty()); - 
-    EXPECT_TRUE(state.groups[gr_1].tx_data().empty());
-    // EXPECT_TRUE(state.groups[gr_1].fences().empty());
+    EXPECT_EQ(state.groups[gr_1].producers().size(), 1);
+    EXPECT_EQ(state.groups[gr_1].producers().begin()->second.tx, nullptr);
+
     expect_committed_offsets(
       state.groups[gr_1].offsets(), "test-1/0@1024", "test-2/10@256");
 }
diff --git a/tests/rptest/transactions/tx_atomic_produce_consume_test.py b/tests/rptest/transactions/tx_atomic_produce_consume_test.py
new file mode 100644
index 0000000000000..45280b67eec47
--- /dev/null
+++ b/tests/rptest/transactions/tx_atomic_produce_consume_test.py
@@ -0,0 +1,370 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+import threading
+from concurrent import futures
+import time
+from rptest.services.cluster import cluster
+from rptest.clients.types import TopicSpec
+from rptest.services.admin import Admin
+import confluent_kafka as ck
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.clients.rpk import RpkTool
+from ducktape.utils.util import wait_until
+
+
+def deserialize_str(value: bytes, ctx) -> str:
+    return value.decode('utf-8')
+
+
+class ExactlyOnceVerifier():
+    """
+    Populates the source topic with data and then transforms that data from
+    the source topic to the destination topic. The transformation uses Kafka
+    transactions to achieve exactly-once semantics.
+    """
+    def __init__(self,
+                 redpanda,
+                 src_topic: str,
+                 dst_topic: str,
+                 transform_func,
+                 logger,
+                 max_messages: int | None = None,
+                 tx_id: str = "transformer-tx",
+                 group_id: str = "transformer-group",
+                 commit_every: int = 50,
+                 timeout_sec: int = 60):
+        self._src_topic = src_topic
+        self._dst_topic = dst_topic
+        self._transform_func = transform_func
+        self._logger = logger
+        self._redpanda = redpanda
+        self._stop_ev = threading.Event()
+        self._max_messages = max_messages
+        self._tx_id = tx_id
+        self._group_id = group_id
+        self._commit_every = commit_every
+        self._produced_to_src = 0
+        self._lock = threading.Lock()
+        self._tasks = []
+        self._timeout_sec = timeout_sec
+        self._error = None
+        self._finished_partitions = set()
+        self._consumer_cnt = 0
+
+    def start_src_producer(self):
+        produced_per_partition = defaultdict(int)
+        self.last_report = time.time()
+
+        def delivery_report(err, msg):
+            if err is None:
+                produced_per_partition[msg.partition()] += 1
+                with self._lock:
+                    self._produced_to_src += 1
+                now = time.time()
+                if now - self.last_report > 5:
+                    self.last_report = now
+                    self._logger.info(
+                        f"Produced {self._produced_to_src} messages to source topic"
+                    )
+
+        producer = ck.Producer({'bootstrap.servers': self._redpanda.brokers()})
+
+        self._logger.info(
+            f"Starting source topic {self._src_topic} producer. Max messages to produce: {self._max_messages}"
+        )
+        i = 0
+        while not self._stop_ev.is_set():
+            producer.produce(self._src_topic,
+                             value=str(i),
+                             key=str(i),
+                             on_delivery=delivery_report)
+            i += 1
+            if self._max_messages is not None and i >= self._max_messages:
+                self._stop_ev.set()
+
+        producer.flush()
+        self._logger.info(f"Messages per partition: {produced_per_partition}")
+
+    def produced_messages(self, to_wait: int):
+        with self._lock:
+            return self._produced_to_src >= to_wait
+
+    def request_stop(self):
+        self._stop_ev.set()
+
+    def wait_for_finish(self, timeout_sec: int = 30):
+        futures.wait(self._tasks,
+                     timeout=timeout_sec,
+                     return_when=futures.ALL_COMPLETED)
+
+    def start_workload(self, workers: int):
+        with ThreadPoolExecutor(max_workers=workers + 1) as executor:
+            self._tasks.append(
+                executor.submit(lambda: self.start_src_producer()))
+            wait_until(lambda: self.produced_messages(10), 30, 0.5)
+
+            def transform(id: str):
+                try:
+                    self.tx_transform(tx_id=id)
+                except ck.KafkaException as e:
+                    err = e.args[0]
+                    self._logger.error(
+                        f"Client error reported: {err.code()} - {err.str()}, retriable: {err.retriable()}"
+                    )
+                except BaseException as e:
+                    self._logger.error(
+                        f"Error transforming {id} - {e} - {type(e)}")
+
+            for i in range(workers):
+                id = f"{self._tx_id}-{i}"
+                self._tasks.append(executor.submit(transform, id))
+
+    def get_high_watermarks(self, topic):
+        rpk = RpkTool(self._redpanda)
+
+        return {p.id: p.high_watermark for p in rpk.describe_topic(topic)}
+
+    def tx_transform(self, tx_id):
+        start = time.time()
+        self._logger.info(f"Starting tx-transform with tx_id: {tx_id}")
+        producer = ck.Producer({
+            'bootstrap.servers': self._redpanda.brokers(),
+            'transactional.id': tx_id,
+        })
+
+        consumer = ck.DeserializingConsumer({
+            'bootstrap.servers': self._redpanda.brokers(),
+            'group.id': self._group_id,
+            'auto.offset.reset': 'earliest',
+            'enable.auto.commit': False,
+            'isolation.level': 'read_committed',
+            'value.deserializer': deserialize_str,
+            'key.deserializer': deserialize_str,
+        })
+        try:
+            with self._lock:
+                self._consumer_cnt += 1
+
+            def reached_end():
+                high_watermarks = self.get_high_watermarks(self._src_topic)
+
+                assignments = {tp.partition for tp in consumer.assignment()}
+                if len(assignments) == 0:
+                    return False
+
+                positions = consumer.position([
+                    ck.TopicPartition(self._src_topic, p) for p in assignments
+                ])
+                end_for = {
+                    p.partition
+                    for p in positions if p.partition in high_watermarks
+                    and high_watermarks[p.partition] <= p.offset
+                }
+                self._logger.info(
+                    f"[{tx_id}] Topic {self._src_topic} partitions high watermarks: "
+                    f"{high_watermarks}, assignment: {assignments}, positions: {positions}, end_for: {end_for}"
+                )
+
+                consumers = 0
+                with self._lock:
+                    self._finished_partitions |= end_for
+                    consumers = self._consumer_cnt
+                    if len(self._finished_partitions) == len(high_watermarks):
+                        return True
+
+                return len(end_for) == len(assignments) and consumers > 1
+
+            in_transaction = False
+
+            def on_assign(consumer, partitions):
+                self._logger.info(f"[{tx_id}] Assigned {partitions}")
+
+            def on_revoke(consumer, partitions):
+                nonlocal in_transaction
+                self._logger.info(f"[{tx_id}] Revoked {partitions}")
+                if in_transaction:
+                    self._logger.info(f"[{tx_id}] abort transaction")
+                    producer.abort_transaction()
+                    in_transaction = False
+
+            consumer.subscribe([self._src_topic],
+                               on_assign=on_assign,
+                               on_revoke=on_revoke)
+            producer.init_transactions()
+
+            msg_cnt = 0
+            while True:
+                msg = consumer.poll(timeout=1)
+                if msg:
+                    if not in_transaction:
+                        self._logger.info(f"[{tx_id}] begin transaction")
+                        producer.begin_transaction()
+                        in_transaction = True
+                    msg_cnt += 1
+                    t_key, t_value = self._transform_func(
+                        msg.key(), msg.value())
+
+                    producer.produce(self._dst_topic,
+                                     value=t_value,
+                                     key=t_key,
+                                     partition=msg.partition())
+
+                    if msg_cnt % self._commit_every == 0:
+                        self._logger.info(
+                            f"[{tx_id}] Committing transaction after {msg_cnt} messages. "
+                            f"Current consumer positions: {consumer.position(consumer.assignment())}"
+                        )
+                        producer.send_offsets_to_transaction(
+                            consumer.position(consumer.assignment()),
+                            consumer.consumer_group_metadata())
+                        producer.commit_transaction()
+                        in_transaction = False
+
+                if reached_end():
+                    self._logger.info(f"{tx_id} reached end")
+                    break
+
+                if time.time() - start > self._timeout_sec:
+                    self._logger.error(f"timeout waiting for {tx_id} producer")
+                    self._error = TimeoutError(
+                        f"timeout waiting for {tx_id} producer")
+                    self._stop_ev.set()
+                    break
+
+            if in_transaction:
+                producer.send_offsets_to_transaction(
+                    consumer.position(consumer.assignment()),
+                    consumer.consumer_group_metadata())
+                producer.commit_transaction()
+                in_transaction = False
+        except ck.KafkaException as e:
+            err = e.args[0]
+            self._logger.error(
+                f"Client error reported: {err.code()} - {err.str()}, retriable: {err.retriable()}"
+            )
+            raise
+        finally:
+            consumer.close()
+            producer.flush()
+            with self._lock:
+                self._consumer_cnt -= 1
+
+    def validate_transform(self):
+        src_high_watermarks = self.get_high_watermarks(self._src_topic)
+        dst_high_watermarks = self.get_high_watermarks(self._dst_topic)
+        for p, hw in src_high_watermarks.items():
+            assert dst_high_watermarks[p] >= hw, \
+                "Transformed partitions must have a high watermark equal to or larger than the source"
+        src_msgs = defaultdict(list)
+        dst_msgs = defaultdict(list)
+        consumer = ck.DeserializingConsumer({
+            'bootstrap.servers': self._redpanda.brokers(),
+            'group.id': f'validator-group-{self._group_id}',
+            'auto.offset.reset': 'earliest',
+            'enable.auto.commit': True,
+            'value.deserializer': deserialize_str,
+            'key.deserializer': deserialize_str,
+        })
+        consumer.subscribe([self._src_topic, self._dst_topic])
+
+        def is_finished():
+            positions = consumer.position(consumer.assignment())
+            if len(positions) != len(src_high_watermarks) + len(
+                    dst_high_watermarks):
+                return False
+
+            for tp in positions:
+                if tp.topic == self._src_topic:
+                    if tp.offset < src_high_watermarks[tp.partition]:
+                        return False
+                else:
+                    if tp.offset < dst_high_watermarks[tp.partition]:
+                        return False
+
+            return True
+
+        while True:
+            if is_finished():
+                break
+            msg = consumer.poll(1)
+            if msg is None or msg.error():
+                continue
+            p = msg.partition()
+            if msg.topic() == self._src_topic:
+                src_msgs[p].append(msg)
+            else:
+                dst_msgs[p].append(msg)
+
+            if len(src_msgs[p]) > 0 and len(dst_msgs[p]) > 0:
+                s_msg = src_msgs[p][0]
+                d_msg = dst_msgs[p][0]
+                self._logger.debug(
+                    f"src: {s_msg.topic()}/{s_msg.partition()}@{s_msg.offset()} - {s_msg.key()}={s_msg.value()}"
+                )
+                self._logger.debug(
+                    f"dst: {d_msg.topic()}/{d_msg.partition()}@{d_msg.offset()} - {d_msg.key()}={d_msg.value()}"
+                )
+                t_key, t_val = self._transform_func(s_msg.key(), s_msg.value())
+                assert d_msg.value() == t_val
+                assert d_msg.key() == t_key
+                src_msgs[p].pop(0)
+                dst_msgs[p].pop(0)
+
+
+class TxAtomicProduceConsumeTest(RedpandaTest):
+    def __init__(self, test_context):
+        super(TxAtomicProduceConsumeTest,
+              self).__init__(test_context=test_context, num_brokers=3)
+
+        self.admin = Admin(self.redpanda)
+
+    @cluster(num_nodes=3)
+    def test_basic_tx_consumer_transform_produce(self):
+        partition_count = 5
+        topic = TopicSpec(name='src-test-exactly-once',
+                          replication_factor=3,
+                          partition_count=partition_count)
+
+        dst_topic = TopicSpec(name='dst-test-exactly-once',
+                              replication_factor=3,
+                              partition_count=partition_count)
+        self.redpanda.set_cluster_config(
+            {"group_new_member_join_timeout": "3000"})
+        self.client().create_topic(topic)
+        self.client().create_topic(dst_topic)
+
+        def simple_transform(k, v):
+            return f"{k}-t", f"{v}-tv"
+
+        transformer = ExactlyOnceVerifier(self.redpanda,
+                                          topic.name,
+                                          dst_topic.name,
+                                          simple_transform,
+                                          self.logger,
+                                          2000,
+                                          timeout_sec=180)
+
+        transformer.start_workload(2)
+        transformer.wait_for_finish()
+        transformer.validate_transform()
diff --git a/tests/rptest/transactions/tx_verifier_test.py b/tests/rptest/transactions/tx_verifier_test.py
index d8c92e2d27da5..9f4d464cd7647 100644
--- a/tests/rptest/transactions/tx_verifier_test.py
+++ b/tests/rptest/transactions/tx_verifier_test.py
@@ -59,7 +59,8 @@ def verify(self, tests):
                 test=test, brokers=self.redpanda.brokers())
             subprocess.check_output(["/bin/sh", "-c", cmd],
-                                    stderr=subprocess.STDOUT)
+                                    stderr=subprocess.STDOUT,
+                                    timeout=240)
             self.redpanda.logger.info(
                 "txn test \"{test}\" passed".format(test=test))
         except subprocess.CalledProcessError as e:
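Reviewer's note on the fencing rule in the group_stm changes above: the two try_set_fence overloads are deliberately asymmetric. The epoch-only overload advances the stored fence only on a strictly newer epoch (<), while the overload that records an ongoing transaction also accepts an equal epoch (<=), so the currently fenced producer can open (or restart) a transaction without bumping its epoch. Below is a minimal, self-contained sketch of that rule; chunked_hash_map and the model:: types are replaced with standard-library stand-ins, so the names and signatures here are illustrative only, not the patch's actual API.

#include <cstdint>
#include <map>
#include <memory>

// Stand-ins for model::producer_id / model::producer_epoch / the tx payload.
using producer_id = int64_t;
using producer_epoch = int16_t;

struct ongoing_tx {
    int64_t tx_seq;
};

struct producer_state {
    producer_epoch epoch;
    std::unique_ptr<ongoing_tx> tx;
};

class fence_table {
public:
    // Bare fence: only a strictly newer epoch advances the stored one.
    void try_set_fence(producer_id id, producer_epoch epoch) {
        auto it = _producers.try_emplace(id, producer_state{epoch, nullptr})
                    .first;
        if (it->second.epoch < epoch) {
            it->second.epoch = epoch;
        }
    }

    // Fence that opens a transaction: an equal epoch is accepted too (<=),
    // so the current producer can begin a tx without bumping its epoch.
    void try_set_fence(producer_id id, producer_epoch epoch, int64_t tx_seq) {
        auto it = _producers.try_emplace(id, producer_state{epoch, nullptr})
                    .first;
        if (it->second.epoch <= epoch) {
            it->second.epoch = epoch;
            it->second.tx = std::make_unique<ongoing_tx>(ongoing_tx{tx_seq});
        }
    }

    // A request carrying an older epoch is fenced off.
    bool is_fenced(producer_id id, producer_epoch epoch) const {
        auto it = _producers.find(id);
        return it != _producers.end() && epoch < it->second.epoch;
    }

private:
    std::map<producer_id, producer_state> _producers;
};

With this rule, a commit or abort carrying an older epoch is rejected, which mirrors the epoch comparison commit_tx performs against the stored producer before applying the commit.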