Skip to content

Commit

Permalink
KAFKA-18157:Consider UnsupportedVersionException child class to repre…
Browse files Browse the repository at this point in the history
…sent the case of unsupported fields
  • Loading branch information
frankvicky committed Jan 19, 2025
1 parent 20e616e commit 6b93d6e
Show file tree
Hide file tree
Showing 18 changed files with 102 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class UnsupportedProtocolFieldException extends UnsupportedVersionException {
private static final long serialVersionUID = 1L;

public UnsupportedProtocolFieldException(String fieldOrValue, String apiKeyName, int apiVersion, int lowestSupportedVersion) {
super("The cluster does not support [" + fieldOrValue + "] in " + apiKeyName + " API version " + apiVersion +
". Upgrade the cluster to " + apiKeyName + " API version >= " + lowestSupportedVersion +
" to enable [" + fieldOrValue + "].");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.CreateAclsRequestData;
import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation;
import org.apache.kafka.common.message.CreateAclsResponseData;
Expand All @@ -33,8 +33,10 @@
import org.apache.kafka.common.resource.ResourceType;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class CreateAclsRequest extends AbstractRequest {

Expand Down Expand Up @@ -91,8 +93,13 @@ private void validate(CreateAclsRequestData data) {
if (version() == 0) {
final boolean unsupported = data.creations().stream().anyMatch(creation ->
creation.resourcePatternType() != PatternType.LITERAL.code());
if (unsupported)
throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
if (unsupported) {
String unsupportedType = Arrays.stream(PatternType.values())
.filter(type -> type != PatternType.LITERAL)
.map(PatternType::name)
.collect(Collectors.joining(","));
throw new UnsupportedProtocolFieldException(unsupportedType, apiKey().name(), version(), 1);
}
}

final boolean unknown = data.creations().stream().anyMatch(creation ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsResponseData;
Expand All @@ -40,8 +40,7 @@ public Builder(CreateTopicsRequestData data) {
@Override
public CreateTopicsRequest build(short version) {
if (data.validateOnly() && version == 0)
throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " +
"CreateTopicsRequest");
throw new UnsupportedProtocolFieldException("validateOnly", apiKey().name(), version, 1);

final List<String> topicsWithDefaults = data.topics()
.stream()
Expand All @@ -53,10 +52,7 @@ public CreateTopicsRequest build(short version) {
.collect(Collectors.toList());

if (!topicsWithDefaults.isEmpty() && version < 4) {
throw new UnsupportedVersionException("Creating topics with default "
+ "partitions/replication factor are only supported in CreateTopicRequest "
+ "version 4+. The following topics need values for partitions and replicas: "
+ topicsWithDefaults);
throw new UnsupportedProtocolFieldException(String.join(",", topicsWithDefaults), apiKey().name(), version, 4);
}

return new CreateTopicsRequest(data, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.DeleteAclsRequestData;
import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
import org.apache.kafka.common.message.DeleteAclsResponseData;
Expand All @@ -32,6 +32,7 @@
import org.apache.kafka.common.resource.ResourceType;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -77,9 +78,13 @@ private void normalizeAndValidate() {
// to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
if (patternType == PatternType.ANY)
filter.setPatternTypeFilter(PatternType.LITERAL.code());
else if (patternType != PatternType.LITERAL)
throw new UnsupportedVersionException("Version 0 does not support pattern type " +
patternType + " (only LITERAL and ANY are supported)");
else if (patternType != PatternType.LITERAL) {
String unsupportedTypes = Arrays.stream(PatternType.values())
.filter(type -> type != PatternType.ANY && type != PatternType.LITERAL)
.map(PatternType::name)
.collect(Collectors.joining(","));
throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version(), 1);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.DescribeAclsRequestData;
import org.apache.kafka.common.message.DescribeAclsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -30,6 +30,8 @@
import org.apache.kafka.common.resource.ResourceType;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.stream.Collectors;

public class DescribeAclsRequest extends AbstractRequest {

Expand Down Expand Up @@ -77,8 +79,13 @@ private void normalizeAndValidate(short version) {
// to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
if (patternType == PatternType.ANY)
data.setPatternTypeFilter(PatternType.LITERAL.code());
else if (patternType != PatternType.LITERAL)
throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
else if (patternType != PatternType.LITERAL) {
String unsupportedTypes = Arrays.stream(PatternType.values())
.filter(type -> type != PatternType.LITERAL)
.map(PatternType::name)
.collect(Collectors.joining(","));
throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version, 1);
}
}

if (data.patternTypeFilter() == PatternType.UNKNOWN.code()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
Expand All @@ -30,6 +30,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -65,7 +66,11 @@ public String toString() {

private ElectLeadersRequestData toRequestData(short version) {
if (electionType != ElectionType.PREFERRED && version == 0) {
throw new UnsupportedVersionException("API Version 0 only supports PREFERRED election type");
String unsupportedTypes = Arrays.stream(ElectionType.values())
.filter(type -> type != ElectionType.PREFERRED)
.map(ElectionType::name)
.collect(Collectors.joining(","));
throw new UnsupportedProtocolFieldException(unsupportedTypes, apiKey().name(), version, 1);
}

ElectLeadersRequestData data = new ElectLeadersRequestData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
Expand All @@ -43,8 +44,7 @@ public Builder(FindCoordinatorRequestData data) {
@Override
public FindCoordinatorRequest build(short version) {
if (version < 1 && data.keyType() == CoordinatorType.TRANSACTION.id()) {
throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
"because we require features supported only in 2 or later.");
throw new UnsupportedProtocolFieldException(CoordinatorType.TRANSACTION.name(), apiKey().name(), version, 2);
}
int batchedKeys = data.coordinatorKeys().size();
if (version < MIN_BATCHED_VERSION) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -38,8 +38,7 @@ public Builder(HeartbeatRequestData data) {
@Override
public HeartbeatRequest build(short version) {
if (data.groupInstanceId() != null && version < 3) {
throw new UnsupportedVersionException("The broker heartbeat protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 3);
}
return new HeartbeatRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
Expand All @@ -42,8 +42,7 @@ public Builder(JoinGroupRequestData data) {
@Override
public JoinGroupRequest build(short version) {
if (data.groupInstanceId() != null && version < 5) {
throw new UnsupportedVersionException("The broker join group protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 5);
}
return new JoinGroupRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -47,12 +47,10 @@ public Builder(ListGroupsRequestData data) {
@Override
public ListGroupsRequest build(short version) {
if (!data.statesFilter().isEmpty() && version < 4) {
throw new UnsupportedVersionException("The broker only supports ListGroups " +
"v" + version + ", but we need v4 or newer to request groups by states.");
throw new UnsupportedProtocolFieldException("StatesFilter", apiKey().name(), version, 4);
}
if (!data.typesFilter().isEmpty() && version < 5) {
throw new UnsupportedVersionException("The broker only supports ListGroups " +
"v" + version + ", but we need v5 or newer to request groups by type.");
throw new UnsupportedProtocolFieldException("TypesFilter", apiKey().name(), version, 5);
}
return new ListGroupsRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.ListTransactionsRequestData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -37,8 +37,7 @@ public Builder(ListTransactionsRequestData data) {
@Override
public ListTransactionsRequest build(short version) {
if (data.durationFilter() >= 0 && version < 1) {
throw new UnsupportedVersionException("Duration filter can be set only when using API version 1 or higher." +
" If client is connected to an older broker, do not specify duration filter or set duration filter to -1.");
throw new UnsupportedProtocolFieldException("DurationFilter", apiKey().name(), version, 1);
}
return new ListTransactionsRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic;
Expand Down Expand Up @@ -105,16 +106,13 @@ public MetadataRequest build(short version) {
if (version < 1)
throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported.");
if (!data.allowAutoTopicCreation() && version < 4)
throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
"allowAutoTopicCreation field");
throw new UnsupportedProtocolFieldException("allowAutoTopicCreation", apiKey().name(), version, 4);
if (data.topics() != null) {
data.topics().forEach(topic -> {
if (topic.name() == null && version < 12)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support null topic names.");
throw new UnsupportedProtocolFieldException("null topic names", apiKey().name(), version, 12);
if (!Uuid.ZERO_UUID.equals(topic.topicId()) && version < 12)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support non-zero topic IDs.");
throw new UnsupportedProtocolFieldException("non-zero topic IDs", apiKey().name(), version, 12);
});
}
return new MetadataRequest(data, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData;
Expand Down Expand Up @@ -58,8 +58,7 @@ public Builder(OffsetCommitRequestData data) {
@Override
public OffsetCommitRequest build(short version) {
if (data.groupInstanceId() != null && version < 7) {
throw new UnsupportedVersionException("The broker offset commit protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 7);
}
return new OffsetCommitRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
Expand Down Expand Up @@ -143,8 +144,7 @@ public OffsetFetchRequest build(short version) {
}
if (data.requireStable() && version < 7) {
if (throwOnFetchStableOffsetsUnsupported) {
throw new UnsupportedVersionException("Broker unexpectedly " +
"doesn't support requireStable flag on version " + version);
throw new UnsupportedProtocolFieldException("RequireStable", apiKey().name(), version, 7);
} else {
log.trace("Fallback the requireStable flag to false as broker " +
"only supports OffsetFetchRequest version {}. Need " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnsupportedProtocolFieldException;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -41,8 +41,7 @@ public Builder(SyncGroupRequestData data) {
@Override
public SyncGroupRequest build(short version) {
if (data.groupInstanceId() != null && version < 3) {
throw new UnsupportedVersionException("The broker sync group protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedProtocolFieldException("GroupInstanceId", apiKey().name(), version, 3);
}
return new SyncGroupRequest(data, version);
}
Expand Down
Loading

0 comments on commit 6b93d6e

Please sign in to comment.