Skip to content

Commit

Permalink
#17 resource handling helper classes (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
ABMC831 authored Oct 10, 2024
1 parent 029ca61 commit fa69632
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.examples.reader

import io.circe.Decoder
import java.util.Properties

object UsingsResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
println("Scala 3 feature")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.examples.writer

import io.circe.Encoder
import java.util.Properties

object UsingsResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
println("Scala 3 feature")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.examples.reader

import io.circe.Decoder
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties
import scala.util.Using

object UsingsResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
Using(new ReaderImpl[T](readerProps, topicName, neverEnding = false)) { reader =>
for (item <- reader)
println(item)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.examples.writer

import io.circe.Encoder
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties
import scala.util.Using

object UsingsResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
Using(new WriterImpl[T](writerProps, topicName)) { writer =>
writer.Write("sampleMessageKey1", sampleMessageToWrite)
writer.Write("sampleMessageKey2", sampleMessageToWrite)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,49 @@ package za.co.absa.kafkacase.examples
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import za.co.absa.kafkacase.models.topics.EdlaChange
import za.co.absa.kafkacase.reader.ReaderImpl
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.{Properties, UUID}

object KafkaCase {
private def writer_use_case(): Unit = {
// 0 -> HAVE SOMETHING TO WRITE
val messageToWrite = EdlaChange(
app_id_snow = "N/A",
data_definition_id = "TestingThis",
environment = "DEV",
format = "FooBar",
guid = "DebugId",
location = "ether",
operation = EdlaChange.Operation.Create(),
schema_link = "http://not.here",
source_app = "ThisCode",
timestamp_event = 12345
)
// This goes from your application logic
private val sampleMessageToWrite = EdlaChange(
app_id_snow = "N/A",
data_definition_id = "TestingThis",
environment = "DEV",
format = "FooBar",
guid = "DebugId",
location = "ether",
operation = EdlaChange.Operation.Create(),
schema_link = "http://not.here",
source_app = "ThisCode",
timestamp_event = 12345
)

// 1 -> DEFINE PROPS - kafka to treat all as string
val writerProps = new Properties()
writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// This goes from your config / domain knowledge
private val topicName = "KillMePleaseTopic"

// 2 -> MAKE WRITER
val writer = new WriterImpl[EdlaChange](writerProps, "KillMePleaseTopic")
try {
// 3 -> WRITE
writer.Write("sampleMessageKey", messageToWrite)
} finally {
// Releasing resources should be handled by using block in newer versions of scala
writer.close()
}
}
// This goes from your writer configs
private val writerProps = new Properties()
writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

private def reader_use_case(): Unit = {
// 1 -> DEFINE PROPS - kafka to treat all as string
val readerProps = new Properties()
readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// This goes from your reader configs
private val readerProps = new Properties()
readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// 2 -> MAKE READER (should be in using block for newer versions of scala)
val reader = new ReaderImpl[EdlaChange](readerProps, "KillMePleaseTopic")
try {
for (item <- reader)
println(item)
} finally {
// Releasing resources should be handled by using block in newer versions of scala
reader.close()
}
}

def main(args: Array[String]): Unit = {
writer_use_case()
reader_use_case()
writer.ManualResourceHandling(writerProps, topicName, sampleMessageToWrite)
writer.CustomResourceHandling(writerProps, topicName, sampleMessageToWrite)
writer.UsingsResourceHandling(writerProps, topicName, sampleMessageToWrite)
reader.ManualResourceHandling[EdlaChange](readerProps, topicName)
reader.CustomResourceHandling[EdlaChange](readerProps, topicName)
reader.UsingsResourceHandling[EdlaChange](readerProps, topicName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.examples.reader

import io.circe.Decoder
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties

object CustomResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
withResource(new ReaderImpl[T](readerProps, topicName, neverEnding = false))(reader => {
for (item <- reader)
println(item)
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.examples.reader

import io.circe.Decoder
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties

object ManualResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false)
try {
for (item <- reader)
println(item)
} finally {
reader.close()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.examples.writer

import io.circe.Encoder
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties

object CustomResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
withResource(new WriterImpl[T](writerProps, topicName))(writer => {
writer.Write("sampleMessageKey1", sampleMessageToWrite)
writer.Write("sampleMessageKey2", sampleMessageToWrite)
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.examples.writer

import io.circe.Encoder
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties

object ManualResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
val writer = new WriterImpl[T](writerProps, topicName)
try {
writer.Write("sampleMessageKey1", sampleMessageToWrite)
writer.Write("sampleMessageKey2", sampleMessageToWrite)
} finally {
writer.close()
}
}
}
Loading

0 comments on commit fa69632

Please sign in to comment.