From c06e5455bde2aa12ffec60580f1cc01601871a6d Mon Sep 17 00:00:00 2001
From: David Venable
Date: Tue, 10 Oct 2023 07:56:36 -0700
Subject: [PATCH] Fix broken build and clean up KafkaSource class. (#3469)

Signed-off-by: David Venable
---
 .../source/MskGlueRegistryMultiTypeIT.java    |   1 -
 .../plugins/kafka/source/KafkaSource.java     | 188 ++----------------
 2 files changed, 13 insertions(+), 176 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java
index 12661df8d9..19d2e755e2 100644
--- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java
@@ -49,7 +49,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index 388367d6ae..b668a9a94d 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -7,7 +7,6 @@
 
 import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
@@ -15,15 +14,11 @@
 import io.confluent.kafka.serializers.KafkaJsonDeserializer;
 import kafka.common.BrokerEndPointNotAvailableException;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.json.JsonDeserializer;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -50,24 +45,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -87,15 +71,14 @@ public class KafkaSource implements Source<Record<Event>> {
     private static final long RETRY_SLEEP_INTERVAL = 30000;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
     private final KafkaSourceConfig sourceConfig;
-    private AtomicBoolean shutdownInProgress;
+    private final AtomicBoolean shutdownInProgress;
     private ExecutorService executorService;
     private final PluginMetrics pluginMetrics;
     private KafkaCustomConsumer consumer;
     private KafkaConsumer kafkaConsumer;
-    private String pipelineName;
+    private final String pipelineName;
     private String consumerGroupID;
     private String schemaType = MessageFormat.PLAINTEXT.toString();
-    private static final String SCHEMA_TYPE = "schemaType";
     private final AcknowledgementSetManager acknowledgementSetManager;
     private static CachedSchemaRegistryClient schemaRegistryClient;
     private GlueSchemaRegistryKafkaDeserializer glueDeserializer;
@@ -176,7 +159,7 @@ public void start(Buffer<Record<Event>> buffer) {
             case JSON:
                 return new KafkaConsumer(consumerProperties);
             case AVRO:
-                return new KafkaConsumer(consumerProperties);
+                return new KafkaConsumer(consumerProperties);
             case PLAINTEXT:
             default:
                 glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(sourceConfig);
@@ -233,7 +216,7 @@ KafkaConsumer getConsumer() {
     }
 
     private Properties getConsumerProperties(final TopicConfig topicConfig, final Properties authProperties) {
-        Properties properties = (Properties)authProperties.clone();
+        Properties properties = (Properties) authProperties.clone();
         if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) {
             ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup());
             switch (dnsLookupType) {
@@ -254,82 +237,10 @@ private Properties getConsumerProperties(final TopicConfig topicConfig, final Pr
         return properties;
     }
 
-    private static boolean validateURL(String url) {
-        try {
-            URI uri = new URI(url);
-            if (uri.getScheme() == null || uri.getHost() == null) {
-                return false;
-            }
-            return true;
-        } catch (URISyntaxException ex) {
-            LOG.error("Invalid Schema Registry URI: ", ex);
-            return false;
-        }
-    }
-
     private String getSchemaRegistryUrl() {
         return sourceConfig.getSchemaConfig().getRegistryURL();
     }
 
-    private static String getSchemaType(final String registryUrl, final String topicName, final int schemaVersion) {
-        StringBuilder response = new StringBuilder();
-        String schemaType = MessageFormat.PLAINTEXT.toString();
-        try {
-            String urlPath = registryUrl + "subjects/" + topicName + "-value/versions/" + schemaVersion;
-            URL url = new URL(urlPath);
-            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
-            connection.setRequestMethod("GET");
-            int responseCode = connection.getResponseCode();
-            if (responseCode == HttpURLConnection.HTTP_OK) {
-                BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
-                String inputLine;
-                while ((inputLine = reader.readLine()) != null) {
-                    response.append(inputLine);
-                }
-                reader.close();
-                ObjectMapper mapper = new ObjectMapper();
-                Object json = mapper.readValue(response.toString(), Object.class);
-                String indented = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
-                JsonNode rootNode = mapper.readTree(indented);
-                // If the entry exists but schema type doesn't exist then
-                // the schemaType defaults to AVRO
-                if (rootNode.has(SCHEMA_TYPE)) {
-                    JsonNode node = rootNode.findValue(SCHEMA_TYPE);
-                    schemaType = node.textValue();
-                } else {
-                    schemaType = MessageFormat.AVRO.toString();
-                }
-            } else {
-                InputStream errorStream = connection.getErrorStream();
-                String errorMessage = readErrorMessage(errorStream);
-                // Plaintext is not a valid schematype in schema registry
-                // So, if it doesn't exist in schema regitry, default
-                // the schemaType to PLAINTEXT
-                LOG.error("GET request failed while fetching the schema registry. Defaulting to schema type PLAINTEXT");
-                return MessageFormat.PLAINTEXT.toString();
-            }
-        } catch (IOException e) {
-            LOG.error("An error while fetching the schema registry details : ", e);
-            throw new RuntimeException();
-        }
-        return schemaType;
-    }
-
-    private static String readErrorMessage(InputStream errorStream) throws IOException {
-        if (errorStream == null) {
-            return null;
-        }
-        BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream));
-        StringBuilder errorMessage = new StringBuilder();
-        String line;
-        while ((line = reader.readLine()) != null) {
-            errorMessage.append(line);
-        }
-        reader.close();
-        errorStream.close();
-        return errorMessage.toString();
-    }
-
     private void setSchemaRegistryProperties(Properties properties, TopicConfig topicConfig) {
         SchemaConfig schemaConfig = sourceConfig.getSchemaConfig();
         if (Objects.isNull(schemaConfig)) {
@@ -394,24 +305,24 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic
 
     private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) {
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
-        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes());
-        properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
-        properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
+        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes());
+        properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue());
+        properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long) topicConfig.getReconnectBackoff().toMillis()).intValue());
         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, topicConfig.getAutoCommit());
         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                ((Long)topicConfig.getCommitInterval().toMillis()).intValue());
