Skip to content

Commit

Permalink
KAFKA-18662: Return CONCURRENT_TRANSACTIONS on produce request in TV2 (
Browse files Browse the repository at this point in the history
…#18733)

While testing, it was found that the not_enough_replicas error was super common and could be easily confused. Since we are already bumping the request, we can signify that the produce request may return this error and new clients can handle it 

(Note, the java client should be able to handle this already as a retriable error, but other client libraries may need to implement this change)

Reviewers: Justine Olshan <[email protected]>
  • Loading branch information
CalvinConfluent authored Jan 29, 2025
1 parent 632aedc commit a3b34c1
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* {@link Errors#INVALID_RECORD}
* {@link Errors#INVALID_TXN_STATE}
* {@link Errors#INVALID_PRODUCER_ID_MAPPING}
* {@link Errors#CONCURRENT_TRANSACTIONS}
*/
public class ProduceResponse extends AbstractResponse {
public static final long INVALID_OFFSET = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,65 @@ public void testAbortTransaction() {
}
}

@Test
public void testTransactionV2ProduceWithConcurrentTransactionError() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);

String topic = "foo";
TopicPartition topicPartition = new TopicPartition(topic, 0);
Cluster cluster = TestUtils.singletonCluster(topic, 1);

when(ctx.sender.isRunning()).thenReturn(true);
when(ctx.metadata.fetch()).thenReturn(cluster);

long timestamp = ctx.time.milliseconds();
ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, timestamp, "key", "value");

Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
ProducerConfig config = new ProducerConfig(props);

Time time = new MockTime(1);
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
MockClient client = new MockClient(time, metadata);
client.updateMetadata(initialUpdateResponse);
NodeApiVersions nodeApiVersions = new NodeApiVersions(NodeApiVersions.create().allSupportedApiVersions().values(),
Arrays.asList(new ApiVersionsResponseData.SupportedFeatureKey()
.setName("transaction.version")
.setMaxVersion((short) 2)
.setMinVersion((short) 0)),
Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey()
.setName("transaction.version")
.setMaxVersionLevel((short) 2)
.setMinVersionLevel((short) 2)),
0);
client.setNodeApiVersions(nodeApiVersions);
ApiVersions apiVersions = new ApiVersions();
apiVersions.update(NODE.idString(), nodeApiVersions);

ProducerInterceptors<String, String> interceptor = new ProducerInterceptors<>(Collections.emptyList());

client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
client.prepareResponse(produceResponse(topicPartition, 1L, Errors.CONCURRENT_TRANSACTIONS, 0, 1));
client.prepareResponse(produceResponse(topicPartition, 1L, Errors.NONE, 0, 1));
client.prepareResponse(endTxnResponse(Errors.NONE));

try (KafkaProducer<String, String> producer = new KafkaProducer<>(
config, new StringSerializer(), new StringSerializer(), metadata, client, interceptor, apiVersions, time)
) {
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
producer.commitTransaction();
}
}

@Test
public void testMeasureAbortTransactionDuration() {
Map<String, Object> configs = new HashMap<>();
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -738,12 +738,20 @@ class ReplicaManager(val config: KafkaConfig,
// retry correctly. Translate these to an error which will cause such clients to retry
// the produce request. We pick `NOT_ENOUGH_REPLICAS` because it does not trigger a
// metadata refresh.
case Errors.CONCURRENT_TRANSACTIONS |
Errors.NETWORK_EXCEPTION |
case Errors.NETWORK_EXCEPTION |
Errors.COORDINATOR_LOAD_IN_PROGRESS |
Errors.COORDINATOR_NOT_AVAILABLE |
Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
case Errors.CONCURRENT_TRANSACTIONS =>
if (transactionSupportedOperation != addPartition) {
Some(new NotEnoughReplicasException(
s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
} else {
// Don't convert the Concurrent Transaction exception for TV2. Because the error is very common during
// the transaction commit phase. Returning Concurrent Transaction is less confusing to the client.
None
}
case _ => None
}
topicPartition -> LogAppendResult(
Expand Down
64 changes: 59 additions & 5 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ class ReplicaManagerTest {
private var mockRemoteLogManager: RemoteLogManager = _
private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _
private var brokerTopicStats: BrokerTopicStats = _
private val transactionSupportedOperation = genericErrorSupported
private val quotaExceededThrottleTime = 1000
private val quotaAvailableThrottleTime = 0

Expand Down Expand Up @@ -2516,6 +2515,55 @@ class ReplicaManagerTest {
}
}

@ParameterizedTest
@EnumSource(
value = classOf[Errors],
names = Array(
"NOT_COORDINATOR",
"NETWORK_EXCEPTION",
"COORDINATOR_LOAD_IN_PROGRESS",
"COORDINATOR_NOT_AVAILABLE"
)
)
def testVerificationErrorConversionsTV2(error: Errors): Unit = {
val tp0 = new TopicPartition(topic, 0)
val producerId = 24L
val producerEpoch = 0.toShort
val sequence = 0
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])

val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
try {
replicaManager.becomeLeaderOrFollower(1,
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
(_, _) => ())

val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
new SimpleRecord("message".getBytes))

// Start verification and return the coordinator related errors.
val expectedMessage = s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"
val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionSupportedOperation = addPartition)
val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)),
appendCallback.capture(),
any()
)

