From 7b11e9d678f33c9d9f9c38dd0fb8b38cbbc90fa7 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Fri, 4 Oct 2024 10:28:34 +0200 Subject: [PATCH] #17 Resource handling helper classes --- .../absa/kafkacase/examples/KafkaCase.scala | 123 ++++++++++++------ .../models/utils/ResourceHandler.scala | 62 +++++++++ 2 files changed, 143 insertions(+), 42 deletions(-) create mode 100644 models/src/main/scala/za/co/absa/kafkacase/models/utils/ResourceHandler.scala diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala index eb0f80f..2f41f98 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala @@ -14,72 +14,111 @@ * limitations under the License. */ -package za.co.absa.kafkacase.examples +package za.co.absa.KafkaCase.Examples import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import za.co.absa.kafkacase.models.topics.EdlaChange +import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource import za.co.absa.kafkacase.reader.ReaderImpl import za.co.absa.kafkacase.writer.WriterImpl import java.util.{Properties, UUID} +// scala3 only +// import scala.util.Using object KafkaCase { - private def writer_use_case(): Unit = { - // 0 -> HAVE SOMETHING TO WRITE - val messageToWrite = EdlaChange( - app_id_snow = "N/A", - data_definition_id = "TestingThis", - environment = "DEV", - format = "FooBar", - guid = "DebugId", - location = "ether", - operation = EdlaChange.Operation.Create(), - schema_link = "http://not.here", - source_app = "ThisCode", - timestamp_event = 12345 - ) - - // 1 -> DEFINE PROPS - kafka to treat all as string - val writerProps = new Properties() - writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092") - writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") - writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") - - // 2 -> MAKE WRITER - val writer = new WriterImpl[EdlaChange](writerProps, "KillMePleaseTopic") + // This goes from your application logic + private val sampleMessageToWrite = EdlaChange( + app_id_snow = "N/A", + data_definition_id = "TestingThis", + environment = "DEV", + format = "FooBar", + guid = "DebugId", + location = "ether", + operation = EdlaChange.Operation.Create(), + schema_link = "http://not.here", + source_app = "ThisCode", + timestamp_event = 12345 + ) + + // This goes from your config / domain knowledge + private val topicName = "KillMePleaseTopic" + + // This goes from your writer configs + private val writerProps = new Properties() + writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092") + writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + + // This goes from your reader configs + private val readerProps = new Properties() + readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092") + readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}") + readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}") + readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + private def writer_use_case_scala2(): Unit = { + val writer = new WriterImpl[EdlaChange](writerProps, topicName) try { - // 3 -> WRITE - writer.Write("sampleMessageKey", messageToWrite) + writer.Write("sampleMessageKey1", sampleMessageToWrite) + writer.Write("sampleMessageKey2", sampleMessageToWrite) } finally { - // Releasing resources should be handled by using block in newer versions of scala writer.close() } } - private def reader_use_case(): Unit = { - // 1 -> DEFINE PROPS - kafka to treat all as string - val readerProps = new Properties() - readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092") - readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") - readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") - readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}") - readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}") - readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - - // 2 -> MAKE READER (should be in using block for newer versions of scala) - val reader = new ReaderImpl[EdlaChange](readerProps, "KillMePleaseTopic") + private def writer_use_case_scala2_custom_resource_handler(): Unit = { + withResource(new WriterImpl[EdlaChange](writerProps, topicName))(writer => { + writer.Write("sampleMessageKey1", sampleMessageToWrite) + writer.Write("sampleMessageKey2", sampleMessageToWrite) + }) + } + +// scala3 only +// private def writer_use_case_scala3(): Unit = { +// Using(new WriterImpl[EdlaChange](writerProps, topicName)) { writer => +// writer.Write("sampleMessageKey1", sampleMessageToWrite) +// writer.Write("sampleMessageKey2", sampleMessageToWrite) +// } +// } + + private def reader_use_case_scala2(): Unit = { + val reader = new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false) try { for (item <- reader) println(item) } finally { - // Releasing resources should be handled by using block in newer versions of scala reader.close() } } + private def reader_use_case_scala2_custom_resource_handler(): Unit = { + withResource(new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false))(reader => { + for (item <- reader) + println(item) + }) + } + +// scala3 only +// private def reader_use_case_scala3(): Unit = { +// Using(new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false)) { reader => +// for (item <- reader) +// println(item) +// } +// } + def main(args: Array[String]): Unit = { - writer_use_case() - reader_use_case() + writer_use_case_scala2() + reader_use_case_scala2() + + writer_use_case_scala2_custom_resource_handler() + reader_use_case_scala2_custom_resource_handler() + +// scala3 only +// writer_use_case_scala3() +// reader_use_case_scala3() } } diff --git a/models/src/main/scala/za/co/absa/kafkacase/models/utils/ResourceHandler.scala b/models/src/main/scala/za/co/absa/kafkacase/models/utils/ResourceHandler.scala new file mode 100644 index 0000000..0b51917 --- /dev/null +++ b/models/src/main/scala/za/co/absa/kafkacase/models/utils/ResourceHandler.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.models.utils + +import scala.util.control.NonFatal + +// Inspired by https://dkomanov.medium.com/scala-try-with-resources-735baad0fd7d +// and for-comprehension contract by ATUM's ARMImplits +object ResourceHandler { + def withResource[TResource <: AutoCloseable, TResult](resourceFactory: => TResource)(operation: TResource => TResult): TResult = { + val resource: TResource = resourceFactory + require(resource != null, "resource is null") + var exception: Throwable = null + try { + operation(resource) + } catch { + case NonFatal(ex) => + exception = ex + throw ex + } finally { + closeAndAddSuppressed(exception, resource) + } + } + + private def closeAndAddSuppressed(ex: Throwable, resource: AutoCloseable): Unit = { + if (ex != null) { + try { + resource.close() + } catch { + case NonFatal(suppressed) => + ex.addSuppressed(suppressed) + } + } else { + resource.close() + } + } + + // implementing a for-comprehension contract inspired by ATUM's ARMImplits + implicit class ResourceWrapper[TResource <: AutoCloseable](resourceFactory: => TResource) { + def foreach(operation: TResource => Unit): Unit = withResource(resourceFactory)(operation) + + def map[TResult](operation: TResource => TResult): TResult = withResource(resourceFactory)(operation) + + def flatMap[TResult](operation: TResource => TResult): TResult = withResource(resourceFactory)(operation) + + def withFilter(ignored: TResource => Boolean): ResourceWrapper[TResource] = this + } +}