+                ((Long) topicConfig.getCommitInterval().toMillis()).intValue());
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, topicConfig.getAutoOffsetReset());
         properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, topicConfig.getConsumerMaxPollRecords());
         properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
-                ((Long)topicConfig.getMaxPollInterval().toMillis()).intValue());
-        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue());
-        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue());
-        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int)topicConfig.getFetchMaxBytes());
+                ((Long) topicConfig.getMaxPollInterval().toMillis()).intValue());
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long) topicConfig.getSessionTimeOut().toMillis()).intValue());
+        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long) topicConfig.getHeartBeatInterval().toMillis()).intValue());
+        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int) topicConfig.getFetchMaxBytes());
         properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
-        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes());
+        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int) topicConfig.getFetchMinBytes());
     }
 
     private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
@@ -436,79 +347,6 @@ private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
         }
     }
 
-    private void isTopicExists(String topicName, String bootStrapServer, Properties properties) {
-        List bootStrapServers = new ArrayList<>();
-        String servers[];
-        if (bootStrapServer.contains(",")) {
-            servers = bootStrapServer.split(",");
-            bootStrapServers.addAll(Arrays.asList(servers));
-        } else {
-            bootStrapServers.add(bootStrapServer);
-        }
-        properties.put("connections.max.idle.ms", 5000);
-        properties.put("request.timeout.ms", 10000);
-        try (AdminClient client = KafkaAdminClient.create(properties)) {
-            boolean topicExists = client.listTopics().names().get().stream().anyMatch(name -> name.equalsIgnoreCase(topicName));
-        } catch (InterruptedException | ExecutionException e) {
-            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
-                LOG.error("Topic does not exist: " + topicName);
-            }
-            throw new RuntimeException("Exception while checking the topics availability...");
-        }
-    }
-
-    private boolean isKafkaClusterExists(String bootStrapServers) {
-        Socket socket = null;
-        String[] serverDetails = new String[0];
-        String[] servers = new String[0];
-        int counter = 0;
-        try {
-            if (bootStrapServers.contains(",")) {
-                servers = bootStrapServers.split(",");
-            } else {
-                servers = new String[]{bootStrapServers};
-            }
-            if (CollectionUtils.isNotEmpty(Arrays.asList(servers))) {
-                for (String bootstrapServer : servers) {
-                    if (bootstrapServer.contains(":")) {
-                        serverDetails = bootstrapServer.split(":");
-                        if (StringUtils.isNotEmpty(serverDetails[0])) {
-                            InetAddress inetAddress = InetAddress.getByName(serverDetails[0]);
-                            socket = new Socket(inetAddress, Integer.parseInt(serverDetails[1]));
-                        }
-                    }
-                }
-            }
-        } catch (IOException e) {
-            counter++;
-            LOG.error("Kafka broker : {} is not available...", getMaskedBootStrapDetails(serverDetails[0]));
-        } finally {
-            if (socket != null) {
-                try {
-                    socket.close();
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        if (counter == servers.length) {
-            return true;
-        }
-        return false;
-    }
-
-    private String getMaskedBootStrapDetails(String serverIP) {
-        if (serverIP == null || serverIP.length() <= 4) {
-            return serverIP;
-        }
-        int maskedLength = serverIP.length() - 4;
-        StringBuilder maskedString = new StringBuilder(maskedLength);
-        for (int i = 0; i < maskedLength; i++) {
-            maskedString.append('*');
-        }
-        return maskedString.append(serverIP.substring(maskedLength)).toString();
-    }
-
     protected void sleep(final long millis) throws InterruptedException {
         Thread.sleep(millis);
     }