Skip to content

Commit

Permalink
#17 Resource handling helper classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ABMC831 committed Oct 8, 2024
1 parent 029ca61 commit 7b11e9d
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 42 deletions.
123 changes: 81 additions & 42 deletions examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,72 +14,111 @@
* limitations under the License.
*/

package za.co.absa.kafkacase.examples
package za.co.absa.KafkaCase.Examples

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import za.co.absa.kafkacase.models.topics.EdlaChange
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.reader.ReaderImpl
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.{Properties, UUID}
// scala3 only
// import scala.util.Using

object KafkaCase {
private def writer_use_case(): Unit = {
// 0 -> HAVE SOMETHING TO WRITE
val messageToWrite = EdlaChange(
app_id_snow = "N/A",
data_definition_id = "TestingThis",
environment = "DEV",
format = "FooBar",
guid = "DebugId",
location = "ether",
operation = EdlaChange.Operation.Create(),
schema_link = "http://not.here",
source_app = "ThisCode",
timestamp_event = 12345
)

// 1 -> DEFINE PROPS - kafka to treat all as string
val writerProps = new Properties()
writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

// 2 -> MAKE WRITER
val writer = new WriterImpl[EdlaChange](writerProps, "KillMePleaseTopic")
// This goes from your application logic
private val sampleMessageToWrite = EdlaChange(
app_id_snow = "N/A",
data_definition_id = "TestingThis",
environment = "DEV",
format = "FooBar",
guid = "DebugId",
location = "ether",
operation = EdlaChange.Operation.Create(),
schema_link = "http://not.here",
source_app = "ThisCode",
timestamp_event = 12345
)

// This goes from your config / domain knowledge
private val topicName = "KillMePleaseTopic"

// This goes from your writer configs
private val writerProps = new Properties()
writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

// This goes from your reader configs
private val readerProps = new Properties()
readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

private def writer_use_case_scala2(): Unit = {
val writer = new WriterImpl[EdlaChange](writerProps, topicName)
try {
// 3 -> WRITE
writer.Write("sampleMessageKey", messageToWrite)
writer.Write("sampleMessageKey1", sampleMessageToWrite)
writer.Write("sampleMessageKey2", sampleMessageToWrite)
} finally {
// Releasing resources should be handled by using block in newer versions of scala
writer.close()
}
}

private def reader_use_case(): Unit = {
// 1 -> DEFINE PROPS - kafka to treat all as string
val readerProps = new Properties()
readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// 2 -> MAKE READER (should be in using block for newer versions of scala)
val reader = new ReaderImpl[EdlaChange](readerProps, "KillMePleaseTopic")
private def writer_use_case_scala2_custom_resource_handler(): Unit = {
withResource(new WriterImpl[EdlaChange](writerProps, topicName))(writer => {
writer.Write("sampleMessageKey1", sampleMessageToWrite)
writer.Write("sampleMessageKey2", sampleMessageToWrite)
})
}

// scala3 only
// private def writer_use_case_scala3(): Unit = {
// Using(new WriterImpl[EdlaChange](writerProps, topicName)) { writer =>
// writer.Write("sampleMessageKey1", sampleMessageToWrite)
// writer.Write("sampleMessageKey2", sampleMessageToWrite)
// }
// }

private def reader_use_case_scala2(): Unit = {
val reader = new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false)
try {
for (item <- reader)
println(item)
} finally {
// Releasing resources should be handled by using block in newer versions of scala
reader.close()
}
}

private def reader_use_case_scala2_custom_resource_handler(): Unit = {
withResource(new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false))(reader => {
for (item <- reader)
println(item)
})
}

// scala3 only
// private def reader_use_case_scala3(): Unit = {
// Using(new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false)) { reader =>
// for (item <- reader)
// println(item)
// }
// }

def main(args: Array[String]): Unit = {
writer_use_case()
reader_use_case()
writer_use_case_scala2()
reader_use_case_scala2()

writer_use_case_scala2_custom_resource_handler()
reader_use_case_scala2_custom_resource_handler()

// scala3 only
// writer_use_case_scala3()
// reader_use_case_scala3()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.models.utils

import scala.util.control.NonFatal

// Inspired by https://dkomanov.medium.com/scala-try-with-resources-735baad0fd7d
// and for-comprehension contract by ATUM's ARMImplits
object ResourceHandler {
def withResource[TResource <: AutoCloseable, TResult](resourceFactory: => TResource)(operation: TResource => TResult): TResult = {
val resource: TResource = resourceFactory
require(resource != null, "resource is null")
var exception: Throwable = null
try {
operation(resource)
} catch {
case NonFatal(ex) =>
exception = ex
throw ex
} finally {
closeAndAddSuppressed(exception, resource)
}
}

private def closeAndAddSuppressed(ex: Throwable, resource: AutoCloseable): Unit = {
if (ex != null) {
try {
resource.close()
} catch {
case NonFatal(suppressed) =>
ex.addSuppressed(suppressed)
}
} else {
resource.close()
}
}

// implementing a for-comprehension contract inspired by ATUM's ARMImplits
implicit class ResourceWrapper[TResource <: AutoCloseable](resourceFactory: => TResource) {
def foreach(operation: TResource => Unit): Unit = withResource(resourceFactory)(operation)

def map[TResult](operation: TResource => TResult): TResult = withResource(resourceFactory)(operation)

def flatMap[TResult](operation: TResource => TResult): TResult = withResource(resourceFactory)(operation)

def withFilter(ignored: TResource => Boolean): ResourceWrapper[TResource] = this
}
}

0 comments on commit 7b11e9d

Please sign in to comment.