// Confirm we did not write to the log and instead returned the converted error with the correct error message.
val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
callback(Map(tp0 -> error).toMap)
assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
assertEquals(expectedMessage, result.assertFired.errorMessage)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}

@ParameterizedTest
@EnumSource(
value = classOf[Errors],
Expand All @@ -2527,7 +2575,7 @@ class ReplicaManagerTest {
"COORDINATOR_NOT_AVAILABLE"
)
)
def testVerificationErrorConversions(error: Errors): Unit = {
def testVerificationErrorConversionsTV1(error: Errors): Unit = {
val tp0 = new TopicPartition(topic, 0)
val producerId = 24L
val producerEpoch = 0.toShort
Expand Down Expand Up @@ -2869,7 +2917,9 @@ class ReplicaManagerTest {
entriesToAppend: Map[TopicPartition, MemoryRecords],
transactionalId: String,
origin: AppendOrigin = AppendOrigin.CLIENT,
requiredAcks: Short = -1): CallbackResult[Map[TopicPartition, PartitionResponse]] = {
requiredAcks: Short = -1,
transactionSupportedOperation: TransactionSupportedOperation = genericErrorSupported
): CallbackResult[Map[TopicPartition, PartitionResponse]] = {
val result = new CallbackResult[Map[TopicPartition, PartitionResponse]]()
def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
responses.foreach( response => assertTrue(responses.get(response._1).isDefined))
Expand All @@ -2894,7 +2944,9 @@ class ReplicaManagerTest {
records: MemoryRecords,
origin: AppendOrigin = AppendOrigin.CLIENT,
requiredAcks: Short = -1,
transactionalId: String): CallbackResult[PartitionResponse] = {
transactionalId: String,
transactionSupportedOperation: TransactionSupportedOperation = genericErrorSupported
): CallbackResult[PartitionResponse] = {
val result = new CallbackResult[PartitionResponse]()

def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
Expand Down Expand Up @@ -2922,7 +2974,9 @@ class ReplicaManagerTest {
transactionalId: String,
producerId: Long,
producerEpoch: Short,
baseSequence: Int = 0): CallbackResult[Either[Errors, VerificationGuard]] = {
baseSequence: Int = 0,
transactionSupportedOperation: TransactionSupportedOperation = genericErrorSupported
): CallbackResult[Either[Errors, VerificationGuard]] = {
val result = new CallbackResult[Either[Errors, VerificationGuard]]()
def postVerificationCallback(errorAndGuard: (Errors, VerificationGuard)): Unit = {
val (error, verificationGuard) = errorAndGuard
Expand Down

0 comments on commit a3b34c1

Please sign in to comment.