From 657629b1d3e3fbe60e574990c41a9e9fb67476e5 Mon Sep 17 00:00:00 2001 From: KrutikaPhirangi <138781661+KrutikaPhirangi@users.noreply.github.com> Date: Mon, 4 Mar 2024 19:00:37 +0530 Subject: [PATCH 1/7] fix : flink version update --- hcx-pipeline-jobs/claims-job/pom.xml | 10 ++-- hcx-pipeline-jobs/communication-job/pom.xml | 10 ++-- .../composite-search-job/pom.xml | 8 +-- hcx-pipeline-jobs/core/pom.xml | 13 +++-- .../function/BaseDispatcherFunction.scala | 54 +++++++++---------- .../function/ContextEnrichmentFunction.scala | 4 +- .../swasth/dp/core/job/BaseJobConfig.scala | 6 +-- .../dp/core/job/FlinkKafkaConnector.scala | 42 ++++++++++----- .../org/swasth/dp/core/serde/MapSerde.scala | 32 +++++------ .../swasth/dp/core/serde/StringSerde.scala | 18 +++---- .../coverage-eligibility-job/pom.xml | 10 ++-- hcx-pipeline-jobs/fetch-job/pom.xml | 10 ++-- .../jobs-distribution/Dockerfile | 2 +- hcx-pipeline-jobs/jobs-distribution/pom.xml | 2 +- hcx-pipeline-jobs/message-service/pom.xml | 10 ++-- .../task/MessageServiceStreamTask.java | 6 ++- hcx-pipeline-jobs/notification-job/pom.xml | 12 ++--- .../task/NotificationStreamTask.java | 16 +++--- .../notification-trigger-job/pom.xml | 12 ++--- .../task/NotificationTriggerStreamTask.java | 8 +-- hcx-pipeline-jobs/payments-job/pom.xml | 10 ++-- hcx-pipeline-jobs/pom.xml | 6 +-- hcx-pipeline-jobs/preauth-job/pom.xml | 10 ++-- .../predetermination-job/pom.xml | 10 ++-- .../protocol-request-processor/pom.xml | 10 ++-- .../ProtocolRequestProcessorStreamTask.java | 8 +-- hcx-pipeline-jobs/retry-job/pom.xml | 10 ++-- hcx-pipeline-jobs/search-response-job/pom.xml | 8 +-- 28 files changed, 192 insertions(+), 165 deletions(-) diff --git a/hcx-pipeline-jobs/claims-job/pom.xml b/hcx-pipeline-jobs/claims-job/pom.xml index 69566bb1f..e0add4699 100644 --- a/hcx-pipeline-jobs/claims-job/pom.xml +++ b/hcx-pipeline-jobs/claims-job/pom.xml @@ -40,20 +40,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -89,7 +89,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/communication-job/pom.xml b/hcx-pipeline-jobs/communication-job/pom.xml index 0f6621c2e..efed7511d 100644 --- a/hcx-pipeline-jobs/communication-job/pom.xml +++ b/hcx-pipeline-jobs/communication-job/pom.xml @@ -40,20 +40,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -89,7 +89,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/composite-search-job/pom.xml b/hcx-pipeline-jobs/composite-search-job/pom.xml index 5167f3366..80b4fb157 100644 --- a/hcx-pipeline-jobs/composite-search-job/pom.xml +++ b/hcx-pipeline-jobs/composite-search-job/pom.xml @@ -69,13 +69,13 @@ org.apache.flink - flink-test-utils_2.12 + flink-test-utils ${flink.version} test org.apache.flink - flink-runtime_2.12 + flink-runtime ${flink.version} test tests @@ -100,7 +100,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java 
${flink.version} test tests @@ -119,7 +119,7 @@ org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} diff --git a/hcx-pipeline-jobs/core/pom.xml b/hcx-pipeline-jobs/core/pom.xml index 03a49182f..f4d21c590 100644 --- a/hcx-pipeline-jobs/core/pom.xml +++ b/hcx-pipeline-jobs/core/pom.xml @@ -25,8 +25,8 @@ org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.apache.kafka @@ -94,12 +94,12 @@ org.apache.kafka kafka_${scala.maj.version} - 2.8.0 + ${kafka.version} test org.apache.flink - flink-test-utils_2.12 + flink-test-utils ${flink.version} test @@ -138,6 +138,11 @@ elasticsearch-rest-high-level-client 7.2.1 + + com.typesafe + config + 1.4.2 + diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/BaseDispatcherFunction.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/BaseDispatcherFunction.scala index 822059d71..6f295920b 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/BaseDispatcherFunction.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/BaseDispatcherFunction.scala @@ -43,10 +43,10 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) postgresConnect.closeConnection() } - def validate(event: util.Map[String, AnyRef]): ValidationResult + def validate(event: java.util.Map[String, AnyRef]): ValidationResult @throws(classOf[Exception]) - def getPayload(payloadRefId: String): util.Map[String, AnyRef] = { + def getPayload(payloadRefId: String): java.util.Map[String, AnyRef] = { Console.println("Fetching payload from postgres for mid: " + payloadRefId) logger.info("Fetching payload from postgres for mid: " + payloadRefId) val postgresQuery = String.format("SELECT data FROM %s WHERE mid = '%s'", config.postgresTable, payloadRefId); @@ -55,7 +55,7 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) val resultSet = preparedStatement.executeQuery() if (resultSet.next()) { val payload = resultSet.getString(1) - JSONUtil.deserialize[util.Map[String, AnyRef]](payload) + JSONUtil.deserialize[java.util.Map[String, AnyRef]](payload) } else { throw new Exception("Payload not found for the given reference id: " + payloadRefId) } @@ -68,23 +68,23 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) } @throws(classOf[Exception]) - def audit(event: util.Map[String, AnyRef], context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { + def audit(event: java.util.Map[String, AnyRef], context: ProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { auditService.indexAudit(createAuditRecord(event)) context.output(config.auditOutputTag, JSONUtil.serialize(createAuditLog(event))) metrics.incCounter(config.auditEventsCount) } - def createErrorMap(error: Option[ErrorResponse]):util.Map[String, AnyRef] = { - val errorMap:util.Map[String, AnyRef] = new util.HashMap[String, AnyRef] + def createErrorMap(error: Option[ErrorResponse]):java.util.Map[String, AnyRef] = { + val errorMap:java.util.Map[String, AnyRef] = new java.util.HashMap[String, AnyRef] errorMap.put("code", error.flatMap(_.code).getOrElse("")) errorMap.put("message", error.flatMap(_.message).getOrElse("")) errorMap.put("trace", error.flatMap(_.trace).getOrElse("")) errorMap } - def dispatchErrorResponse(event: util.Map[String, AnyRef], error: Option[ErrorResponse], correlationId: String, payloadRefId: String, 
senderCtx: util.Map[String, AnyRef], context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { - val protectedMap = new util.HashMap[String, AnyRef] + def dispatchErrorResponse(event: java.util.Map[String, AnyRef], error: Option[ErrorResponse], correlationId: String, payloadRefId: String, senderCtx: util.Map[String, AnyRef], context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { + val protectedMap = new java.util.HashMap[String, AnyRef] //Update sender code protectedMap.put(Constants.HCX_SENDER_CODE, config.hcxRegistryCode) //Update recipient code @@ -109,13 +109,13 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) } } - override def processElement(event: util.Map[String, AnyRef], context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { + override def processElement(event: java.util.Map[String, AnyRef], context: ProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { //TODO Make changes here for handling redirect requests with flat JSON objects val correlationId = getProtocolStringValue(event, Constants.HCX_CORRELATION_ID) val payloadRefId = event.get(Constants.MID).asInstanceOf[String] // TODO change cdata to context after discussion. - val senderCtx = event.getOrDefault(Constants.CDATA, new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]].getOrDefault(Constants.SENDER, new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]] - val recipientCtx = event.getOrDefault(Constants.CDATA, new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]].getOrDefault(Constants.RECIPIENT, new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]] + val senderCtx = event.getOrDefault(Constants.CDATA, new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(Constants.SENDER, new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]] + val recipientCtx = event.getOrDefault(Constants.CDATA, new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(Constants.RECIPIENT, new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]] try { payload = getPayload(payloadRefId); if (MapUtils.isEmpty(senderCtx)) { @@ -188,7 +188,7 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) } } - private def dispatchError(payloadRefId: String, event: util.Map[String, AnyRef], result: DispatcherResult, correlationId: String, senderCtx: util.Map[String, AnyRef], context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { + private def dispatchError(payloadRefId: String, event: java.util.Map[String, AnyRef], result: DispatcherResult, correlationId: String, senderCtx: java.util.Map[String, AnyRef], context: ProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { updateDBStatus(payloadRefId, Constants.ERROR_STATUS) setStatus(event, Constants.ERROR_STATUS) setErrorDetails(event, createErrorMap(result.error)) @@ -216,8 +216,8 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) override def metricsList(): List[String] = { List(config.dispatcherSuccessCount, config.dispatcherFailedCount, config.dispatcherRetryCount, config.dispatcherValidationFailedCount, 
config.dispatcherValidationSuccessCount, config.auditEventsCount, config.coverageEligibilityDispatcherSuccessCount, config.coverageEligibilityOnDispatcherSuccessCount, config.coverageEligibilityDispatcherRetryCount, config.coverageEligibilityOnDispatcherRetryCount, config.coverageEligibilityDispatcherFailedCount, config.coverageEligibilityOnDispatcherFailedCount, config.preAuthDispatcherSuccessCount, config.preAuthOnDispatcherSuccessCount, config.preAuthDispatcherRetryCount, config.preAuthOnDispatcherRetryCount, config.preAuthDispatcherFailedCount, config.preAuthOnDispatcherFailedCount, config.predeterminationDispatcherSuccessCount, config.predeterminationOnDispatcherSuccessCount, config.predeterminationDispatcherFailedCount, config.predeterminationOnDispatcherFailedCount, config.predeterminationDispatcherRetryCount, config.predeterminationOnDispatcherRetryCount, config.claimDispatcherSuccessCount, config.claimOnDispatcherSuccessCount, config.claimDispatcherFailedCount, config.claimOnDispatcherFailedCount, config.claimDispatcherRetryCount, config.claimOnDispatcherRetryCount, config.paymentDispatcherSuccessCount, config.paymentOnDispatcherSuccessCount, config.paymentDispatcherFailedCount, config.paymentOnDispatcherFailedCount, config.paymentDispatcherRetryCount, config.paymentOnDispatcherRetryCount, config.fetchDispatcherSuccessCount, config.fetchOnDispatcherSuccessCount, config.fetchDispatcherFailedCount, config.fetchOnDispatcherFailedCount, config.fetchDispatcherRetryCount, config.fetchOnDispatcherRetryCount, config.communicationDispatcherSuccessCount, config.communicationOnDispatcherSuccessCount, config.communicationDispatcherFailedCount, config.communicationOnDispatcherFailedCount, config.communicationDispatcherRetryCount, config.communicationOnDispatcherRetryCount, config.searchDispatcherSuccessCount, config.searchOnDispatcherSuccessCount, config.searchDispatcherFailedCount, config.searchOnDispatcherFailedCount, config.searchDispatcherRetryCount, config.searchOnDispatcherRetryCount, config.retryDispatcherSuccessCount, config.retryDispatcherFailedCount, config.retryDispatcherRetryCount)} - def createAuditRecord(event: util.Map[String, AnyRef]): util.Map[String, AnyRef] = { - val audit = new util.HashMap[String, AnyRef](); + def createAuditRecord(event: java.util.Map[String, AnyRef]): java.util.Map[String, AnyRef] = { + val audit = new java.util.HashMap[String, AnyRef](); audit.put(Constants.EID, Constants.AUDIT) audit.put(Constants.HCX_RECIPIENT_CODE, getProtocolStringValue(event, Constants.HCX_RECIPIENT_CODE)) audit.put(Constants.HCX_SENDER_CODE, getProtocolStringValue(event, Constants.HCX_SENDER_CODE)) @@ -249,24 +249,24 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) audit } - def createAuditLog(event: util.Map[String, AnyRef]): util.Map[String, AnyRef] = { - val audit = new util.HashMap[String, AnyRef]() + def createAuditLog(event: java.util.Map[String, AnyRef]): java.util.Map[String, AnyRef] = { + val audit = new java.util.HashMap[String, AnyRef]() audit.put(Constants.EID, Constants.AUDIT) audit.put(Constants.ETS, Calendar.getInstance().getTime) audit.put(Constants.MID, event.get(Constants.MID).asInstanceOf[String]) - audit.put(Constants.OBJECT, new util.HashMap[String, AnyRef]() { + audit.put(Constants.OBJECT, new java.util.HashMap[String, AnyRef]() { { put(Constants.ID, getProtocolStringValue(event, Constants.HCX_CORRELATION_ID)) put(Constants.TYPE, getEntity(event.get(Constants.ACTION).asInstanceOf[String])) } }) - audit.put(Constants.CDATA, new 
util.HashMap[String, AnyRef]() { + audit.put(Constants.CDATA, new java.util.HashMap[String, AnyRef]() { { put(Constants.ACTION, event.get(Constants.ACTION).asInstanceOf[String]) - putAll(event.get(Constants.HEADERS).asInstanceOf[util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[util.Map[String, AnyRef]]) + putAll(event.get(Constants.HEADERS).asInstanceOf[java.util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[java.util.Map[String, AnyRef]]) } }) - audit.put(Constants.EDATA, new util.HashMap[String, AnyRef]() { + audit.put(Constants.EDATA, new java.util.HashMap[String, AnyRef]() { { put(Constants.STATUS, getProtocolStringValue(event, Constants.HCX_STATUS)) } @@ -274,7 +274,7 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) audit } - def removeSensitiveData(payload: util.Map[String, AnyRef]): String = { + def removeSensitiveData(payload: java.util.Map[String, AnyRef]): String = { if (payload.containsKey(Constants.PAYLOAD)) { val modifiedPayload = payload.get(Constants.PAYLOAD).asInstanceOf[String].split("\\.").toBuffer // remove encryption key @@ -292,10 +292,10 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) } } - def dispatchRecipient(baseSenderCode: String, action: String, parsedPayload: util.Map[String, AnyRef]) = { + def dispatchRecipient(baseSenderCode: String, action: String, parsedPayload: java.util.Map[String, AnyRef]) = { val recipientDetails = fetchDetails(baseSenderCode) val recipientContext = createRecipientContext(recipientDetails, action) - val updatedPayload = new util.HashMap[String, AnyRef]() + val updatedPayload = new java.util.HashMap[String, AnyRef]() //TODO Remove this and use the utility for modifying the ciphertext updatedPayload.put(Constants.PAYLOAD, JSONUtil.createPayloadByValues(parsedPayload)); dispatcherUtil.dispatch(recipientContext, JSONUtil.serialize(updatedPayload)) @@ -308,7 +308,7 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) } - def getTag(event : util.Map[String, AnyRef],audit: util.HashMap[String, AnyRef]): Unit = { + def getTag(event : java.util.Map[String, AnyRef],audit: java.util.HashMap[String, AnyRef]): Unit = { val tagSet = new java.util.HashSet[String]() tagSet.add(getCDataListValue(event, Constants.SENDER, Constants.TAGS).toString) tagSet.add(getCDataListValue(event, Constants.RECIPIENT, Constants.TAGS).toString) @@ -329,7 +329,7 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) } } - def generateSuccessMetrics(event: util.Map[String, AnyRef], metrics: Metrics): Unit = { + def generateSuccessMetrics(event: java.util.Map[String, AnyRef], metrics: Metrics): Unit = { val action: String = event.get(Constants.ACTION).asInstanceOf[String]; action match { case Constants.COVERAGE_ELIGIBILITY_CHECK => metrics.incCounter(config.coverageEligibilityDispatcherSuccessCount) @@ -352,7 +352,7 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) } } - def generateFailedMetrics(event: util.Map[String, AnyRef], metrics: Metrics): Unit = { + def generateFailedMetrics(event: java.util.Map[String, AnyRef], metrics: Metrics): Unit = { val action: String = event.get(Constants.ACTION).asInstanceOf[String]; action match { case Constants.COVERAGE_ELIGIBILITY_CHECK => metrics.incCounter(config.coverageEligibilityDispatcherFailedCount) @@ -375,7 +375,7 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) } } - def generateRetryMetrics(event: util.Map[String, AnyRef], metrics: Metrics): Unit = { + def generateRetryMetrics(event: java.util.Map[String, AnyRef], 
metrics: Metrics): Unit = { val action: String = event.get(Constants.ACTION).asInstanceOf[String]; action match { case Constants.COVERAGE_ELIGIBILITY_CHECK => metrics.incCounter(config.coverageEligibilityDispatcherRetryCount) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/ContextEnrichmentFunction.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/ContextEnrichmentFunction.scala index 02dee1375..ac4256007 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/ContextEnrichmentFunction.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/ContextEnrichmentFunction.scala @@ -19,13 +19,13 @@ class ContextEnrichmentFunction(config: BaseJobConfig) (implicit val stringTypeI super.open(parameters) } - override def processElement(event: util.Map[String, AnyRef], context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { + override def processElement(event: java.util.Map[String, AnyRef], context: ProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { val senderCode: String = getProtocolStringValue(event, Constants.HCX_SENDER_CODE) val recipientCode: String = getProtocolStringValue(event, Constants.HCX_RECIPIENT_CODE) val action: String = event.get(Constants.ACTION).asInstanceOf[String] Console.println(s"Sender: $senderCode : Recipient: $recipientCode : Action: $action") - val result: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + val result: java.util.Map[String, AnyRef] = new java.util.HashMap[String, AnyRef]() // Fetch the sender and receiver details from registry or cache val sender = fetchDetails(senderCode) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseJobConfig.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseJobConfig.scala index ee4e82421..ebf456b28 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseJobConfig.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseJobConfig.scala @@ -136,10 +136,10 @@ class BaseJobConfig(val config: Config, val jobName: String) extends Serializabl def kafkaProducerProperties: Properties = { val properties = new Properties() properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerServers) - properties.put(ProducerConfig.LINGER_MS_CONFIG, new Integer(kafkaProducerLingerMs)) - properties.put(ProducerConfig.BATCH_SIZE_CONFIG, new Integer(kafkaProducerBatchSize)) + properties.put(ProducerConfig.LINGER_MS_CONFIG, Integer.valueOf(kafkaProducerLingerMs)) + properties.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.valueOf(kafkaProducerBatchSize)) properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") - properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, new Integer(kafkaProducerMaxRequestSize)) + properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.valueOf(kafkaProducerMaxRequestSize)) properties } diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/FlinkKafkaConnector.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/FlinkKafkaConnector.scala index 3a7d95155..3ae504def 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/FlinkKafkaConnector.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/FlinkKafkaConnector.scala @@ -1,29 +1,43 @@ package org.swasth.dp.core.job -import java.util - -import 
org.apache.flink.streaming.api.functions.sink.SinkFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer} -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic +import org.apache.flink.connector.base.DeliveryGuarantee +import org.apache.flink.connector.kafka.sink.KafkaSink +import org.apache.flink.connector.kafka.source.KafkaSource import org.swasth.dp.core.serde._ +import java.util class FlinkKafkaConnector(config: BaseJobConfig) extends Serializable { - def kafkaMapSource(kafkaTopic: String): SourceFunction[util.Map[String, AnyRef]] = { - new FlinkKafkaConsumer[util.Map[String, AnyRef]](kafkaTopic, new MapDeserializationSchema, config.kafkaConsumerProperties) + def kafkaMapSource(kafkaTopic: String): KafkaSource[util.Map[String, AnyRef]] = { + KafkaSource.builder[util.Map[String, AnyRef]]() + .setTopics(kafkaTopic) + .setDeserializer(new MapDeserializationSchema) + .setProperties(config.kafkaConsumerProperties) + .build() } - def kafkaMapSink(kafkaTopic: String): SinkFunction[util.Map[String, AnyRef]] = { - new FlinkKafkaProducer[util.Map[String, AnyRef]](kafkaTopic, new MapSerializationSchema(kafkaTopic), config.kafkaProducerProperties, Semantic.AT_LEAST_ONCE) + def kafkaMapSink(kafkaTopic: String): KafkaSink[util.Map[String, AnyRef]] = { + KafkaSink.builder[util.Map[String, AnyRef]]() + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .setRecordSerializer(new MapSerializationSchema(kafkaTopic)) + .setKafkaProducerConfig(config.kafkaProducerProperties) + .build() } - def kafkaStringSource(kafkaTopic: String): SourceFunction[String] = { - new FlinkKafkaConsumer[String](kafkaTopic, new StringDeserializationSchema, config.kafkaConsumerProperties) + def kafkaStringSource(kafkaTopic: String): KafkaSource[String] = { + KafkaSource.builder[String]() + .setTopics(kafkaTopic) + .setDeserializer(new StringDeserializationSchema) + .setProperties(config.kafkaConsumerProperties) + .build() } - def kafkaStringSink(kafkaTopic: String): SinkFunction[String] = { - new FlinkKafkaProducer[String](kafkaTopic, new StringSerializationSchema(kafkaTopic), config.kafkaProducerProperties, Semantic.AT_LEAST_ONCE) + def kafkaStringSink(kafkaTopic: String): KafkaSink[String] = { + KafkaSink.builder[String]() + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .setRecordSerializer(new StringSerializationSchema(kafkaTopic)) + .setKafkaProducerConfig(config.kafkaProducerProperties) + .build() } } \ No newline at end of file diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala index 193f2f10d..74dc9cd75 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala @@ -1,38 +1,38 @@ package org.swasth.dp.core.serde -import java.nio.charset.StandardCharsets -import java.util - -import com.google.gson.Gson import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema} +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema +import org.apache.flink.util.Collector 
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.ProducerRecord import org.swasth.dp.core.util.JSONUtil -class MapDeserializationSchema extends KafkaDeserializationSchema[util.Map[String, AnyRef]] { +import java.nio.charset.StandardCharsets +import java.util +import scala.collection.mutable + +class MapDeserializationSchema extends KafkaRecordDeserializationSchema[util.Map[String, AnyRef]] { private val serialVersionUID = -3224825136576915426L - override def isEndOfStream(nextElement: util.Map[String, AnyRef]): Boolean = false + override def getProducedType: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) - override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): util.Map[String, AnyRef] = { - val recordMap = JSONUtil.deserialize[util.HashMap[String, AnyRef]](record.value()) - recordMap + override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[util.Map[String, AnyRef]]): Unit = { + val msg = JSONUtil.deserialize[util.Map[String, AnyRef]](record.value()) + out.collect(msg) } - - override def getProducedType: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) } -class MapSerializationSchema(topic: String, key: Option[String] = None) extends KafkaSerializationSchema[util.Map[String, AnyRef]] { +class MapSerializationSchema(topic: String, key: Option[String] = None) extends KafkaRecordSerializationSchema[util.Map[String, AnyRef]] { private val serialVersionUID = -4284080856874185929L - override def serialize(element: util.Map[String, AnyRef], timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { - val out = new Gson().toJson(element) + override def serialize(element: util.Map[String, AnyRef], context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { + val out = JSONUtil.serialize(element) key.map { kafkaKey => new ProducerRecord[Array[Byte], Array[Byte]](topic, kafkaKey.getBytes(StandardCharsets.UTF_8), out.getBytes(StandardCharsets.UTF_8)) }.getOrElse(new ProducerRecord[Array[Byte], Array[Byte]](topic, out.getBytes(StandardCharsets.UTF_8))) } -} +} \ No newline at end of file diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/StringSerde.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/StringSerde.scala index a5cb57deb..88ee85ae3 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/StringSerde.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/StringSerde.scala @@ -4,28 +4,28 @@ import java.nio.charset.StandardCharsets import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema} +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema +import org.apache.flink.util.Collector import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.ProducerRecord -class StringDeserializationSchema extends KafkaDeserializationSchema[String] { +class StringDeserializationSchema extends KafkaRecordDeserializationSchema[String] { private val serialVersionUID = -3224825136576915426L - override def 
isEndOfStream(nextElement: String): Boolean = false + override def getProducedType: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) - override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): String = { - new String(record.value(), StandardCharsets.UTF_8) + override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[String]): Unit = { + out.collect(new String(record.value(), StandardCharsets.UTF_8)) } - - override def getProducedType: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) } -class StringSerializationSchema(topic: String, key: Option[String] = None) extends KafkaSerializationSchema[String] { +class StringSerializationSchema(topic: String, key: Option[String] = None) extends KafkaRecordSerializationSchema[String] { private val serialVersionUID = -4284080856874185929L - override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { + override def serialize(element: String, context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { key.map { kafkaKey => new ProducerRecord[Array[Byte], Array[Byte]](topic, kafkaKey.getBytes(StandardCharsets.UTF_8), element.getBytes(StandardCharsets.UTF_8)) }.getOrElse(new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))) diff --git a/hcx-pipeline-jobs/coverage-eligibility-job/pom.xml b/hcx-pipeline-jobs/coverage-eligibility-job/pom.xml index 8842e545e..dc9fbcd9c 100644 --- a/hcx-pipeline-jobs/coverage-eligibility-job/pom.xml +++ b/hcx-pipeline-jobs/coverage-eligibility-job/pom.xml @@ -40,20 +40,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -89,7 +89,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/fetch-job/pom.xml b/hcx-pipeline-jobs/fetch-job/pom.xml index e68612ac1..bb938c283 100644 --- a/hcx-pipeline-jobs/fetch-job/pom.xml +++ b/hcx-pipeline-jobs/fetch-job/pom.xml @@ -40,20 +40,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -89,7 +89,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/jobs-distribution/Dockerfile b/hcx-pipeline-jobs/jobs-distribution/Dockerfile index ef129165b..ce77606cd 100644 --- a/hcx-pipeline-jobs/jobs-distribution/Dockerfile +++ b/hcx-pipeline-jobs/jobs-distribution/Dockerfile @@ -1,4 +1,4 @@ -FROM anandp504/flink:1.12.0-scala_2.12-java11 +FROM anandp504/flink:1.18.1-scala_2.12-java17 USER flink ADD target/jobs-distribution-1.0.tar.gz $FLINK_HOME/lib/ diff --git a/hcx-pipeline-jobs/jobs-distribution/pom.xml b/hcx-pipeline-jobs/jobs-distribution/pom.xml index d28ddf068..93174e5eb 100644 --- a/hcx-pipeline-jobs/jobs-distribution/pom.xml +++ b/hcx-pipeline-jobs/jobs-distribution/pom.xml @@ -16,7 +16,7 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + 
flink-streaming-java ${flink.version} jar diff --git a/hcx-pipeline-jobs/message-service/pom.xml b/hcx-pipeline-jobs/message-service/pom.xml index 4d4ffe411..92890f954 100644 --- a/hcx-pipeline-jobs/message-service/pom.xml +++ b/hcx-pipeline-jobs/message-service/pom.xml @@ -52,20 +52,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -101,7 +101,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/message-service/src/main/java/org/swasth/dp/message/service/task/MessageServiceStreamTask.java b/hcx-pipeline-jobs/message-service/src/main/java/org/swasth/dp/message/service/task/MessageServiceStreamTask.java index 0a87cbaf0..ee754531c 100644 --- a/hcx-pipeline-jobs/message-service/src/main/java/org/swasth/dp/message/service/task/MessageServiceStreamTask.java +++ b/hcx-pipeline-jobs/message-service/src/main/java/org/swasth/dp/message/service/task/MessageServiceStreamTask.java @@ -2,7 +2,9 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -48,11 +50,11 @@ public static void main(String[] args) { private void process(BaseJobConfig baseJobConfig) throws Exception { StreamExecutionEnvironment env = FlinkUtil.getExecutionContext(baseJobConfig); - SourceFunction> kafkaConsumer = kafkaConnector.kafkaMapSource(config.inputTopic); + KafkaSource> kafkaConsumer = kafkaConnector.kafkaMapSource(config.inputTopic); env.enableCheckpointing(config.checkpointingInterval()); env.getCheckpointConfig().setCheckpointTimeout(config.checkpointingTimeout()); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(config.checkpointingPauseSeconds()); - SingleOutputStreamOperator> eventStream = env.addSource(kafkaConsumer, config.messageServiceConsumer) + SingleOutputStreamOperator> eventStream = env.fromSource(kafkaConsumer,WatermarkStrategy.noWatermarks(), config.messageServiceConsumer) .uid(config.messageServiceConsumer).setParallelism(config.consumerParallelism) .rebalance() .process(new MessageFilterFunction(config)).setParallelism(config.downstreamOperatorsParallelism); diff --git a/hcx-pipeline-jobs/notification-job/pom.xml b/hcx-pipeline-jobs/notification-job/pom.xml index fa91f2bf0..3489246c4 100644 --- a/hcx-pipeline-jobs/notification-job/pom.xml +++ b/hcx-pipeline-jobs/notification-job/pom.xml @@ -41,20 +41,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.1.0-1.18 org.swasth @@ -96,7 +96,7 @@ org.apache.commons commons-text - 1.9 + 1.10.0 com.googlecode.json-simple @@ -105,7 +105,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff 
--git a/hcx-pipeline-jobs/notification-job/src/main/java/org/swasth/dp/notification/task/NotificationStreamTask.java b/hcx-pipeline-jobs/notification-job/src/main/java/org/swasth/dp/notification/task/NotificationStreamTask.java index 07b8c9519..d627e21d4 100644 --- a/hcx-pipeline-jobs/notification-job/src/main/java/org/swasth/dp/notification/task/NotificationStreamTask.java +++ b/hcx-pipeline-jobs/notification-job/src/main/java/org/swasth/dp/notification/task/NotificationStreamTask.java @@ -2,9 +2,11 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -49,14 +51,14 @@ public static void main(String[] args) { void process(BaseJobConfig baseJobConfig) throws Exception { StreamExecutionEnvironment env = FlinkUtil.getExecutionContext(baseJobConfig); - SourceFunction> notifyConsumer = kafkaConnector.kafkaMapSource(config.kafkaInputTopic); - SourceFunction> subscriptionConsumer = kafkaConnector.kafkaMapSource(config.subscriptionInputTopic); - SourceFunction> onSubscriptionConsumer = kafkaConnector.kafkaMapSource(config.onSubscriptionInputTopic); + KafkaSource> notifyConsumer = kafkaConnector.kafkaMapSource(config.kafkaInputTopic); + KafkaSource> subscriptionConsumer = kafkaConnector.kafkaMapSource(config.subscriptionInputTopic); + KafkaSource> onSubscriptionConsumer = kafkaConnector.kafkaMapSource(config.onSubscriptionInputTopic); env.enableCheckpointing(config.checkpointingInterval()); env.getCheckpointConfig().setCheckpointTimeout(config.checkpointingTimeout()); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(config.checkpointingPauseSeconds()); //Notification Stream - SingleOutputStreamOperator> dispatchedStream = env.addSource(notifyConsumer, config.notificationConsumer) + SingleOutputStreamOperator> dispatchedStream = env.fromSource(notifyConsumer, WatermarkStrategy.noWatermarks(), config.notificationConsumer) .uid(config.notificationConsumer).setParallelism(config.consumerParallelism) .rebalance() .process(new NotificationFilterFunction(config)).setParallelism(config.downstreamOperatorsParallelism); @@ -65,12 +67,12 @@ void process(BaseJobConfig baseJobConfig) throws Exception { .process(new NotificationDispatcherFunction(config)).setParallelism(config.dispatcherParallelism); // Sink notifications to message topic - notificationStream.getSideOutput(config.messageOutputTag).addSink(kafkaConnector.kafkaStringSink(config.messageTopic)) + notificationStream.getSideOutput(config.messageOutputTag).sinkTo(kafkaConnector.kafkaStringSink(config.messageTopic)) .name(config.notificationMessageProducer).uid("notification-message-sink").setParallelism(config.downstreamOperatorsParallelism); //Subscription Stream //Filter the records based on the action type - SingleOutputStreamOperator> filteredStream = env.addSource(subscriptionConsumer, config.subscriptionConsumer) + SingleOutputStreamOperator> filteredStream = env.fromSource(subscriptionConsumer,WatermarkStrategy.noWatermarks(), config.subscriptionConsumer) 
.uid(config.subscriptionConsumer).setParallelism(config.consumerParallelism).rebalance() .process(new SubscriptionFilterFunction(config)).setParallelism(config.downstreamOperatorsParallelism); @@ -83,7 +85,7 @@ void process(BaseJobConfig baseJobConfig) throws Exception { .process(new SubscriptionDispatcherFunction(config)).setParallelism(config.downstreamOperatorsParallelism); //OnSubscription Stream - SingleOutputStreamOperator> onSubscribeStream = env.addSource(onSubscriptionConsumer, config.onSubscriptionConsumer) + SingleOutputStreamOperator> onSubscribeStream = env.fromSource(onSubscriptionConsumer, WatermarkStrategy.noWatermarks(),config.onSubscriptionConsumer) .uid(config.onSubscriptionConsumer).setParallelism(config.consumerParallelism).rebalance() .process(new OnSubscriptionFunction(config)).setParallelism(config.downstreamOperatorsParallelism); diff --git a/hcx-pipeline-jobs/notification-trigger-job/pom.xml b/hcx-pipeline-jobs/notification-trigger-job/pom.xml index 856a16b5f..3d217c5a7 100644 --- a/hcx-pipeline-jobs/notification-trigger-job/pom.xml +++ b/hcx-pipeline-jobs/notification-trigger-job/pom.xml @@ -43,20 +43,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -93,7 +93,7 @@ org.apache.commons commons-text - 1.9 + 1.10.0 com.googlecode.json-simple @@ -102,7 +102,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/notification-trigger-job/src/main/java/org/swasth/dp/notification/trigger/task/NotificationTriggerStreamTask.java b/hcx-pipeline-jobs/notification-trigger-job/src/main/java/org/swasth/dp/notification/trigger/task/NotificationTriggerStreamTask.java index e7d4f4708..af335885e 100644 --- a/hcx-pipeline-jobs/notification-trigger-job/src/main/java/org/swasth/dp/notification/trigger/task/NotificationTriggerStreamTask.java +++ b/hcx-pipeline-jobs/notification-trigger-job/src/main/java/org/swasth/dp/notification/trigger/task/NotificationTriggerStreamTask.java @@ -2,7 +2,9 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -46,17 +48,17 @@ public static void main(String[] args) { private void process(BaseJobConfig baseJobConfig) throws Exception { StreamExecutionEnvironment env = FlinkUtil.getExecutionContext(baseJobConfig); - SourceFunction> kafkaConsumer = kafkaConnector.kafkaMapSource(config.kafkaInputTopic); + KafkaSource> kafkaConsumer = kafkaConnector.kafkaMapSource(config.kafkaInputTopic); env.enableCheckpointing(config.checkpointingInterval()); env.getCheckpointConfig().setCheckpointTimeout(config.checkpointingTimeout()); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(config.checkpointingPauseSeconds()); - SingleOutputStreamOperator> eventStream = env.addSource(kafkaConsumer, config.notificationTriggerConsumer) + SingleOutputStreamOperator> eventStream = 
env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), config.notificationTriggerConsumer) .uid(config.notificationTriggerConsumer).setParallelism(config.consumerParallelism) .rebalance() .process(new NotificationTriggerProcessFunction(config)).setParallelism(config.downstreamOperatorsParallelism); /** Sink for notify events */ - eventStream.getSideOutput(config.notifyOutputTag).addSink(kafkaConnector.kafkaStringSink(config.kafkaOutputTopic)) + eventStream.getSideOutput(config.notifyOutputTag).sinkTo(kafkaConnector.kafkaStringSink(config.kafkaOutputTopic)) .name(config.notifyProducer).uid(config.notifyProducer).setParallelism(config.downstreamOperatorsParallelism); System.out.println(config.jobName() + " is processing"); diff --git a/hcx-pipeline-jobs/payments-job/pom.xml b/hcx-pipeline-jobs/payments-job/pom.xml index 6fb0d2461..6bb3c83de 100644 --- a/hcx-pipeline-jobs/payments-job/pom.xml +++ b/hcx-pipeline-jobs/payments-job/pom.xml @@ -40,20 +40,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -89,7 +89,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/pom.xml b/hcx-pipeline-jobs/pom.xml index dfb2aa03f..9572d159a 100644 --- a/hcx-pipeline-jobs/pom.xml +++ b/hcx-pipeline-jobs/pom.xml @@ -29,9 +29,9 @@ UTF-8 2.12.11 2.12 - 1.12.0 - 2.4.0 - 11 + 1.18.1 + 3.1.0 + 17 1.9.13 1.4.0 release-1.0 diff --git a/hcx-pipeline-jobs/preauth-job/pom.xml b/hcx-pipeline-jobs/preauth-job/pom.xml index 437d03524..4c06d4eb5 100644 --- a/hcx-pipeline-jobs/preauth-job/pom.xml +++ b/hcx-pipeline-jobs/preauth-job/pom.xml @@ -40,20 +40,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -89,7 +89,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/predetermination-job/pom.xml b/hcx-pipeline-jobs/predetermination-job/pom.xml index 93b3c19ae..fb5f7b70d 100644 --- a/hcx-pipeline-jobs/predetermination-job/pom.xml +++ b/hcx-pipeline-jobs/predetermination-job/pom.xml @@ -40,20 +40,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -89,7 +89,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/protocol-request-processor/pom.xml b/hcx-pipeline-jobs/protocol-request-processor/pom.xml index 98320ca11..591a25d44 100644 --- a/hcx-pipeline-jobs/protocol-request-processor/pom.xml +++ b/hcx-pipeline-jobs/protocol-request-processor/pom.xml @@ -47,20 +47,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - 
flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -96,7 +96,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/protocol-request-processor/src/main/java/org/swasth/dp/task/ProtocolRequestProcessorStreamTask.java b/hcx-pipeline-jobs/protocol-request-processor/src/main/java/org/swasth/dp/task/ProtocolRequestProcessorStreamTask.java index 27fcdefb3..6b65cd072 100644 --- a/hcx-pipeline-jobs/protocol-request-processor/src/main/java/org/swasth/dp/task/ProtocolRequestProcessorStreamTask.java +++ b/hcx-pipeline-jobs/protocol-request-processor/src/main/java/org/swasth/dp/task/ProtocolRequestProcessorStreamTask.java @@ -2,8 +2,10 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -46,9 +48,9 @@ public static void main(String[] args) { void process(BaseJobConfig baseJobConfig) throws Exception { StreamExecutionEnvironment env = FlinkUtil.getExecutionContext(baseJobConfig); - SourceFunction> kafkaConsumer = kafkaConnector.kafkaMapSource(config.kafkaInputTopic); + KafkaSource> kafkaConsumer = kafkaConnector.kafkaMapSource(config.kafkaInputTopic); - SingleOutputStreamOperator> enrichedStream = env.addSource(kafkaConsumer, config.protocolRequestConsumer) + SingleOutputStreamOperator> enrichedStream = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), config.protocolRequestConsumer) .uid(config.protocolRequestConsumer).setParallelism(config.consumerParallelism) .rebalance() .process(new ContextEnrichmentFunction(config, TypeExtractor.getForClass(String.class))).setParallelism(config.downstreamOperatorsParallelism); @@ -108,7 +110,7 @@ void process(BaseJobConfig baseJobConfig) throws Exception { private void sendAuditToKafka(SingleOutputStreamOperator> eventStream, ProtocolRequestProcessorConfig config, FlinkKafkaConnector kafkaConnector,String uid) { - eventStream.getSideOutput(config.auditOutputTag()).addSink(kafkaConnector.kafkaStringSink(config.auditTopic())) + eventStream.getSideOutput(config.auditOutputTag()).sinkTo(kafkaConnector.kafkaStringSink(config.auditTopic())) .name(config.auditProducer()).uid(uid).setParallelism(config.downstreamOperatorsParallelism); } } diff --git a/hcx-pipeline-jobs/retry-job/pom.xml b/hcx-pipeline-jobs/retry-job/pom.xml index 41dd93fbb..6f59cd84c 100644 --- a/hcx-pipeline-jobs/retry-job/pom.xml +++ b/hcx-pipeline-jobs/retry-job/pom.xml @@ -40,20 +40,20 @@ org.apache.flink - flink-streaming-java_${scala.maj.version} + flink-streaming-java ${flink.version} org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} org.apache.flink - flink-connector-kafka_${scala.maj.version} - ${flink.version} + flink-connector-kafka + 3.0.1-1.18 org.swasth @@ -89,7 +89,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests diff --git a/hcx-pipeline-jobs/search-response-job/pom.xml b/hcx-pipeline-jobs/search-response-job/pom.xml index 53064cd2c..0af4983cd 100644 --- 
a/hcx-pipeline-jobs/search-response-job/pom.xml +++ b/hcx-pipeline-jobs/search-response-job/pom.xml @@ -69,13 +69,13 @@ org.apache.flink - flink-test-utils_2.12 + flink-test-utils ${flink.version} test org.apache.flink - flink-runtime_2.12 + flink-runtime ${flink.version} test tests @@ -100,7 +100,7 @@ org.apache.flink - flink-streaming-java_2.12 + flink-streaming-java ${flink.version} test tests @@ -119,7 +119,7 @@ org.apache.flink - flink-clients_${scala.maj.version} + flink-clients ${flink.version} From 821c1b7776c4db7bba685b57ebe6f99eb93aa8ef Mon Sep 17 00:00:00 2001 From: KrutikaPhirangi <138781661+KrutikaPhirangi@users.noreply.github.com> Date: Thu, 7 Mar 2024 14:30:45 +0530 Subject: [PATCH 2/7] Update Dockerfile --- hcx-pipeline-jobs/jobs-distribution/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hcx-pipeline-jobs/jobs-distribution/Dockerfile b/hcx-pipeline-jobs/jobs-distribution/Dockerfile index ce77606cd..d38555586 100644 --- a/hcx-pipeline-jobs/jobs-distribution/Dockerfile +++ b/hcx-pipeline-jobs/jobs-distribution/Dockerfile @@ -1,4 +1,4 @@ -FROM anandp504/flink:1.18.1-scala_2.12-java17 +FROM flink:1.18.1-scala_2.12-java17 USER flink ADD target/jobs-distribution-1.0.tar.gz $FLINK_HOME/lib/ From 2532bed5d850d5c55cbdc81c432227e5a0f42a94 Mon Sep 17 00:00:00 2001 From: KrutikaPhirangi <138781661+KrutikaPhirangi@users.noreply.github.com> Date: Thu, 7 Mar 2024 15:11:25 +0530 Subject: [PATCH 3/7] update test --- .../spec/BaseProcessFunctionTestSpec.scala | 19 ++++++-------- .../scala/org/swasth/spec/CoreTestSpec.scala | 25 +++++++++++++------ .../task/NotificationStreamTaskTest.java | 8 +++--- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/hcx-pipeline-jobs/core/src/test/scala/org/swasth/spec/BaseProcessFunctionTestSpec.scala b/hcx-pipeline-jobs/core/src/test/scala/org/swasth/spec/BaseProcessFunctionTestSpec.scala index 9d9de7b31..dbc9d61f8 100644 --- a/hcx-pipeline-jobs/core/src/test/scala/org/swasth/spec/BaseProcessFunctionTestSpec.scala +++ b/hcx-pipeline-jobs/core/src/test/scala/org/swasth/spec/BaseProcessFunctionTestSpec.scala @@ -1,11 +1,11 @@ package org.swasth.spec import java.util - import com.google.gson.Gson import com.typesafe.config.{Config, ConfigFactory} import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import net.manub.embeddedkafka.EmbeddedKafka._ +import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ @@ -107,21 +107,18 @@ class BaseProcessFunctionTestSpec extends BaseSpec with Matchers { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(bsConfig) - val mapStream = - env.addSource(kafkaConnector.kafkaMapSource(bsConfig.kafkaMapInputTopic), "map-event-consumer") - .process(new TestMapStreamFunc(bsConfig)).name("TestMapEventStream") - + val mapStream = env.fromSource(kafkaConnector.kafkaMapSource(bsConfig.kafkaMapInputTopic), WatermarkStrategy.noWatermarks[util.Map[String, AnyRef]](), + "map-event-consumer").process(new TestMapStreamFunc(bsConfig )).name("TestMapEventStream") mapStream.getSideOutput(bsConfig.mapOutputTag) - .addSink(kafkaConnector.kafkaMapSink(bsConfig.kafkaMapOutputTopic)) + .sinkTo(kafkaConnector.kafkaMapSink(bsConfig.kafkaMapOutputTopic)) .name("Map-Event-Producer") val stringStream = - 
env.addSource(kafkaConnector.kafkaStringSource(bsConfig.kafkaStringInputTopic), "string-event-consumer") - .process(new TestStringStreamFunc(bsConfig)).name("TestStringEventStream") + env.fromSource(kafkaConnector.kafkaStringSource(bsConfig.kafkaStringInputTopic), WatermarkStrategy.noWatermarks[String](), "string-event-consumer") + .process(new TestStringStreamFunc(bsConfig)).name("TestStringEventStream") - stringStream.getSideOutput(bsConfig.stringOutputTag) - .addSink(kafkaConnector.kafkaStringSink(bsConfig.kafkaStringOutputTopic)) - .name("String-Producer") + .sinkTo(kafkaConnector.kafkaStringSink(bsConfig.kafkaStringOutputTopic)) + .name("String-Producer") Future { env.execute("TestSerDeFunctionality") diff --git a/hcx-pipeline-jobs/core/src/test/scala/org/swasth/spec/CoreTestSpec.scala b/hcx-pipeline-jobs/core/src/test/scala/org/swasth/spec/CoreTestSpec.scala index 5d992cc9b..366be9aec 100644 --- a/hcx-pipeline-jobs/core/src/test/scala/org/swasth/spec/CoreTestSpec.scala +++ b/hcx-pipeline-jobs/core/src/test/scala/org/swasth/spec/CoreTestSpec.scala @@ -1,13 +1,12 @@ package org.swasth.spec -import java.util - -import com.google.gson.Gson import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.util.Collector import org.scalatest.Matchers import org.scalatestplus.mockito.MockitoSugar import org.swasth.dp.contentupdater.core.util.RestUtil @@ -18,6 +17,8 @@ import org.swasth.dp.core.util.FlinkUtil import org.swasth.fixture.EventFixture import redis.clients.jedis.exceptions.{JedisDataException, JedisException} +import java.util + class CoreTestSpec extends BaseSpec with Matchers with MockitoSugar { val config: Config = ConfigFactory.load("base-test.conf") @@ -95,16 +96,24 @@ class CoreTestSpec extends BaseSpec with Matchers with MockitoSugar { val stringDeSerialization = new StringDeserializationSchema() val stringSerialization = new StringSerializationSchema(topic, Some("kafka-key")) val mapSerialization: MapSerializationSchema = new MapSerializationSchema(topic, Some("kafka-key")) + val context = mock[KafkaRecordSerializationSchema.KafkaSinkContext] + val collector = new Collector[String] { + var collectedValues: List[String] = List() + override def collect(record: String): Unit = { + collectedValues = collectedValues :+ record + } + override def close(): Unit = {} + } val mapDeSerialization = new MapDeserializationSchema() import org.apache.kafka.clients.consumer.ConsumerRecord val cRecord: ConsumerRecord[Array[Byte], Array[Byte]] = new ConsumerRecord[Array[Byte], Array[Byte]](topic, partition, offset, key, value) - stringDeSerialization.deserialize(cRecord) - stringSerialization.serialize("test", System.currentTimeMillis()) - stringDeSerialization.isEndOfStream("") should be(false) - val map = new util.HashMap[String, AnyRef]() + stringDeSerialization.deserialize(cRecord,collector) + stringSerialization.serialize("test",context, System.currentTimeMillis()) + stringDeSerialization.getProducedType should be + val map: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() map.put("country_code", "IN") map.put("country", "INDIA") - mapSerialization.serialize(map, System.currentTimeMillis()) + mapSerialization.serialize(map,context, 
System.currentTimeMillis()) } "DataCache" should "be able to add the data into redis" in intercept[JedisDataException]{ diff --git a/hcx-pipeline-jobs/notification-job/src/test/java/org/swasth/dp/notification/task/NotificationStreamTaskTest.java b/hcx-pipeline-jobs/notification-job/src/test/java/org/swasth/dp/notification/task/NotificationStreamTaskTest.java index fc99225d3..4edc25df5 100644 --- a/hcx-pipeline-jobs/notification-job/src/test/java/org/swasth/dp/notification/task/NotificationStreamTaskTest.java +++ b/hcx-pipeline-jobs/notification-job/src/test/java/org/swasth/dp/notification/task/NotificationStreamTaskTest.java @@ -3,6 +3,7 @@ import com.google.gson.Gson; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -41,9 +42,10 @@ public class NotificationStreamTaskTest { @Before public void beforeClass() throws Exception { - when(mockKafkaUtil.kafkaMapSource(notificationConfig.kafkaInputTopic)).thenReturn(new NotificationSource()); - when(mockKafkaUtil.kafkaMapSource(notificationConfig.subscriptionInputTopic)).thenReturn(new SubscriptionSource()); - when(mockKafkaUtil.kafkaMapSource(notificationConfig.onSubscriptionInputTopic)).thenReturn(new SubscriptionSource()); + KafkaSource mockKafkaSource = mock(KafkaSource.class); + when(mockKafkaUtil.kafkaMapSource(notificationConfig.kafkaInputTopic)).thenReturn(mockKafkaSource); + when(mockKafkaUtil.kafkaMapSource(notificationConfig.subscriptionInputTopic)).thenReturn(mockKafkaSource); + when(mockKafkaUtil.kafkaMapSource(notificationConfig.onSubscriptionInputTopic)).thenReturn(mockKafkaSource); doNothing().when(mockAuditService).indexAudit(anyMap()); doNothing().when(mockEsUtil).addIndex(anyString(),anyString(),anyString(),anyString()); doNothing().when(mockEsUtil).addDocumentWithIndex(anyString(),anyString(),anyString()); From 30d673245884daf4169a3bb615e063a857840c09 Mon Sep 17 00:00:00 2001 From: KrutikaPhirangi <138781661+KrutikaPhirangi@users.noreply.github.com> Date: Thu, 7 Mar 2024 15:31:27 +0530 Subject: [PATCH 4/7] update --- .../org/swasth/dp/core/cache/DataCache.scala | 11 +++++------ .../core/function/BaseDispatcherFunction.scala | 7 +++---- .../function/ContextEnrichmentFunction.scala | 4 +--- .../SubscriptionEnrichmentFunction.scala | 8 +++----- .../org/swasth/dp/core/serde/MapSerde.scala | 16 +++++++--------- .../swasth/dp/core/service/AuditService.scala | 5 ++--- .../swasth/dp/core/service/RegistryService.scala | 6 +++--- 7 files changed, 24 insertions(+), 33 deletions(-) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/cache/DataCache.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/cache/DataCache.scala index e3a56a7b7..a64bf1aa3 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/cache/DataCache.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/cache/DataCache.scala @@ -1,13 +1,12 @@ package org.swasth.dp.core.cache -import java.util import com.google.gson.Gson import org.slf4j.LoggerFactory import org.swasth.dp.core.job.BaseJobConfig -import org.swasth.dp.core.util.Constants import redis.clients.jedis.Jedis import redis.clients.jedis.exceptions.{JedisConnectionException, JedisException} +import java.util import scala.collection.JavaConverters._ import 
scala.collection.mutable import scala.collection.mutable.Map @@ -43,10 +42,10 @@ class DataCache(val config: BaseJobConfig, val redisConnect: RedisConnect, val d redisValue.length > 0 && redisValue.startsWith("[") } - def isObject(value: String) = { - val redisValue = value.trim - redisValue.length > 0 && redisValue.startsWith("{") - } + def isObject(value: String) = { + val redisValue = value.trim + redisValue.length > 0 && redisValue.startsWith("{") + } def convertToComplexDataTypes(data: mutable.Map[String, String]): mutable.Map[String, AnyRef] = { val result = mutable.Map[String, AnyRef]() diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/BaseDispatcherFunction.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/BaseDispatcherFunction.scala index 6f295920b..43d3ad8ab 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/BaseDispatcherFunction.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/BaseDispatcherFunction.scala @@ -11,7 +11,6 @@ import org.swasth.dp.core.job.{BaseJobConfig, BaseProcessFunction, Metrics} import org.swasth.dp.core.service.AuditService import org.swasth.dp.core.util._ -import java.util import java.util.{Calendar, UUID} case class Response(timestamp: Long, correlation_id: String, error: Option[ErrorResponse]) @@ -23,13 +22,13 @@ case class ValidationResult(status: Boolean, error: Option[ErrorResponse]) case class DispatcherResult(success: Boolean, statusCode: Int, error: Option[ErrorResponse], retry: Boolean) abstract class BaseDispatcherFunction(config: BaseJobConfig) - extends BaseProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]](config) { + extends BaseProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]](config) { private[this] val logger = LoggerFactory.getLogger(classOf[BaseDispatcherFunction]) var postgresConnect: PostgresConnect = _ var auditService: AuditService = _ - var payload: util.Map[String, AnyRef] = _ + var payload: java.util.Map[String, AnyRef] = _ override def open(parameters: Configuration): Unit = { @@ -83,7 +82,7 @@ abstract class BaseDispatcherFunction(config: BaseJobConfig) errorMap } - def dispatchErrorResponse(event: java.util.Map[String, AnyRef], error: Option[ErrorResponse], correlationId: String, payloadRefId: String, senderCtx: util.Map[String, AnyRef], context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { + def dispatchErrorResponse(event: java.util.Map[String, AnyRef], error: Option[ErrorResponse], correlationId: String, payloadRefId: String, senderCtx: java.util.Map[String, AnyRef], context: ProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { val protectedMap = new java.util.HashMap[String, AnyRef] //Update sender code protectedMap.put(Constants.HCX_SENDER_CODE, config.hcxRegistryCode) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/ContextEnrichmentFunction.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/ContextEnrichmentFunction.scala index ac4256007..30356fc47 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/ContextEnrichmentFunction.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/ContextEnrichmentFunction.scala @@ -8,10 +8,8 @@ import org.slf4j.LoggerFactory import org.swasth.dp.core.job.{BaseJobConfig, BaseProcessFunction, Metrics} import 
org.swasth.dp.core.util.Constants -import java.util - class ContextEnrichmentFunction(config: BaseJobConfig) (implicit val stringTypeInfo: TypeInformation[String]) - extends BaseProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]](config) { + extends BaseProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]](config) { private[this] val logger = LoggerFactory.getLogger(classOf[ContextEnrichmentFunction]) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/SubscriptionEnrichmentFunction.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/SubscriptionEnrichmentFunction.scala index 1ab63d7c2..eec50d525 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/SubscriptionEnrichmentFunction.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/function/SubscriptionEnrichmentFunction.scala @@ -7,23 +7,21 @@ import org.apache.flink.streaming.api.functions.ProcessFunction import org.swasth.dp.core.job.{BaseJobConfig, BaseProcessFunction, Metrics} import org.swasth.dp.core.util.Constants -import java.util - class SubscriptionEnrichmentFunction(config: BaseJobConfig) (implicit val stringTypeInfo: TypeInformation[String]) - extends BaseProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]](config) { + extends BaseProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]](config) { override def open(parameters: Configuration): Unit = { super.open(parameters) } - override def processElement(event: util.Map[String, AnyRef], context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { + override def processElement(event: java.util.Map[String, AnyRef], context: ProcessFunction[java.util.Map[String, AnyRef], java.util.Map[String, AnyRef]]#Context, metrics: Metrics): Unit = { val senderCode: String = event.get(Constants.HCX_SENDER_CODE).asInstanceOf[String] val recipientCode: String = event.get(Constants.HCX_RECIPIENT_CODE).asInstanceOf[String] val action: String = event.get(Constants.ACTION).asInstanceOf[String] Console.println(s"Sender: $senderCode : Recipient: $recipientCode : Action: $action") - val result: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + val result: java.util.Map[String, AnyRef] = new java.util.HashMap[String, AnyRef]() // Fetch the sender and receiver details from registry or cache val sender = fetchDetails(senderCode) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala index 74dc9cd75..ac8773498 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala @@ -10,29 +10,27 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.swasth.dp.core.util.JSONUtil import java.nio.charset.StandardCharsets -import java.util -import scala.collection.mutable -class MapDeserializationSchema extends KafkaRecordDeserializationSchema[util.Map[String, AnyRef]] { +class MapDeserializationSchema extends KafkaRecordDeserializationSchema[java.util.Map[String, AnyRef]] { private val serialVersionUID = -3224825136576915426L - override def getProducedType: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) + override def getProducedType: TypeInformation[java.util.Map[String, AnyRef]] = 
TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]]) - override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[util.Map[String, AnyRef]]): Unit = { - val msg = JSONUtil.deserialize[util.Map[String, AnyRef]](record.value()) + override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[java.util.Map[String, AnyRef]]): Unit = { + val msg = JSONUtil.deserialize[java.util.Map[String, AnyRef]](record.value()) out.collect(msg) } } -class MapSerializationSchema(topic: String, key: Option[String] = None) extends KafkaRecordSerializationSchema[util.Map[String, AnyRef]] { +class MapSerializationSchema(topic: String, key: Option[String] = None) extends KafkaRecordSerializationSchema[java.util.Map[String, AnyRef]] { private val serialVersionUID = -4284080856874185929L - override def serialize(element: util.Map[String, AnyRef], context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { + override def serialize(element: java.util.Map[String, AnyRef], context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { val out = JSONUtil.serialize(element) key.map { kafkaKey => new ProducerRecord[Array[Byte], Array[Byte]](topic, kafkaKey.getBytes(StandardCharsets.UTF_8), out.getBytes(StandardCharsets.UTF_8)) }.getOrElse(new ProducerRecord[Array[Byte], Array[Byte]](topic, out.getBytes(StandardCharsets.UTF_8))) } -} \ No newline at end of file +} diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/service/AuditService.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/service/AuditService.scala index 453477033..4228f507c 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/service/AuditService.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/service/AuditService.scala @@ -4,7 +4,6 @@ import org.slf4j.LoggerFactory import org.swasth.dp.core.job.BaseJobConfig import org.swasth.dp.core.util.{Constants, ElasticSearchUtil, JSONUtil} -import java.util import java.util.{Calendar, Date, TimeZone} class AuditService(config: BaseJobConfig) { @@ -12,11 +11,11 @@ class AuditService(config: BaseJobConfig) { private[this] val logger = LoggerFactory.getLogger(classOf[AuditService]) val esUtil = new ElasticSearchUtil(config.esUrl, config.auditIndex, config.batchSize) - def indexAudit(auditEvent: util.Map[String, AnyRef]): Unit ={ + def indexAudit(auditEvent: java.util.Map[String, AnyRef]): Unit ={ indexAudit(config.auditIndex, config.auditIndex, auditEvent) } - def indexAudit(index: String, indexAlias: String, auditEvent: util.Map[String, AnyRef]): Unit = { + def indexAudit(index: String, indexAlias: String, auditEvent: java.util.Map[String, AnyRef]): Unit = { try { val settings = "{ \"index\": { } }" val mappings = "{ \"properties\": { \"eid\": { \"type\": \"text\" }, \"x-hcx-sender_code\": { \"type\": \"keyword\" }, \"x-hcx-recipient_code\": { \"type\": \"keyword\" }, \"x-hcx-api_call_id\": { \"type\": \"keyword\" }, \"x-hcx-correlation_id\": { \"type\": \"keyword\" }, \"x-hcx-workflow_id\": { \"type\": \"keyword\" }, \"x-hcx-timestamp\": { \"type\": \"date\" }, \"mid\": { \"type\": \"keyword\" }, \"action\": { \"type\": \"keyword\" }, \"x-hcx-status\": { \"type\": \"keyword\" }, \"ets\": { \"type\": \"long\" }, \"requestTimeStamp\": { \"type\": \"long\" }, \"updatedTimestamp\": { \"type\": \"long\" }, \"x-hcx-error_details\": { \"type\": 
\"object\" }, \"x-hcx-debug_details\": { \"type\": \"object\" }, \"senderRole\": { \"type\": \"keyword\" }, \"recipientRole\": { \"type\": \"keyword\" }, \"payload\": { \"type\": \"text\" }, \"topic_code\": { \"type\": \"keyword\" }, \"senderName\": { \"type\": \"keyword\" }, \"recipientName\": { \"type\": \"keyword\" }, \"senderPrimaryEmail\": { \"type\": \"keyword\" }, \"recipientPrimaryEmail\": { \"type\": \"keyword\" }, \"subscription_id\": { \"type\": \"keyword\" }, \"subscription_status\": { \"type\": \"keyword\" }, \"x-hcx-notification_headers\": { \"type\": \"object\" },\"tags\": { \"type\": \"keyword\" },\"payload_size\":{\"type\": \"integer\"},\"user_id\":{\"type\": \"keyword\"},\"channel\":{\"type\": \"keyword\"} } } }"; diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/service/RegistryService.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/service/RegistryService.scala index 23e727306..8f1fe8598 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/service/RegistryService.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/service/RegistryService.scala @@ -13,7 +13,7 @@ class RegistryService(config: BaseJobConfig) { val httpClient: CloseableHttpClient = new HttpUtil().getHttpClient() - def getParticipantDetails(filters: String): util.ArrayList[util.Map[String, AnyRef]]= { + def getParticipantDetails(filters: String): util.ArrayList[java.util.Map[String, AnyRef]]= { // payload for registry search val payload = s"""{"entityType":["Organisation"],"filters":$filters}""" //Console.println("registry payload", payload) @@ -27,10 +27,10 @@ class RegistryService(config: BaseJobConfig) { //Console.println("registryAPI statusCode", statusCode) val entity = response.getEntity val inputStream = entity.getContent - val content = JSONUtil.deserialize[util.Map[String, AnyRef]](Source.fromInputStream(inputStream, "UTF-8").getLines.mkString) + val content = JSONUtil.deserialize[java.util.Map[String, AnyRef]](Source.fromInputStream(inputStream, "UTF-8").getLines.mkString) inputStream.close() response.close() - content.get(Constants.PARTICIPANTS).asInstanceOf[util.ArrayList[util.Map[String, AnyRef]]] + content.get(Constants.PARTICIPANTS).asInstanceOf[util.ArrayList[java.util.Map[String, AnyRef]]] } catch { case ex: Exception => { ex.printStackTrace() From 3656ea533dabd899dcd374bcf854d014b75f0e6d Mon Sep 17 00:00:00 2001 From: KrutikaPhirangi <138781661+KrutikaPhirangi@users.noreply.github.com> Date: Thu, 7 Mar 2024 15:41:57 +0530 Subject: [PATCH 5/7] update --- .../dp/core/job/BaseDeduplication.scala | 4 +-- .../swasth/dp/core/job/BaseJobConfig.scala | 12 +++---- .../dp/core/job/BaseProcessFunction.scala | 34 +++++++++---------- .../dp/core/job/FlinkKafkaConnector.scala | 9 +++-- 4 files changed, 29 insertions(+), 30 deletions(-) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseDeduplication.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseDeduplication.scala index 7cf1cfb21..c27ec5bd2 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseDeduplication.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseDeduplication.scala @@ -66,11 +66,11 @@ trait BaseDeduplication { val flags: util.HashMap[String, Boolean] = new util.HashMap[String, Boolean]() flags.put(flagName, value) if (event.isInstanceOf[String]) { - val eventMap = gson.fromJson(event.toString, new util.LinkedHashMap[String, 
AnyRef]().getClass).asInstanceOf[util.Map[String, AnyRef]] + val eventMap = gson.fromJson(event.toString, new util.LinkedHashMap[String, AnyRef]().getClass).asInstanceOf[java.util.Map[String, AnyRef]] eventMap.put("flags", flags.asInstanceOf[util.HashMap[String, AnyRef]]) eventMap.asInstanceOf[R] } else { - event.asInstanceOf[util.Map[String, AnyRef]].put("flags", flags.asInstanceOf[util.HashMap[String, AnyRef]]) + event.asInstanceOf[java.util.Map[String, AnyRef]].put("flags", flags.asInstanceOf[util.HashMap[String, AnyRef]]) event.asInstanceOf[R] } } diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseJobConfig.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseJobConfig.scala index ebf456b28..c448e8c4a 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseJobConfig.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseJobConfig.scala @@ -15,7 +15,7 @@ class BaseJobConfig(val config: Config, val jobName: String) extends Serializabl private val serialVersionUID = - 4515020556926788923L implicit val metricTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) - implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) + implicit val mapTypeInfo: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]]) implicit val objectTypeInfo: TypeInformation[Object] = TypeExtractor.getForClass(classOf[Object]) val kafkaBrokerServers: String = config.getString("kafka.broker-servers") @@ -49,9 +49,9 @@ class BaseJobConfig(val config: Config, val jobName: String) extends Serializabl val checkpointingTimeout: Long = if (config.hasPath("task.checkpointing.timeout")) config.getLong("task.checkpointing.timeout") else 1800000L // Default output configurations - val enrichedOutputTag: OutputTag[util.Map[String, AnyRef]] = OutputTag[util.Map[String, AnyRef]]("enriched-events") - val dispatcherOutputTag: OutputTag[util.Map[String, AnyRef]] = OutputTag[util.Map[String, AnyRef]]("dispatched-events") - val enrichedSubscriptionsOutputTag: OutputTag[util.Map[String, AnyRef]] = OutputTag[util.Map[String, AnyRef]]("enriched-subscription-events") + val enrichedOutputTag: OutputTag[java.util.Map[String, AnyRef]] = OutputTag[java.util.Map[String, AnyRef]]("enriched-events") + val dispatcherOutputTag: OutputTag[java.util.Map[String, AnyRef]] = OutputTag[java.util.Map[String, AnyRef]]("dispatched-events") + val enrichedSubscriptionsOutputTag: OutputTag[java.util.Map[String, AnyRef]] = OutputTag[java.util.Map[String, AnyRef]]("enriched-subscription-events") val auditOutputTag: OutputTag[String] = OutputTag[String]("audit-events") val auditTopic = if (config.hasPath("kafka.audit.topic")) config.getString("kafka.audit.topic") else "" @@ -176,8 +176,8 @@ class BaseJobConfig(val config: Config, val jobName: String) extends Serializabl val successCodes: util.List[Integer] = config.getIntList("errorCodes.successCodes") val errorCodes: util.List[Integer] = config.getIntList("errorCodes.errorCodes") - val subscribeOutputTag: OutputTag[util.Map[String, AnyRef]] = OutputTag[util.Map[String, AnyRef]]("subscribed-events") - val onSubscribeOutputTag: OutputTag[util.Map[String, AnyRef]] = OutputTag[util.Map[String, AnyRef]]("on-subscribed-events") + val subscribeOutputTag: OutputTag[java.util.Map[String, AnyRef]] = OutputTag[java.util.Map[String, AnyRef]]("subscribed-events") + val 
onSubscribeOutputTag: OutputTag[java.util.Map[String, AnyRef]] = OutputTag[java.util.Map[String, AnyRef]]("on-subscribed-events") val hcxInstanceName: String = config.getString("hcx.instanceName") diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseProcessFunction.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseProcessFunction.scala index 8e52fdea9..c83b36184 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseProcessFunction.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseProcessFunction.scala @@ -60,28 +60,28 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF processElement(event, context, metrics) } - def getProtocolStringValue(event: util.Map[String, AnyRef], key: String): String = { - event.get(Constants.HEADERS).asInstanceOf[util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[util.Map[String, AnyRef]].getOrDefault(key,"").asInstanceOf[String] + def getProtocolStringValue(event: java.util.Map[String, AnyRef], key: String): String = { + event.get(Constants.HEADERS).asInstanceOf[java.util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(key,"").asInstanceOf[String] } - def getProtocolMapValue(event: util.Map[String, AnyRef], key: String): util.Map[String, AnyRef] = { - event.get(Constants.HEADERS).asInstanceOf[util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[util.Map[String, AnyRef]].getOrDefault(key,new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]] + def getProtocolMapValue(event: java.util.Map[String, AnyRef], key: String): java.util.Map[String, AnyRef] = { + event.get(Constants.HEADERS).asInstanceOf[java.util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(key,new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]] } - def getCDataListValue(event: util.Map[String, AnyRef], participant: String, key: String): util.List[String] = { - event.getOrDefault(Constants.CDATA, new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]].getOrDefault(participant, new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]].getOrDefault(key, new util.ArrayList[String]()).asInstanceOf[util.List[String]] } + def getCDataListValue(event: java.util.Map[String, AnyRef], participant: String, key: String): util.List[String] = { + event.getOrDefault(Constants.CDATA, new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(participant, new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(key, new util.ArrayList[String]()).asInstanceOf[util.List[String]] } - def getCDataStringValue(event: util.Map[String, AnyRef], participant: String, key: String): String = { - event.getOrDefault(Constants.CDATA, new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]].getOrDefault(participant, new util.HashMap[String, AnyRef]()).asInstanceOf[util.Map[String, AnyRef]].get(key).asInstanceOf[String] + def getCDataStringValue(event: java.util.Map[String, AnyRef], participant: String, key: String): String = { + event.getOrDefault(Constants.CDATA, new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(participant, new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].get(key).asInstanceOf[String] } - def setStatus(event: 
util.Map[String, AnyRef], status: String): Unit = { - if(Constants.ALLOWED_STATUS_UPDATE.contains(event.get(Constants.HEADERS).asInstanceOf[util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[util.Map[String, AnyRef]].getOrDefault(Constants.HCX_STATUS, ""))) - event.get(Constants.HEADERS).asInstanceOf[util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[util.Map[String, AnyRef]].put(Constants.HCX_STATUS, status) + def setStatus(event: java.util.Map[String, AnyRef], status: String): Unit = { + if(Constants.ALLOWED_STATUS_UPDATE.contains(event.get(Constants.HEADERS).asInstanceOf[java.util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(Constants.HCX_STATUS, ""))) + event.get(Constants.HEADERS).asInstanceOf[java.util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[java.util.Map[String, AnyRef]].put(Constants.HCX_STATUS, status) } - def setErrorDetails(event: util.Map[String, AnyRef], errorDetails: util.Map[String, AnyRef]): Unit ={ - event.get(Constants.HEADERS).asInstanceOf[util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[util.Map[String, AnyRef]].put(Constants.ERROR_DETAILS, errorDetails) + def setErrorDetails(event: java.util.Map[String, AnyRef], errorDetails: java.util.Map[String, AnyRef]): Unit ={ + event.get(Constants.HEADERS).asInstanceOf[java.util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[java.util.Map[String, AnyRef]].put(Constants.ERROR_DETAILS, errorDetails) } def getReplacedAction(actionStr: String): String = { @@ -92,7 +92,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF replacedAction } - def createSenderContext(sender: util.Map[String, AnyRef],actionStr: String): util.Map[String, AnyRef] = { + def createSenderContext(sender: java.util.Map[String, AnyRef],actionStr: String): java.util.Map[String, AnyRef] = { //Sender Details var endpointUrl = sender.getOrDefault(Constants.END_POINT, "").asInstanceOf[String] if (!StringUtils.isEmpty(endpointUrl)) { @@ -109,7 +109,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF } - def createRecipientContext(receiver: util.Map[String, AnyRef],actionStr: String): util.Map[String, AnyRef] = { + def createRecipientContext(receiver: java.util.Map[String, AnyRef],actionStr: String): java.util.Map[String, AnyRef] = { //Receiver Details var endpointUrl = receiver.get(Constants.END_POINT).asInstanceOf[String] if (!StringUtils.isEmpty(endpointUrl)) { @@ -123,7 +123,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF } - def fetchDetails(code: String): util.Map[String, AnyRef] = { + def fetchDetails(code: String): java.util.Map[String, AnyRef] = { try { if (registryDataCache.isExists(code)) { Console.println("Getting details from cache for :" + code) @@ -144,7 +144,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF } } - def getDetails(code: String): util.Map[String, AnyRef] = { + def getDetails(code: String): java.util.Map[String, AnyRef] = { val key = Constants.PARTICIPANT_CODE val responseBody = registryService.getParticipantDetails(s"""{"$key":{"eq":"$code"}}""") if (!responseBody.isEmpty) { diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/FlinkKafkaConnector.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/FlinkKafkaConnector.scala index 3ae504def..9e8db6d43 100644 --- 
a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/FlinkKafkaConnector.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/FlinkKafkaConnector.scala @@ -4,20 +4,19 @@ import org.apache.flink.connector.base.DeliveryGuarantee import org.apache.flink.connector.kafka.sink.KafkaSink import org.apache.flink.connector.kafka.source.KafkaSource import org.swasth.dp.core.serde._ -import java.util class FlinkKafkaConnector(config: BaseJobConfig) extends Serializable { - def kafkaMapSource(kafkaTopic: String): KafkaSource[util.Map[String, AnyRef]] = { - KafkaSource.builder[util.Map[String, AnyRef]]() + def kafkaMapSource(kafkaTopic: String): KafkaSource[java.util.Map[String, AnyRef]] = { + KafkaSource.builder[java.util.Map[String, AnyRef]]() .setTopics(kafkaTopic) .setDeserializer(new MapDeserializationSchema) .setProperties(config.kafkaConsumerProperties) .build() } - def kafkaMapSink(kafkaTopic: String): KafkaSink[util.Map[String, AnyRef]] = { - KafkaSink.builder[util.Map[String, AnyRef]]() + def kafkaMapSink(kafkaTopic: String): KafkaSink[java.util.Map[String, AnyRef]] = { + KafkaSink.builder[java.util.Map[String, AnyRef]]() .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .setRecordSerializer(new MapSerializationSchema(kafkaTopic)) .setKafkaProducerConfig(config.kafkaProducerProperties) From 4a07c5d8eebe8d242031f88d12fd5493e3452ffc Mon Sep 17 00:00:00 2001 From: KrutikaPhirangi <138781661+KrutikaPhirangi@users.noreply.github.com> Date: Tue, 12 Mar 2024 11:13:02 +0530 Subject: [PATCH 6/7] Update --- .../org/swasth/dp/core/job/BaseDeduplication.scala | 4 ++-- .../org/swasth/dp/core/job/BaseProcessFunction.scala | 12 ++++++------ .../scala/org/swasth/dp/core/serde/MapSerde.scala | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseDeduplication.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseDeduplication.scala index c27ec5bd2..7cf1cfb21 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseDeduplication.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseDeduplication.scala @@ -66,11 +66,11 @@ trait BaseDeduplication { val flags: util.HashMap[String, Boolean] = new util.HashMap[String, Boolean]() flags.put(flagName, value) if (event.isInstanceOf[String]) { - val eventMap = gson.fromJson(event.toString, new util.LinkedHashMap[String, AnyRef]().getClass).asInstanceOf[java.util.Map[String, AnyRef]] + val eventMap = gson.fromJson(event.toString, new util.LinkedHashMap[String, AnyRef]().getClass).asInstanceOf[util.Map[String, AnyRef]] eventMap.put("flags", flags.asInstanceOf[util.HashMap[String, AnyRef]]) eventMap.asInstanceOf[R] } else { - event.asInstanceOf[java.util.Map[String, AnyRef]].put("flags", flags.asInstanceOf[util.HashMap[String, AnyRef]]) + event.asInstanceOf[util.Map[String, AnyRef]].put("flags", flags.asInstanceOf[util.HashMap[String, AnyRef]]) event.asInstanceOf[R] } } diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseProcessFunction.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseProcessFunction.scala index c83b36184..f0176251c 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseProcessFunction.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/job/BaseProcessFunction.scala @@ -65,14 +65,14 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF } def 
getProtocolMapValue(event: java.util.Map[String, AnyRef], key: String): java.util.Map[String, AnyRef] = { - event.get(Constants.HEADERS).asInstanceOf[java.util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(key,new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]] + event.get(Constants.HEADERS).asInstanceOf[java.util.Map[String, AnyRef]].get(Constants.PROTOCOL).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(key,new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]] } def getCDataListValue(event: java.util.Map[String, AnyRef], participant: String, key: String): util.List[String] = { - event.getOrDefault(Constants.CDATA, new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(participant, new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(key, new util.ArrayList[String]()).asInstanceOf[util.List[String]] } + event.getOrDefault(Constants.CDATA, new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(participant, new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(key, new util.ArrayList[String]()).asInstanceOf[util.List[String]] } def getCDataStringValue(event: java.util.Map[String, AnyRef], participant: String, key: String): String = { - event.getOrDefault(Constants.CDATA, new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(participant, new util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].get(key).asInstanceOf[String] + event.getOrDefault(Constants.CDATA, new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].getOrDefault(participant, new java.util.HashMap[String, AnyRef]()).asInstanceOf[java.util.Map[String, AnyRef]].get(key).asInstanceOf[String] } def setStatus(event: java.util.Map[String, AnyRef], status: String): Unit = { @@ -105,7 +105,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF val appendedSenderUrl = endpointUrl.concat(replacedAction) sender.put(Constants.END_POINT, appendedSenderUrl) sender - } else new util.HashMap[String, AnyRef]() + } else new java.util.HashMap[String, AnyRef]() } @@ -119,7 +119,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF val appendedReceiverUrl = endpointUrl.concat(actionStr) receiver.put(Constants.END_POINT, appendedReceiverUrl) receiver - } else new util.HashMap[String, AnyRef]() + } else new java.util.HashMap[String, AnyRef]() } @@ -151,7 +151,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig) extends ProcessF val collectionMap = responseBody.get(0) collectionMap } else { - new util.HashMap[String, AnyRef] + new java.util.HashMap[String, AnyRef] } } } diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala index ac8773498..ccc258128 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala @@ -18,8 +18,8 @@ class MapDeserializationSchema extends KafkaRecordDeserializationSchema[java.uti override def getProducedType: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]]) override def 
deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[java.util.Map[String, AnyRef]]): Unit = { - val msg = JSONUtil.deserialize[java.util.Map[String, AnyRef]](record.value()) - out.collect(msg) + val recordMap = JSONUtil.deserialize[java.util.HashMap[String, AnyRef]](record.value()) + out.collect(recordMap) } } From 006879388d5f6eec4ab2d9c9eca4809ab8da12f8 Mon Sep 17 00:00:00 2001 From: KrutikaPhirangi <138781661+KrutikaPhirangi@users.noreply.github.com> Date: Wed, 13 Mar 2024 11:02:54 +0530 Subject: [PATCH 7/7] Update MapSerde.scala --- .../src/main/scala/org/swasth/dp/core/serde/MapSerde.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala index ccc258128..ac8773498 100644 --- a/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala +++ b/hcx-pipeline-jobs/core/src/main/scala/org/swasth/dp/core/serde/MapSerde.scala @@ -18,8 +18,8 @@ class MapDeserializationSchema extends KafkaRecordDeserializationSchema[java.uti override def getProducedType: TypeInformation[java.util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[java.util.Map[String, AnyRef]]) override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[java.util.Map[String, AnyRef]]): Unit = { - val recordMap = JSONUtil.deserialize[java.util.HashMap[String, AnyRef]](record.value()) - out.collect(recordMap) + val msg = JSONUtil.deserialize[java.util.Map[String, AnyRef]](record.value()) + out.collect(msg) } }
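
Reviewer note: taken together, these patches move the jobs from the legacy addSource/addSink wiring to the unified KafkaSource/KafkaSink connector API — env.fromSource with an explicit WatermarkStrategy, stream.sinkTo for producers, KafkaRecordDeserializationSchema.deserialize writing into a Collector, and KafkaRecordSerializationSchema.serialize taking a KafkaSinkContext. What follows is a minimal, self-contained Scala sketch of that target wiring, included for orientation only; the broker address, topic names, consumer group, and class/object names are illustrative placeholders and are not taken from this repository.

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord

import java.nio.charset.StandardCharsets

// Deserializer in the new style: records are emitted through a Collector instead of being returned.
class PlainStringDeserializationSchema extends KafkaRecordDeserializationSchema[String] {
  override def getProducedType: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[String]): Unit =
    out.collect(new String(record.value(), StandardCharsets.UTF_8))
}

// Serializer in the new style: serialize() receives a KafkaSinkContext and returns a ProducerRecord.
class PlainStringSerializationSchema(topic: String) extends KafkaRecordSerializationSchema[String] {
  override def serialize(element: String, context: KafkaRecordSerializationSchema.KafkaSinkContext,
                         timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
}

object NewKafkaConnectorSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // KafkaSource replaces the legacy consumer and is attached with env.fromSource(...)
    val source: KafkaSource[String] = KafkaSource.builder[String]()
      .setBootstrapServers("localhost:9092")             // placeholder broker
      .setTopics("sketch.input")                         // placeholder topic
      .setGroupId("sketch-consumer-group")               // placeholder group id
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(new PlainStringDeserializationSchema)
      .build()

    // KafkaSink replaces the legacy producer and is attached with stream.sinkTo(...)
    val sink: KafkaSink[String] = KafkaSink.builder[String]()
      .setBootstrapServers("localhost:9092")             // placeholder broker
      .setRecordSerializer(new PlainStringSerializationSchema("sketch.output")) // placeholder topic
      .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .build()

    // fromSource(...) requires an explicit WatermarkStrategy; noWatermarks() matches the stream tasks above.
    env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "string-event-consumer")
      .sinkTo(sink)
      .name("string-producer")

    env.execute("new-kafka-connector-sketch")
  }
}

The sketch mirrors the shape of the changes above: the custom deserializer emits records through a Collector rather than returning them, the sink is configured with DeliveryGuarantee.AT_LEAST_ONCE as in FlinkKafkaConnector, and the stream is attached with fromSource(...)/sinkTo(...) instead of addSource(...)/addSink(...).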