added fields and unit tests

shraddha-ca committed Apr 24, 2024
1 parent 315e152 commit 2b16430
Showing 3 changed files with 84 additions and 44 deletions.
@@ -10,6 +10,7 @@ import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.DataException
import org.apache.kafka.connect.json.JsonConverter
+import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.apache.kafka.connect.transforms.util.SchemaUtil
@@ -189,10 +190,19 @@ class RedShiftComplexDataTypeTransformer<R : ConnectRecord<R>> : Transformation<
}
builder.field("topic_key", convertFieldSchema(SchemaBuilder.string().build(), false, ""))
builder.field("tombstone", convertFieldSchema(SchemaBuilder.bool().build(), false, false))
builder.field("_kafka_metadata_partition", convertFieldSchema(SchemaBuilder.string().build(), true, null))
builder.field("_kafka_metadata_offset", convertFieldSchema(SchemaBuilder.string().build(), true, null))
builder.field("_kafka_metadata_timestamp", convertFieldSchema(SchemaBuilder.string().build(), true, null))
updatedSchema = builder.build()
schemaUpdateCache.put(sourceSchema, updatedSchema)
}
val updatedValue = Struct(updatedSchema)
updatedValue.put("_kafka_metadata_partition", record.kafkaPartition().toString())
if (record is SinkRecord) {
updatedValue.put("_kafka_metadata_timestamp", record.timestamp().toString())
updatedValue.put("_kafka_metadata_offset", record.kafkaOffset().toString())
}

if (record.key() != null) {
updatedValue.put("topic_key", record.key().toString())
}
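Taken together, the two hunks above register three optional string fields on the value schema and populate them from the record's Kafka coordinates. A minimal standalone sketch of the pattern (putKafkaMetadata is an illustrative name, not part of this codebase, and it uses safe calls where the committed code calls toString() directly):

import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord

// Illustrative sketch: the partition is available on any ConnectRecord, but
// offset and timestamp are sink-side concepts, hence the SinkRecord type
// guard mirroring the hunk above. All three values are stored as strings,
// matching the optional string schemas registered via convertFieldSchema.
fun <R : ConnectRecord<R>> putKafkaMetadata(record: R, value: Struct): Struct {
    value.put("_kafka_metadata_partition", record.kafkaPartition()?.toString())
    if (record is SinkRecord) {
        value.put("_kafka_metadata_timestamp", record.timestamp()?.toString())
        value.put("_kafka_metadata_offset", record.kafkaOffset().toString())
    }
    return value
}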
@@ -6,11 +6,12 @@ import com.mongodb.kafka.connect.source.schema.AvroSchema
import com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue
import com.mongodb.kafka.connect.util.ClassHelper
import com.mongodb.kafka.connect.util.ConfigHelper
+import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
-import org.apache.kafka.connect.source.SourceRecord
+import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.transforms.util.SchemaUtil
import org.junit.Before
import java.io.File
@@ -35,9 +36,9 @@ import kotlin.test.assertTrue
*/
class RedShiftComplexDataTypeTransformerTest {

-private lateinit var transformer: RedShiftComplexDataTypeTransformer<SourceRecord>
+private lateinit var transformer: RedShiftComplexDataTypeTransformer<SinkRecord>

-private fun hasNoComplexTypes(obj: SourceRecord): Boolean {
+private fun hasNoComplexTypes(obj: SinkRecord): Boolean {

var hasNoComplexTypes = true
for (field in obj.valueSchema().fields()) {
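The hunk cuts the helper off after its loop header, but from its use in the tests it evidently checks that no complex-typed fields survive the transform. A plausible shape for such a check (an assumption; the committed body may differ):

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.sink.SinkRecord

// Assumed sketch of the truncated helper: report false if any field of the
// record's value schema is a complex Connect type (STRUCT, ARRAY, or MAP).
private fun hasNoComplexTypesSketch(obj: SinkRecord): Boolean =
    obj.valueSchema().fields().none { field ->
        field.schema().type() in setOf(Schema.Type.STRUCT, Schema.Type.ARRAY, Schema.Type.MAP)
    }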
@@ -57,23 +58,25 @@ class RedShiftComplexDataTypeTransformerTest {
fun `can transform ECST Employee data that has arrays into string fields`() {

val avroRecord = payload("com/cultureamp/employee-data.employees-v1.json")
-val sourceRecord = SourceRecord(
+val sinkRecord = SinkRecord(
+"employee data ecst test",
+1,
null,
null,
-"employee data ecst test",
avroRecord.schema(),
-avroRecord.value()
+avroRecord.value(),
+156,
)

-val transformedRecord = transformer.apply(sourceRecord)
-hasNoComplexTypes(sourceRecord)
+val transformedRecord = transformer.apply(sinkRecord)
+hasNoComplexTypes(sinkRecord)
assertTrue(hasNoComplexTypes(transformedRecord))

val expectedValue = struct(
id, account_id, employee_id, event_created_at, body_source, body_employee_id, body_email, body_name, body_preferred_name, body_locale, body_observer, body_gdpr_erasure_request_id, body_test_map, body_test_map_1, body_test_array_of_structs, body_manager_assignment_manager_id, body_manager_assignment_demographic_id, body_erased, body_created_at, body_updated_at, body_deleted_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, metadata_service, test_array_of_structs, test_string_array, test_array_of_arrays, test_map,
topic_key = "",
tombstone = false
-)
+).put("_kafka_metadata_partition", "1").put("_kafka_metadata_offset", "156").put("_kafka_metadata_timestamp", "null")

assertEquals(expectedValue, transformedRecord.value())
assertEquals(getExpectedSchema(), transformedRecord.valueSchema())
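For readers decoding the positional arguments: this is SinkRecord's seven-argument constructor. Annotated with the same values as a reading aid (avroRecord is the payload loaded in the test; this block is not part of the commit):

// SinkRecord(topic, partition, keySchema, key, valueSchema, value, kafkaOffset)
val annotated = SinkRecord(
    /* topic       = */ "employee data ecst test",
    /* partition   = */ 1,
    /* keySchema   = */ null,
    /* key         = */ null,
    /* valueSchema = */ avroRecord.schema(),
    /* value       = */ avroRecord.value(),
    /* kafkaOffset = */ 156,
)

This seven-argument form leaves the record timestamp null, which lines up with the expected _kafka_metadata_timestamp value of the string "null" in this test.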
@@ -83,23 +86,25 @@
fun `can transform ECST Employee data with null body`() {

val avroRecord = payload("com/cultureamp/employee-data.employees-v2.json")
-val sourceRecord = SourceRecord(
+val sinkRecord = SinkRecord(
+"employee data ecst test",
+1,
null,
null,
-"employee data ecst test",
avroRecord.schema(),
-avroRecord.value()
+avroRecord.value(),
+156,
)

-val transformedRecord = transformer.apply(sourceRecord)
-hasNoComplexTypes(sourceRecord)
+val transformedRecord = transformer.apply(sinkRecord)
+hasNoComplexTypes(sinkRecord)
assertTrue(hasNoComplexTypes(transformedRecord))

val expectedValue = nullBodyStruct(
id, account_id, employee_id, event_created_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, "Default-Service", test_array_of_structs, test_string_array, test_array_of_arrays, test_map,
topic_key = "",
tombstone = true
-)
+).put("_kafka_metadata_partition", "1").put("_kafka_metadata_offset", "156").put("_kafka_metadata_timestamp", "null")

assertEquals(expectedValue, transformedRecord.value())
assertEquals(getExpectedSchema(), transformedRecord.valueSchema())
@@ -109,26 +114,27 @@
fun `can transform ECST Employee data that has key as field`() {

val avroRecord = payload("com/cultureamp/employee-data.employees-v1.json")
-val sourceRecord = SourceRecord(
-null,
-null,
+val sinkRecord = SinkRecord(
"employee data ecst test",
-null,
+1,
Schema.STRING_SCHEMA,
"hellp",
avroRecord.schema(),
-avroRecord.value()
+avroRecord.value(),
+156,
+1713922160,
+TimestampType.CREATE_TIME,
)

-val transformedRecord = transformer.apply(sourceRecord)
-hasNoComplexTypes(sourceRecord)
+val transformedRecord = transformer.apply(sinkRecord)
+hasNoComplexTypes(sinkRecord)
assertTrue(hasNoComplexTypes(transformedRecord))

val expectedValue = struct(
id, account_id, employee_id, event_created_at, body_source, body_employee_id, body_email, body_name, body_preferred_name, body_locale, body_observer, body_gdpr_erasure_request_id, body_test_map, body_test_map_1, body_test_array_of_structs, body_manager_assignment_manager_id, body_manager_assignment_demographic_id, body_erased, body_created_at, body_updated_at, body_deleted_at, metadata_correlation_id, metadata_causation_id, metadata_executor_id, metadata_service, test_array_of_structs, test_string_array, test_array_of_arrays, test_map,
topic_key = "hellp",
tombstone = false
-)
+).put("_kafka_metadata_partition", "1").put("_kafka_metadata_offset", "156").put("_kafka_metadata_timestamp", "1713922160")

assertEquals(expectedValue, transformedRecord.value())
assertEquals(getExpectedSchema(), transformedRecord.valueSchema())
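This test instead uses the nine-argument constructor, so the record carries a real timestamp and the transformer can surface 1713922160 as a string. Annotated in the same spirit (again a reading aid, not commit code):

// SinkRecord(topic, partition, keySchema, key, valueSchema, value,
//            kafkaOffset, timestamp, timestampType)
val annotated = SinkRecord(
    /* topic         = */ "employee data ecst test",
    /* partition     = */ 1,
    /* keySchema     = */ Schema.STRING_SCHEMA,
    /* key           = */ "hellp",
    /* valueSchema   = */ avroRecord.schema(),
    /* value         = */ avroRecord.value(),
    /* kafkaOffset   = */ 156,
    /* timestamp     = */ 1713922160,
    /* timestampType = */ TimestampType.CREATE_TIME,
)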
@@ -138,61 +144,61 @@
fun `can transform ECST Employee data with tombstone message and non-null key`() {

val avroRecord = payload("com/cultureamp/employee-data.employees-v1.json")
-val sourceRecord = SourceRecord(
-null,
-null,
+val sinkRecord = SinkRecord(
"employee data ecst test",
-null,
+0,
Schema.STRING_SCHEMA,
"hellp",
avroRecord.schema(),
-null
+null,
+156,
+1713922160,
+TimestampType.CREATE_TIME,
)

-val transformedRecord = transformer.apply(sourceRecord)
-hasNoComplexTypes(sourceRecord)
+val transformedRecord = transformer.apply(sinkRecord)
+hasNoComplexTypes(sinkRecord)
assertTrue(hasNoComplexTypes(transformedRecord))
}

@Test
fun `can transform ECST Employee data with tombstone message and null key`() {

val avroRecord = payload("com/cultureamp/employee-data.employees-v1.json")
-val sourceRecord = SourceRecord(
-null,
-null,
+val sinkRecord = SinkRecord(
"employee data ecst test",
-null,
+0,
null,
null,
avroRecord.schema(),
-null
+null,
+156,
+1713922160,
+TimestampType.CREATE_TIME,
)

-val transformedRecord = transformer.apply(sourceRecord)
-hasNoComplexTypes(sourceRecord)
+val transformedRecord = transformer.apply(sinkRecord)
+hasNoComplexTypes(sinkRecord)
assertTrue(hasNoComplexTypes(transformedRecord))

val expectedValue = Struct(getExpectedSchema()).put("tombstone", true)

val expectedValue = Struct(getExpectedSchema()).put("tombstone", true).put("_kafka_metadata_partition", "0").put("_kafka_metadata_offset", "156").put("_kafka_metadata_timestamp", "1713922160")
assertEquals(expectedValue, transformedRecord.value())
assertEquals(getExpectedSchema(), transformedRecord.valueSchema())
}

@Test
fun `can transform ECST Employee data with tombstone message and null key and null value schema`() {
-val sourceRecord = SourceRecord(
-null,
-null,
+val sinkRecord = SinkRecord(
"employee data ecst test",
+5,
null,
null,
null,
null,
-null
+156,
)

-val transformedRecord = transformer.apply(sourceRecord)
+val transformedRecord = transformer.apply(sinkRecord)
assertTrue(hasNoComplexTypes(transformedRecord))
}

@@ -259,6 +259,30 @@
"name": "tombstone",
"type": "boolean",
"default": false
+},
+{
+"name": "_kafka_metadata_partition",
+"type": [
+"null",
+"string"
+],
+"default": null
+},
+{
+"name": "_kafka_metadata_offset",
+"type": [
+"null",
+"string"
+],
+"default": null
+},
+{
+"name": "_kafka_metadata_timestamp",
+"type": [
+"null",
+"string"
+],
+"default": null
}
]
}
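The three new fields are declared as nullable string unions defaulting to null — the Avro-side counterpart of the optional string schemas the transformer registers via convertFieldSchema(SchemaBuilder.string().build(), true, null). A sketch of the Connect-side equivalent (the correspondence is an inference from this diff, not code from the commit):

import org.apache.kafka.connect.data.SchemaBuilder

// An optional string schema that accepts null values — the Connect
// equivalent of the Avro union ["null", "string"] declared above; the
// null default is supplied separately by convertFieldSchema.
val kafkaMetadataFieldSchema = SchemaBuilder.string().optional().build()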
