From fa6963275ab38e1230d20caf7fe78747c06395e1 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Thu, 10 Oct 2024 12:02:03 +0200 Subject: [PATCH] #17 resource handling helper classes (#18) --- .../reader/UsingsResourceHandling.scala | 26 ++++++ .../writer/UsingsResourceHandling.scala | 26 ++++++ .../reader/UsingsResourceHandling.scala | 32 +++++++ .../writer/UsingsResourceHandling.scala | 32 +++++++ .../absa/kafkacase/examples/KafkaCase.scala | 86 ++++++++----------- .../reader/CustomResourceHandling.scala | 32 +++++++ .../reader/ManualResourceHandling.scala | 34 ++++++++ .../writer/CustomResourceHandling.scala | 32 +++++++ .../writer/ManualResourceHandling.scala | 34 ++++++++ .../models/utils/ResourceHandler.scala | 62 +++++++++++++ 10 files changed, 344 insertions(+), 52 deletions(-) create mode 100644 examples/src/main/scala-2/za.co.absa.kafkacase.examples/reader/UsingsResourceHandling.scala create mode 100644 examples/src/main/scala-2/za.co.absa.kafkacase.examples/writer/UsingsResourceHandling.scala create mode 100644 examples/src/main/scala-3/za/co/absa/kafkacase/examples/reader/UsingsResourceHandling.scala create mode 100644 examples/src/main/scala-3/za/co/absa/kafkacase/examples/writer/UsingsResourceHandling.scala create mode 100644 examples/src/main/scala/za/co/absa/kafkacase/examples/reader/CustomResourceHandling.scala create mode 100644 examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ManualResourceHandling.scala create mode 100644 examples/src/main/scala/za/co/absa/kafkacase/examples/writer/CustomResourceHandling.scala create mode 100644 examples/src/main/scala/za/co/absa/kafkacase/examples/writer/ManualResourceHandling.scala create mode 100644 models/src/main/scala/za/co/absa/kafkacase/models/utils/ResourceHandler.scala diff --git a/examples/src/main/scala-2/za.co.absa.kafkacase.examples/reader/UsingsResourceHandling.scala b/examples/src/main/scala-2/za.co.absa.kafkacase.examples/reader/UsingsResourceHandling.scala new file mode 100644 index 0000000..9b6dc97 --- /dev/null +++ b/examples/src/main/scala-2/za.co.absa.kafkacase.examples/reader/UsingsResourceHandling.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.reader + +import io.circe.Decoder +import java.util.Properties + +object UsingsResourceHandling { + def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = { + println("Scala 3 feature") + } +} diff --git a/examples/src/main/scala-2/za.co.absa.kafkacase.examples/writer/UsingsResourceHandling.scala b/examples/src/main/scala-2/za.co.absa.kafkacase.examples/writer/UsingsResourceHandling.scala new file mode 100644 index 0000000..08adf1b --- /dev/null +++ b/examples/src/main/scala-2/za.co.absa.kafkacase.examples/writer/UsingsResourceHandling.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.writer + +import io.circe.Encoder +import java.util.Properties + +object UsingsResourceHandling { + def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = { + println("Scala 3 feature") + } +} diff --git a/examples/src/main/scala-3/za/co/absa/kafkacase/examples/reader/UsingsResourceHandling.scala b/examples/src/main/scala-3/za/co/absa/kafkacase/examples/reader/UsingsResourceHandling.scala new file mode 100644 index 0000000..e3d9b2e --- /dev/null +++ b/examples/src/main/scala-3/za/co/absa/kafkacase/examples/reader/UsingsResourceHandling.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.reader + +import io.circe.Decoder +import za.co.absa.kafkacase.reader.ReaderImpl + +import java.util.Properties +import scala.util.Using + +object UsingsResourceHandling { + def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = { + Using(new ReaderImpl[T](readerProps, topicName, neverEnding = false)) { reader => + for (item <- reader) + println(item) + } + } +} diff --git a/examples/src/main/scala-3/za/co/absa/kafkacase/examples/writer/UsingsResourceHandling.scala b/examples/src/main/scala-3/za/co/absa/kafkacase/examples/writer/UsingsResourceHandling.scala new file mode 100644 index 0000000..d73116d --- /dev/null +++ b/examples/src/main/scala-3/za/co/absa/kafkacase/examples/writer/UsingsResourceHandling.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.writer + +import io.circe.Encoder +import za.co.absa.kafkacase.writer.WriterImpl + +import java.util.Properties +import scala.util.Using + +object UsingsResourceHandling { + def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = { + Using(new WriterImpl[T](writerProps, topicName)) { writer => + writer.Write("sampleMessageKey1", sampleMessageToWrite) + writer.Write("sampleMessageKey2", sampleMessageToWrite) + } + } +} diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala index eb0f80f..2b716bc 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala @@ -19,67 +19,49 @@ package za.co.absa.kafkacase.examples import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import za.co.absa.kafkacase.models.topics.EdlaChange -import za.co.absa.kafkacase.reader.ReaderImpl -import za.co.absa.kafkacase.writer.WriterImpl import java.util.{Properties, UUID} object KafkaCase { - private def writer_use_case(): Unit = { - // 0 -> HAVE SOMETHING TO WRITE - val messageToWrite = EdlaChange( - app_id_snow = "N/A", - data_definition_id = "TestingThis", - environment = "DEV", - format = "FooBar", - guid = "DebugId", - location = "ether", - operation = EdlaChange.Operation.Create(), - schema_link = "http://not.here", - source_app = "ThisCode", - timestamp_event = 12345 - ) + // This goes from your application logic + private val sampleMessageToWrite = EdlaChange( + app_id_snow = "N/A", + data_definition_id = "TestingThis", + environment = "DEV", + format = "FooBar", + guid = "DebugId", + location = "ether", + operation = EdlaChange.Operation.Create(), + schema_link = "http://not.here", + source_app = "ThisCode", + timestamp_event = 12345 + ) - // 1 -> DEFINE PROPS - kafka to treat all as string - val writerProps = new Properties() - writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092") - writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") - writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // This goes from your config / domain knowledge + private val topicName = "KillMePleaseTopic" - // 2 -> MAKE WRITER - val writer = new WriterImpl[EdlaChange](writerProps, "KillMePleaseTopic") - try { - // 3 -> WRITE - writer.Write("sampleMessageKey", messageToWrite) - } finally { - // Releasing resources should be handled by using block in newer versions of scala - writer.close() - } - } + // This goes from your writer configs + private val writerProps = new Properties() + writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092") + writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") - private def reader_use_case(): Unit = { - // 1 -> DEFINE PROPS - kafka to treat all as string - val readerProps = new Properties() - readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092") - readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") - readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") - readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}") - readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}") - readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + // This goes from your reader configs + private val readerProps = new Properties() + readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092") + readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}") + readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}") + readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - // 2 -> MAKE READER (should be in using block for newer versions of scala) - val reader = new ReaderImpl[EdlaChange](readerProps, "KillMePleaseTopic") - try { - for (item <- reader) - println(item) - } finally { - // Releasing resources should be handled by using block in newer versions of scala - reader.close() - } - } def main(args: Array[String]): Unit = { - writer_use_case() - reader_use_case() + writer.ManualResourceHandling(writerProps, topicName, sampleMessageToWrite) + writer.CustomResourceHandling(writerProps, topicName, sampleMessageToWrite) + writer.UsingsResourceHandling(writerProps, topicName, sampleMessageToWrite) + reader.ManualResourceHandling[EdlaChange](readerProps, topicName) + reader.CustomResourceHandling[EdlaChange](readerProps, topicName) + reader.UsingsResourceHandling[EdlaChange](readerProps, topicName) } } diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/CustomResourceHandling.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/CustomResourceHandling.scala new file mode 100644 index 0000000..64eded9 --- /dev/null +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/CustomResourceHandling.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.reader + +import io.circe.Decoder +import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource +import za.co.absa.kafkacase.reader.ReaderImpl + +import java.util.Properties + +object CustomResourceHandling { + def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = { + withResource(new ReaderImpl[T](readerProps, topicName, neverEnding = false))(reader => { + for (item <- reader) + println(item) + }) + } +} diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ManualResourceHandling.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ManualResourceHandling.scala new file mode 100644 index 0000000..f04ed25 --- /dev/null +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ManualResourceHandling.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.reader + +import io.circe.Decoder +import za.co.absa.kafkacase.reader.ReaderImpl + +import java.util.Properties + +object ManualResourceHandling { + def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = { + val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false) + try { + for (item <- reader) + println(item) + } finally { + reader.close() + } + } +} diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/CustomResourceHandling.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/CustomResourceHandling.scala new file mode 100644 index 0000000..829c234 --- /dev/null +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/CustomResourceHandling.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.writer + +import io.circe.Encoder +import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource +import za.co.absa.kafkacase.writer.WriterImpl + +import java.util.Properties + +object CustomResourceHandling { + def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = { + withResource(new WriterImpl[T](writerProps, topicName))(writer => { + writer.Write("sampleMessageKey1", sampleMessageToWrite) + writer.Write("sampleMessageKey2", sampleMessageToWrite) + }) + } +} diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/ManualResourceHandling.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/ManualResourceHandling.scala new file mode 100644 index 0000000..ae2c2b0 --- /dev/null +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/ManualResourceHandling.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.writer + +import io.circe.Encoder +import za.co.absa.kafkacase.writer.WriterImpl + +import java.util.Properties + +object ManualResourceHandling { + def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = { + val writer = new WriterImpl[T](writerProps, topicName) + try { + writer.Write("sampleMessageKey1", sampleMessageToWrite) + writer.Write("sampleMessageKey2", sampleMessageToWrite) + } finally { + writer.close() + } + } +} diff --git a/models/src/main/scala/za/co/absa/kafkacase/models/utils/ResourceHandler.scala b/models/src/main/scala/za/co/absa/kafkacase/models/utils/ResourceHandler.scala new file mode 100644 index 0000000..0b51917 --- /dev/null +++ b/models/src/main/scala/za/co/absa/kafkacase/models/utils/ResourceHandler.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.models.utils + +import scala.util.control.NonFatal + +// Inspired by https://dkomanov.medium.com/scala-try-with-resources-735baad0fd7d +// and for-comprehension contract by ATUM's ARMImplits +object ResourceHandler { + def withResource[TResource <: AutoCloseable, TResult](resourceFactory: => TResource)(operation: TResource => TResult): TResult = { + val resource: TResource = resourceFactory + require(resource != null, "resource is null") + var exception: Throwable = null + try { + operation(resource) + } catch { + case NonFatal(ex) => + exception = ex + throw ex + } finally { + closeAndAddSuppressed(exception, resource) + } + } + + private def closeAndAddSuppressed(ex: Throwable, resource: AutoCloseable): Unit = { + if (ex != null) { + try { + resource.close() + } catch { + case NonFatal(suppressed) => + ex.addSuppressed(suppressed) + } + } else { + resource.close() + } + } + + // implementing a for-comprehension contract inspired by ATUM's ARMImplits + implicit class ResourceWrapper[TResource <: AutoCloseable](resourceFactory: => TResource) { + def foreach(operation: TResource => Unit): Unit = withResource(resourceFactory)(operation) + + def map[TResult](operation: TResource => TResult): TResult = withResource(resourceFactory)(operation) + + def flatMap[TResult](operation: TResource => TResult): TResult = withResource(resourceFactory)(operation) + + def withFilter(ignored: TResource => Boolean): ResourceWrapper[TResource] = this + } +}