From 0681edee78a40200e0968bb51bd3fd28ccbcaf83 Mon Sep 17 00:00:00 2001 From: mohbadar Date: Thu, 16 Jan 2020 11:05:24 +0430 Subject: [PATCH] refactoring --- .../connect/api/KafkaConnectApiResource.java | 5 +- .../java/af/gov/anar/connect/core/Topics.java | 8 -- .../gov/anar/connect/core/init/InitBeans.java | 24 ----- .../core/init/KafkaConnectorsInitiator.java | 92 ------------------- .../KafkaConnectIntegrationService.java | 2 +- 5 files changed, 3 insertions(+), 128 deletions(-) delete mode 100755 src/main/java/af/gov/anar/connect/core/Topics.java delete mode 100755 src/main/java/af/gov/anar/connect/core/init/InitBeans.java delete mode 100755 src/main/java/af/gov/anar/connect/core/init/KafkaConnectorsInitiator.java rename src/main/java/af/gov/anar/connect/{core => service}/KafkaConnectIntegrationService.java (99%) diff --git a/src/main/java/af/gov/anar/connect/api/KafkaConnectApiResource.java b/src/main/java/af/gov/anar/connect/api/KafkaConnectApiResource.java index 600af1f..989b81a 100755 --- a/src/main/java/af/gov/anar/connect/api/KafkaConnectApiResource.java +++ b/src/main/java/af/gov/anar/connect/api/KafkaConnectApiResource.java @@ -1,7 +1,7 @@ package af.gov.anar.connect.api; -import af.gov.anar.connect.core.KafkaConnectIntegrationService; +import af.gov.anar.connect.service.KafkaConnectIntegrationService; import af.gov.anar.connect.util.Utility; import org.json.JSONObject; import org.sourcelab.kafka.connect.apiclient.request.dto.*; @@ -17,7 +17,7 @@ import java.util.Map; @RestController -@RequestMapping(value = "/api/integrations") +@RequestMapping(value = "/api/connect") public class KafkaConnectApiResource { @Autowired @@ -83,7 +83,6 @@ ResponseEntity getConnectorStatus(@PathVariable(name = "name", ResponseEntity addConnector(@RequestBody(required = true) String connectorString) { JSONObject jsonConfig = utility.parse(connectorString); - String connectorName = jsonConfig.getString("name"); String connectorConfig =jsonConfig.getString("config"); diff --git a/src/main/java/af/gov/anar/connect/core/Topics.java b/src/main/java/af/gov/anar/connect/core/Topics.java deleted file mode 100755 index fa405b3..0000000 --- a/src/main/java/af/gov/anar/connect/core/Topics.java +++ /dev/null @@ -1,8 +0,0 @@ -package af.gov.anar.connect.core; - -public class Topics { - - public final static String ODKX_POSTGRES_INSTANCE_TOPIC = "asmis.postgres.instance"; - public final static String ODKX_ELASTICSEARCH_INSTANCE_TOPIC="asims.elasticsearch.instance"; - -} diff --git a/src/main/java/af/gov/anar/connect/core/init/InitBeans.java b/src/main/java/af/gov/anar/connect/core/init/InitBeans.java deleted file mode 100755 index 38a9c58..0000000 --- a/src/main/java/af/gov/anar/connect/core/init/InitBeans.java +++ /dev/null @@ -1,24 +0,0 @@ -package af.gov.anar.connect.core.init; - -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -@Component -public class InitBeans { - - - // @PostConstruct - // public void init() - // { - // new NewTopic(topicName, 3, (short) 1); - // } - - @Bean - public KafkaProperties kafkaProperties() - { - return new KafkaProperties(); - } - - -} diff --git a/src/main/java/af/gov/anar/connect/core/init/KafkaConnectorsInitiator.java b/src/main/java/af/gov/anar/connect/core/init/KafkaConnectorsInitiator.java deleted file mode 100755 index 6fb8503..0000000 --- a/src/main/java/af/gov/anar/connect/core/init/KafkaConnectorsInitiator.java +++ /dev/null @@ -1,92 +0,0 @@ -package af.gov.anar.connect.core.init; - - -import af.gov.anar.connect.core.KafkaConnectIntegrationService; -import af.gov.anar.connect.core.Topics; -import org.apache.kafka.clients.admin.NewTopic; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import java.util.HashMap; -import java.util.Map; - -@Service -public class KafkaConnectorsInitiator { - - @Autowired - KafkaConnectIntegrationService kafkaConnectIntegrationService; - - - @PostConstruct - public void init(){ - createDefaultTopics(); - createAsimsPostgresConnetor(); - createElasticsearchSinkConnector(); - } - - public void createAsimsPostgresConnetor() - { - Map config = new HashMap<>(); - config.put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector"); -// config.put("errors.log.include.messages", "false"); - config.put("tasks.max", "5"); - config.put("table.types", "TABLE"); - config.put("table.whitelist","instance"); - config.put("mode","bulk"); - config.put("topic.prefix","asims-test-"); -// config.put("poll.interval.ms","5000"); -// config.put("db.timezone","UTC"); - config.put("value.converter","org.apache.kafka.connect.json.JsonConverter"); -// config.put("config.action.reload","restart"); - config.put("errors.log.enable","false"); - config.put("key.converter","org.apache.kafka.connect.json.JsonConverter"); -// config.put("errors.retry.timeout","0"); -// config.put("validate.non.null","true"); -// config.put("connection.attempts","3"); -// config.put("errors.retry.delay.max.ms","60000"); -// config.put("batch.max.rows","100"); -// config.put("connection.backoff.ms","10000"); -// config.put("timestamp.delay.interval.ms","0"); -// config.put("table.poll.interval.ms","60000"); - config.put("value.converter.schemas.enable","false"); -// config.put("errors.tolerance","none"); - config.put("connection.url","jdbc:postgresql://asims.gov.af:5432/asims?user=asims_user&password=secret"); - config.put("numeric.precision.mapping","false"); - config.put("quote.sql.identifiers","ALWAYS"); - - - kafkaConnectIntegrationService.deployConnector("Asims-postgresql-1", config); - - } - - - - public void createElasticsearchSinkConnector(){ - - - Map config = new HashMap<>(); - config.put("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"); - config.put("connection.url", "http://localhost:9200"); - config.put("tasks.max", "5"); - config.put("topics", Topics.ODKX_ELASTICSEARCH_INSTANCE_TOPIC); -// config.put("topic.index.map", "asims:instance"); - config.put("type.name","kafka-connect"); -// config.put("key.ignore","false"); -// config.put("schema.ignore","false"); -// config.put("drop.invalid.message", "true"); -// config.put("behavior.on.null.values", "ignore"); -// config.put("behavior.on.malformed.documents", "ignore"); - - - kafkaConnectIntegrationService.deployConnector("Asims-elasticsearch-1", config); - - - } - - - private void createDefaultTopics(){ - new NewTopic("asims-test-instance", 3, (short) 1); - } - -} diff --git a/src/main/java/af/gov/anar/connect/core/KafkaConnectIntegrationService.java b/src/main/java/af/gov/anar/connect/service/KafkaConnectIntegrationService.java similarity index 99% rename from src/main/java/af/gov/anar/connect/core/KafkaConnectIntegrationService.java rename to src/main/java/af/gov/anar/connect/service/KafkaConnectIntegrationService.java index 4517c70..4ce5aa8 100755 --- a/src/main/java/af/gov/anar/connect/core/KafkaConnectIntegrationService.java +++ b/src/main/java/af/gov/anar/connect/service/KafkaConnectIntegrationService.java @@ -1,4 +1,4 @@ -package af.gov.anar.connect.core; +package af.gov.anar.connect.service; import af.gov.anar.connect.util.Utility;