This repository was archived by the owner on Sep 26, 2024. It is now read-only.

MINOR: Code Cleanup - Connect Module (apache#16066)
Reviewers: Chia-Ping Tsai <[email protected]>
sjhajharia authored May 30, 2024
1 parent 33a292e commit e974914
Showing 32 changed files with 187 additions and 191 deletions.
@@ -88,7 +88,7 @@ public SchemaBuilder(Type type) {

@Override
public boolean isOptional() {
return optional == null ? false : optional;
return optional != null && optional;
}

/**
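The hunk above replaces a redundant ternary on a boxed Boolean with an equivalent short-circuit expression: when optional is null the old ternary returned false, and the new && form short-circuits to false; otherwise both unbox the same value. A minimal standalone sketch of that equivalence (the surrounding class and main method are illustrative, not part of the commit):

    public class OptionalFlagSketch {
        // Boxed Boolean: may be TRUE, FALSE, or null (meaning "not set").
        private Boolean optional;

        public boolean isOptionalOld() {
            // Old form: explicit null check, then auto-unboxing.
            return optional == null ? false : optional;
        }

        public boolean isOptionalNew() {
            // New form: short-circuits to false when optional is null,
            // otherwise unboxes the same value. Behaviour is identical.
            return optional != null && optional;
        }

        public static void main(String[] args) {
            OptionalFlagSketch s = new OptionalFlagSketch();
            for (Boolean value : new Boolean[]{null, Boolean.FALSE, Boolean.TRUE}) {
                s.optional = value;
                System.out.println(value + " -> " + s.isOptionalOld() + " / " + s.isOptionalNew());
            }
        }
    }

Boolean.TRUE.equals(optional) would be another null-safe spelling of the same check.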
@@ -46,7 +46,7 @@ public void testReconfigureStopException() {

private static class TestConnector extends Connector {

private boolean stopException;
private final boolean stopException;
private int order = 0;
public int stopOrder = -1;
public int configureOrder = -1;
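The field change above is a routine cleanup: a field that is assigned exactly once, in the constructor, can be declared final so the compiler enforces that it is never reassigned. A small sketch under that assumption (the class and constructor here are illustrative, not the actual test connector):

    public class TestConnectorSketch {
        // final: must be assigned exactly once, here in the constructor,
        // and cannot be reassigned afterwards.
        private final boolean stopException;

        public TestConnectorSketch(boolean stopException) {
            this.stopException = stopException;
        }

        public boolean stopsWithException() {
            return stopException;
        }
    }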
@@ -82,7 +82,7 @@ public void testNumericTypeProjection() {
expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.));
expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.));
expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2));
expectedProjected.put(values[5], Arrays.asList(1.2345));
expectedProjected.put(values[5], Collections.singletonList(1.2345));

Object promoted;
for (int i = 0; i < promotableSchemas.length; ++i) {
@@ -33,29 +33,29 @@ public class ConnectorUtilsTest {
public void testGroupPartitions() {

List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);
assertEquals(Collections.singletonList(FIVE_ELEMENTS), grouped);

grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);

grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
assertEquals(Arrays.asList(Arrays.asList(1, 2),
Arrays.asList(3, 4),
Arrays.asList(5)), grouped);
Collections.singletonList(5)), grouped);

grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
assertEquals(Arrays.asList(Arrays.asList(1),
Arrays.asList(2),
Arrays.asList(3),
Arrays.asList(4),
Arrays.asList(5)), grouped);
assertEquals(Arrays.asList(Collections.singletonList(1),
Collections.singletonList(2),
Collections.singletonList(3),
Collections.singletonList(4),
Collections.singletonList(5)), grouped);

grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
assertEquals(Arrays.asList(Arrays.asList(1),
Arrays.asList(2),
Arrays.asList(3),
Arrays.asList(4),
Arrays.asList(5),
assertEquals(Arrays.asList(Collections.singletonList(1),
Collections.singletonList(2),
Collections.singletonList(3),
Collections.singletonList(4),
Collections.singletonList(5),
Collections.emptyList(),
Collections.emptyList()), grouped);
}
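The bulk of this test (and several files further down in the commit) swaps Arrays.asList(x) for Collections.singletonList(x) wherever there is exactly one element. Both produce a fixed-size list and compare equal element-wise, so the assertions are unaffected; singletonList allocates one small immutable object and reads as "exactly one element". A minimal sketch of the difference:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SingletonListSketch {
        public static void main(String[] args) {
            List<Integer> viaArrays = Arrays.asList(5);                 // fixed-size, backed by an array; set() is allowed
            List<Integer> viaSingleton = Collections.singletonList(5);  // immutable single-element list

            // List equality is element-wise, so assertions comparing the two still pass.
            System.out.println(viaArrays.equals(viaSingleton)); // true

            viaArrays.set(0, 6);          // allowed: the backing array is mutable
            // viaSingleton.set(0, 6);    // would throw UnsupportedOperationException
            System.out.println(viaArrays + " " + viaSingleton); // [6] [5]
        }
    }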
@@ -32,6 +32,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,7 +61,7 @@ public void testPutFlush() {

// We do not call task.start() since it would override the output stream

task.put(Arrays.asList(
task.put(Collections.singletonList(
new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
));
offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
@@ -85,7 +86,7 @@ public void testStart() throws IOException {
task.start(props);

HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
task.put(Arrays.asList(
task.put(Collections.singletonList(
new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line0", 1)
));
offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
@@ -94,7 +94,7 @@ public void testStopResumeSavedOffset() throws Exception {
// Append NUM_LINES more lines to the file
try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) {
for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) {
printStream.println(String.format(LINE_FORMAT, i));
printStream.printf(LINE_FORMAT + "%n", i);
}
}

@@ -197,7 +197,7 @@ private File createTempFile(int numLines) throws Exception {

try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath()))) {
for (int i = 0; i < numLines; i++) {
printStream.println(String.format(LINE_FORMAT, i));
printStream.printf(LINE_FORMAT + "%n", i);
}
}

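The two hunks above fold String.format into PrintStream.printf. println(String.format(fmt, args)) formats first and then appends the platform line separator; printf(fmt + "%n", args) does the same in one call, because %n expands to the platform line separator. A small sketch under an assumed format string (LINE_FORMAT below is a stand-in, not the constant from the test):

    public class PrintfSketch {
        // Stand-in for the test's LINE_FORMAT constant.
        private static final String LINE_FORMAT = "line-%d";

        public static void main(String[] args) {
            for (int i = 0; i < 3; i++) {
                // Old style: format into a String, then println adds the line separator.
                System.out.println(String.format(LINE_FORMAT, i));
                // New style: one call; %n expands to the platform line separator.
                System.out.printf(LINE_FORMAT + "%n", i);
            }
        }
    }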
@@ -26,7 +26,6 @@
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -48,7 +47,7 @@ public void initializeInternalResources(Map<SourceAndTarget, Herder> herders) {

@Override
protected Collection<Class<?>> regularResources() {
return Arrays.asList(
return Collections.singletonList(
InternalMirrorResource.class
);
}
@@ -34,8 +34,8 @@
public class FakeLocalMetadataStore {
private static final Logger log = LoggerFactory.getLogger(FakeLocalMetadataStore.class);

private static ConcurrentHashMap<String, ConcurrentHashMap<String, String>> allTopics = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, Vector<AclBinding>> allAcls = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> ALL_TOPICS = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Vector<AclBinding>> ALL_ACLS = new ConcurrentHashMap<>();

/**
* Add topic to allTopics.
@@ -44,7 +44,7 @@ public class FakeLocalMetadataStore {
public static void addTopicToLocalMetadataStore(NewTopic newTopic) {
ConcurrentHashMap<String, String> configs = new ConcurrentHashMap<>(newTopic.configs());
configs.putIfAbsent("partitions", String.valueOf(newTopic.numPartitions()));
allTopics.putIfAbsent(newTopic.name(), configs);
ALL_TOPICS.putIfAbsent(newTopic.name(), configs);
}

/**
@@ -53,9 +53,9 @@ public static void addTopicToLocalMetadataStore(NewTopic newTopic) {
* @param newPartitionCount new partition count.
*/
public static void updatePartitionCount(String topic, int newPartitionCount) {
ConcurrentHashMap<String, String> configs = FakeLocalMetadataStore.allTopics.getOrDefault(topic, new ConcurrentHashMap<>());
ConcurrentHashMap<String, String> configs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
configs.compute("partitions", (key, value) -> String.valueOf(newPartitionCount));
FakeLocalMetadataStore.allTopics.putIfAbsent(topic, configs);
FakeLocalMetadataStore.ALL_TOPICS.putIfAbsent(topic, configs);
}

/**
@@ -64,7 +64,7 @@ public static void updatePartitionCount(String topic, int newPartitionCount) {
* @param newConfig topic config
*/
public static void updateTopicConfig(String topic, Config newConfig) {
ConcurrentHashMap<String, String> topicConfigs = FakeLocalMetadataStore.allTopics.getOrDefault(topic, new ConcurrentHashMap<>());
ConcurrentHashMap<String, String> topicConfigs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
newConfig.entries().stream().forEach(configEntry -> {
if (configEntry.name() != null) {
if (configEntry.value() != null) {
@@ -75,7 +75,7 @@ public static void updateTopicConfig(String topic, Config newConfig) {
}
}
});
FakeLocalMetadataStore.allTopics.putIfAbsent(topic, topicConfigs);
FakeLocalMetadataStore.ALL_TOPICS.putIfAbsent(topic, topicConfigs);
}

/**
@@ -84,7 +84,7 @@ public static void updateTopicConfig(String topic, Config newConfig) {
* @return true if topic name is a key in allTopics
*/
public static Boolean containsTopic(String topic) {
return allTopics.containsKey(topic);
return ALL_TOPICS.containsKey(topic);
}

/**
Expand All @@ -93,7 +93,7 @@ public static Boolean containsTopic(String topic) {
* @return topic configurations.
*/
public static Map<String, String> topicConfig(String topic) {
return allTopics.getOrDefault(topic, new ConcurrentHashMap<>());
return ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
}

/**
Expand All @@ -102,7 +102,7 @@ public static Map<String, String> topicConfig(String topic) {
* @return {@link List<AclBinding>}
*/
public static List<AclBinding> aclBindings(String aclPrinciple) {
return FakeLocalMetadataStore.allAcls.getOrDefault("User:" + aclPrinciple, new Vector<>());
return FakeLocalMetadataStore.ALL_ACLS.getOrDefault("User:" + aclPrinciple, new Vector<>());
}

/**
Expand All @@ -111,16 +111,16 @@ public static List<AclBinding> aclBindings(String aclPrinciple) {
* @param aclBinding {@link AclBinding}
*/
public static void addACLs(String principal, AclBinding aclBinding) {
Vector<AclBinding> aclBindings = FakeLocalMetadataStore.allAcls.getOrDefault(principal, new Vector<>());
Vector<AclBinding> aclBindings = FakeLocalMetadataStore.ALL_ACLS.getOrDefault(principal, new Vector<>());
aclBindings.add(aclBinding);
FakeLocalMetadataStore.allAcls.putIfAbsent(principal, aclBindings);
FakeLocalMetadataStore.ALL_ACLS.putIfAbsent(principal, aclBindings);
}

/**
* clear allTopics and allAcls.
*/
public static void clear() {
allTopics.clear();
allAcls.clear();
ALL_TOPICS.clear();
ALL_ACLS.clear();
}
}
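The renames in this file follow the standard Java convention: a static final field is a constant reference, so it takes an UPPER_SNAKE_CASE name (ALL_TOPICS, ALL_ACLS). Note that final pins only the reference; the maps themselves remain mutable, which is why clear() and putIfAbsent() still work. A compact sketch of that distinction (the class below is illustrative):

    import java.util.concurrent.ConcurrentHashMap;

    public class ConstantMapSketch {
        // The reference is constant (cannot be pointed at another map),
        // but the map's contents can still change.
        private static final ConcurrentHashMap<String, String> ALL_TOPICS = new ConcurrentHashMap<>();

        public static void main(String[] args) {
            ALL_TOPICS.put("topic-a", "3");   // fine: mutating contents
            ALL_TOPICS.clear();               // fine: still mutating contents
            // ALL_TOPICS = new ConcurrentHashMap<>(); // compile error: cannot assign to a final field
            System.out.println(ALL_TOPICS.isEmpty()); // true
        }
    }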
@@ -164,15 +164,15 @@ public void startClusters() throws Exception {
startClusters(additionalConfig);

try (Admin adminClient = primary.kafka().createAdminClient()) {
adminClient.createAcls(Arrays.asList(
adminClient.createAcls(Collections.singletonList(
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
)
)).all().get();
}
try (Admin adminClient = backup.kafka().createAdminClient()) {
adminClient.createAcls(Arrays.asList(
adminClient.createAcls(Collections.singletonList(
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
@@ -293,7 +293,7 @@ public void testSyncTopicConfigUseProvidedForwardingAdmin() throws Exception {
public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception {
mm2Props.put("sync.topic.acls.enabled", "true");
mm2Config = new MirrorMakerConfig(mm2Props);
List<AclBinding> aclBindings = Arrays.asList(
List<AclBinding> aclBindings = Collections.singletonList(
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "test-topic-1", PatternType.LITERAL),
new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)
@@ -324,7 +324,7 @@ public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationS
@SuppressWarnings("unchecked")
Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
predicate.configure(originalsWithPrefix(predicatePrefix));
transformations.add(new TransformationStage<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation));
} else {
transformations.add(new TransformationStage<>(transformation));
}
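This is the same null-guard simplification as the isOptional() change earlier, with one extra wrinkle: negate is an Object, so the old ternary existed to avoid calling toString() on null. The rewritten && short-circuits before negate.toString() runs, so the null case still yields false. A tiny sketch (the method and variable are illustrative, not the Connect runtime code):

    public class NegateSketch {
        static boolean negated(Object negate) {
            // Short-circuit: when negate is null, toString() is never called.
            return negate != null && Boolean.parseBoolean(negate.toString());
        }

        public static void main(String[] args) {
            System.out.println(negated(null));    // false
            System.out.println(negated("false")); // false
            System.out.println(negated("true"));  // true
        }
    }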
@@ -27,6 +27,7 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

public class ConnectRestServer extends RestServer {
@@ -56,7 +57,7 @@ protected Collection<Class<?>> regularResources() {

@Override
protected Collection<Class<?>> adminResources() {
return Arrays.asList(
return Collections.singletonList(
LoggingResource.class
);
}
@@ -85,17 +85,15 @@ public int hashCode() {

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[")
.append(name)
.append(",")
.append(errorCount)
.append(",")
.append(groups)
.append(",")
.append(configs)
.append("]");
return sb.toString();
return "[" +
name +
"," +
errorCount +
"," +
groups +
"," +
configs +
"]";
}

}
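The toString() rewrites in this and the next two files drop an explicit StringBuilder in favor of a single concatenation expression. That is purely a readability change: javac compiles one chained + expression into an equivalent StringBuilder chain (or, since Java 9, an invokedynamic string-concat call), so the shorter form does not create extra intermediate Strings. A short sketch comparing the two shapes (fields trimmed to two for brevity):

    public class ToStringSketch {
        private final String name = "config";
        private final int errorCount = 0;

        public String toStringWithBuilder() {
            StringBuilder sb = new StringBuilder();
            sb.append("[").append(name).append(",").append(errorCount).append("]");
            return sb.toString();
        }

        public String toStringWithConcat() {
            // Compiled to an equivalent builder/concat call; same output, less ceremony.
            return "[" + name + "," + errorCount + "]";
        }

        public static void main(String[] args) {
            ToStringSketch s = new ToStringSketch();
            System.out.println(s.toStringWithBuilder().equals(s.toStringWithConcat())); // true
        }
    }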
@@ -141,30 +141,28 @@ public int hashCode() {

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[")
.append(name)
.append(",")
.append(type)
.append(",")
.append(required)
.append(",")
.append(defaultValue)
.append(",")
.append(importance)
.append(",")
.append(documentation)
.append(",")
.append(group)
.append(",")
.append(orderInGroup)
.append(",")
.append(width)
.append(",")
.append(displayName)
.append(",")
.append(dependents)
.append("]");
return sb.toString();
return "[" +
name +
"," +
type +
"," +
required +
"," +
defaultValue +
"," +
importance +
"," +
documentation +
"," +
group +
"," +
orderInGroup +
"," +
width +
"," +
displayName +
"," +
dependents +
"]";
}
}
@@ -87,19 +87,17 @@ public int hashCode() {

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[")
.append(name)
.append(",")
.append(value)
.append(",")
.append(recommendedValues)
.append(",")
.append(errors)
.append(",")
.append(visible)
.append("]");
return sb.toString();
return "[" +
name +
"," +
value +
"," +
recommendedValues +
"," +
errors +
"," +
visible +
"]";
}

}