Merge branch 'airbytehq:master' into teradata_master
sc250072 authored Jan 27, 2025
2 parents 59ad20b + db04610 commit f9fd802
Showing 1,072 changed files with 17,362 additions and 12,319 deletions.
@@ -11,6 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.QueueReader
import io.airbyte.cdk.load.task.internal.ForceFlushEvent
import io.micronaut.context.annotation.Secondary
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap

@@ -35,7 +36,9 @@ interface FlushStrategy {
@Secondary
class DefaultFlushStrategy(
private val config: DestinationConfiguration,
private val eventQueue: QueueReader<ForceFlushEvent>
private val eventQueue: QueueReader<ForceFlushEvent>,
@Value("\${airbyte.destination.record-batch-size-override}")
private val recordBatchSizeOverride: Long? = null
) : FlushStrategy {
private val forceFlushIndexes = ConcurrentHashMap<DestinationStream.Descriptor, Long>()

@@ -44,7 +47,7 @@ class DefaultFlushStrategy(
rangeRead: Range<Long>,
bytesProcessed: Long
): Boolean {
if (bytesProcessed >= config.recordBatchSizeBytes) {
if (bytesProcessed >= (recordBatchSizeOverride ?: config.recordBatchSizeBytes)) {
return true
}

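Net effect of this hunk: the flush threshold resolves to the injected `airbyte.destination.record-batch-size-override` when that property is set, and falls back to `config.recordBatchSizeBytes` otherwise. A minimal standalone sketch of that resolution (the values are made up for illustration):

```kotlin
// Sketch of the threshold resolution performed above (values are illustrative).
fun resolveFlushThreshold(recordBatchSizeOverride: Long?, recordBatchSizeBytes: Long): Long =
    recordBatchSizeOverride ?: recordBatchSizeBytes

fun main() {
    val configured = 200L * 1024 * 1024 // e.g. a 200 MiB default batch size
    println(resolveFlushThreshold(null, configured)) // 209715200 -> flush on the configured size
    println(resolveFlushThreshold(1L, configured))   // 1 -> flush after effectively every batch
}
```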
@@ -47,6 +47,7 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

interface DestinationTaskLauncher : TaskLauncher {
suspend fun handleSetupComplete()
suspend fun handleNewBatch(stream: DestinationStream.Descriptor, wrapped: BatchEnvelope<*>)
suspend fun handleStreamClosed(stream: DestinationStream.Descriptor)
suspend fun handleTeardownComplete(success: Boolean = true)
@@ -184,12 +185,9 @@ class DefaultDestinationTaskLauncher(

// Launch the client interface setup task
log.info { "Starting startup task" }
val setupTask = setupTaskFactory.make()
val setupTask = setupTaskFactory.make(this)
launch(setupTask)

log.info { "Enqueueing open stream tasks" }
catalog.streams.forEach { openStreamQueue.publish(it) }
openStreamQueue.close()
repeat(config.numOpenStreamWorkers) {
log.info { "Launching open stream task $it" }
launch(openStreamTaskFactory.make())
@@ -247,6 +245,12 @@
}
}

override suspend fun handleSetupComplete() {
log.info { "Setup task complete, opening streams" }
catalog.streams.forEach { openStreamQueue.publish(it) }
openStreamQueue.close()
}

/**
* Called for each new batch. Enqueues processing for any incomplete batch, and enqueues closing
* the stream if all batches are complete.
@@ -4,6 +4,7 @@

package io.airbyte.cdk.load.task.implementor

import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.load.task.SelfTerminating
import io.airbyte.cdk.load.task.Task
import io.airbyte.cdk.load.task.TerminalCondition
@@ -16,24 +17,26 @@ interface SetupTask : Task
/** Wraps @[DestinationWriter.setup] and starts the open stream tasks. */
class DefaultSetupTask(
private val destination: DestinationWriter,
private val taskLauncher: DestinationTaskLauncher,
) : SetupTask {
override val terminalCondition: TerminalCondition = SelfTerminating

override suspend fun execute() {
destination.setup()
taskLauncher.handleSetupComplete()
}
}

interface SetupTaskFactory {
fun make(): SetupTask
fun make(taskLauncher: DestinationTaskLauncher): SetupTask
}

@Singleton
@Secondary
class DefaultSetupTaskFactory(
private val destination: DestinationWriter,
) : SetupTaskFactory {
override fun make(): SetupTask {
return DefaultSetupTask(destination)
override fun make(taskLauncher: DestinationTaskLauncher): SetupTask {
return DefaultSetupTask(destination, taskLauncher)
}
}
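Together with the launcher change above, this moves stream opening behind destination setup: the open-stream workers still start immediately, but the catalog streams are only published to `openStreamQueue` from `handleSetupComplete()`, which `DefaultSetupTask.execute()` calls after `destination.setup()` returns. A condensed, standalone sketch of that ordering, using a plain coroutine channel in place of the CDK queue types (names and stream values are illustrative):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Simplified stand-in for the CDK stream descriptor type (illustrative only).
data class StreamDescriptor(val namespace: String?, val name: String)

fun main() = runBlocking<Unit> {
    val openStreamQueue = Channel<StreamDescriptor>(Channel.UNLIMITED)
    val catalog = listOf(StreamDescriptor(null, "users"), StreamDescriptor(null, "orders"))

    // The launcher still starts the open-stream workers up front; they simply
    // suspend until something is published to the queue.
    repeat(2) { worker ->
        launch { for (stream in openStreamQueue) println("worker $worker opening $stream") }
    }

    // The setup task: only after setup() succeeds does handleSetupComplete()
    // publish the catalog streams and close the queue.
    launch {
        // destination.setup() would run here
        catalog.forEach { openStreamQueue.send(it) }
        openStreamQueue.close()
    }
}
```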
@@ -12,6 +12,7 @@ import io.airbyte.cdk.load.message.ChannelMessageQueue
import io.airbyte.cdk.load.task.internal.ForceFlushEvent
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Singleton
import kotlinx.coroutines.test.runTest
@@ -28,6 +29,9 @@ import org.junit.jupiter.api.Test
class DefaultFlushStrategyTest {
val stream1 = MockDestinationCatalogFactory.stream1

@Value("\${airbyte.destination.record-batch-size-override}")
private var recordBatchSizeOverride: Long? = null

@Singleton
@Primary
@Requires(env = ["FlushStrategyTest"])
@@ -40,7 +44,7 @@ class DefaultFlushStrategyTest {
flushStrategy.shouldFlush(
stream1.descriptor,
Range.all(),
config.recordBatchSizeBytes - 1L
(recordBatchSizeOverride ?: config.recordBatchSizeBytes) - 1L
)
)
Assertions.assertTrue(
@@ -63,10 +67,10 @@
fun testFlushByIndex(
flushStrategy: DefaultFlushStrategy,
config: DestinationConfiguration,
forceFlushEventProducer: MockForceFlushEventQueue
forceFlushEventProducer: MockForceFlushEventQueue,
) = runTest {
// Ensure the size trigger is not a factor
val insufficientSize = config.recordBatchSizeBytes - 1L
val insufficientSize = (recordBatchSizeOverride ?: config.recordBatchSizeBytes) - 1L

Assertions.assertFalse(
flushStrategy.shouldFlush(stream1.descriptor, Range.all(), insufficientSize),
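The test now derives its "insufficient size" from the same override property the strategy reads, so the assertions hold whether or not the override is configured for the test environment. If one wanted to pin the override in a Micronaut test, something like the following hypothetical class could do it (this test is not part of the commit and assumes the standard `@Property` support in micronaut-test):

```kotlin
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Value
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

// Hypothetical test (not part of this commit): pins the override so the flush
// threshold is tiny and every non-empty batch crosses it.
@MicronautTest
@Property(name = "airbyte.destination.record-batch-size-override", value = "1")
class FlushOverrideExampleTest {
    @Value("\${airbyte.destination.record-batch-size-override}")
    private var recordBatchSizeOverride: Long? = null

    @Test
    fun `override is visible where the strategy reads it`() {
        Assertions.assertEquals(1L, recordBatchSizeOverride)
    }
}
```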
@@ -171,7 +171,7 @@ class DestinationTaskLauncherTest {
class MockSetupTaskFactory : SetupTaskFactory {
val hasRun: Channel<Unit> = Channel(Channel.UNLIMITED)

override fun make(): SetupTask {
override fun make(taskLauncher: DestinationTaskLauncher): SetupTask {
return object : SetupTask {
override val terminalCondition: TerminalCondition = SelfTerminating

@@ -242,5 +242,18 @@ class DestinationTaskLauncherUTest {
job.join()

coVerify(exactly = numOpenStreamWorkers) { openStreamTaskFactory.make() }

coVerify(exactly = 0) { openStreamQueue.publish(any()) }
}

@Test
fun `test streams opened when setup completes`() = runTest {
val launcher = getDefaultDestinationTaskLauncher(false)

coEvery { openStreamQueue.publish(any()) } returns Unit

launcher.handleSetupComplete()

coVerify(exactly = catalog.streams.size) { openStreamQueue.publish(any()) }
}
}
@@ -16,6 +16,10 @@ import jakarta.inject.Singleton
class MockTaskLauncher : DestinationTaskLauncher {
val batchEnvelopes = mutableListOf<BatchEnvelope<*>>()

override suspend fun handleSetupComplete() {
throw NotImplementedError()
}

override suspend fun handleNewBatch(
stream: DestinationStream.Descriptor,
wrapped: BatchEnvelope<*>
@@ -4,6 +4,7 @@

package io.airbyte.cdk.load.test.util

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
@@ -18,6 +19,7 @@ import java.time.format.DateTimeFormatter

fun interface ExpectedRecordMapper {
fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord
fun mapStreamDescriptor(descriptor: DestinationStream.Descriptor) = descriptor
}

object NoopExpectedRecordMapper : ExpectedRecordMapper {
@@ -106,6 +106,7 @@ abstract class IntegrationTest(
val actualRecords: List<OutputRecord> = dataDumper.dumpRecords(config, stream)
val expectedRecords: List<OutputRecord> =
canonicalExpectedRecords.map { recordMangler.mapRecord(it, stream.schema) }
val descriptor = recordMangler.mapStreamDescriptor(stream.descriptor)

RecordDiffer(
primaryKey = primaryKey.map { nameMapper.mapFieldName(it) },
@@ -116,7 +117,7 @@
.diffRecords(expectedRecords, actualRecords)
?.let {
var message =
"Incorrect records for ${stream.descriptor.namespace}.${stream.descriptor.name}:\n$it"
"Incorrect records for ${descriptor.namespace}.${descriptor.name}:\n$it"
if (reason != null) {
message = reason + "\n" + message
}
@@ -543,6 +543,10 @@ abstract class BasicFunctionalityIntegrationTest(
makeStream("stream_with_underscores"),
makeStream("STREAM_WITH_ALL_CAPS"),
makeStream("CapitalCase"),
makeStream("stream_with_spécial_character"),
makeStream("stream_name_with_operator+1"),
makeStream("stream_name_with_numbers_123"),
makeStream("1stream_with_a_leading_number"),
makeStream(
"stream_with_edge_case_field_names_and_values",
linkedMapOf(
@@ -551,6 +555,9 @@
"field_with_underscore" to stringType,
"FIELD_WITH_ALL_CAPS" to stringType,
"field_with_spécial_character" to stringType,
"field_name_with_operator+1" to stringType,
"field_name_with_numbers_123" to stringType,
"1field_with_a_leading_number" to stringType,
// "order" is a reserved word in many sql engines
"order" to stringType,
"ProperCase" to stringType,
@@ -4,6 +4,7 @@

package io.airbyte.cdk.load.data.avro

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
@@ -28,13 +29,25 @@ object AvroExpectedRecordMapper : ExpectedRecordMapper {
return expectedRecord.copy(data = withRemappedFieldNames as ObjectValue)
}

override fun mapStreamDescriptor(
descriptor: DestinationStream.Descriptor
): DestinationStream.Descriptor {
// Map the special character but not the '+', because only the former is replaced in file
// paths.
return descriptor.copy(name = descriptor.name.replace("é", "e"))
}

private fun fieldNameMangler(value: AirbyteValue): AirbyteValue =
when (value) {
is ObjectValue ->
ObjectValue(
LinkedHashMap(
value.values
.map { (k, v) -> k.replace("é", "e") to fieldNameMangler(v) }
.map { (k, v) ->
k.replace("é", "e").replace("+", "_").replace(Regex("(^\\d+)")) {
"_${it.groupValues[0]}"
} to fieldNameMangler(v)
}
.toMap()
)
)
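For a quick sense of what the mangler above produces for the new edge-case names added in `BasicFunctionalityIntegrationTest`, here is a standalone sketch of the same replacements (not part of the commit):

```kotlin
// Standalone copy of the replacements applied above (not the connector's actual entry point).
fun mangleFieldName(name: String): String =
    name.replace("é", "e")
        .replace("+", "_")
        .replace(Regex("(^\\d+)")) { "_${it.groupValues[0]}" }

fun main() {
    println(mangleFieldName("field_with_spécial_character")) // field_with_special_character
    println(mangleFieldName("field_name_with_operator+1"))   // field_name_with_operator_1
    println(mangleFieldName("1field_with_a_leading_number")) // _1field_with_a_leading_number
}
```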
@@ -80,11 +80,13 @@ interface IcebergCatalogSpecifications {
GlueCatalogConfiguration(
(catalogType as GlueCatalogSpecification).glueId,
(catalogType as GlueCatalogSpecification).toAWSArnRoleConfiguration(),
(catalogType as GlueCatalogSpecification).databaseName,
)
is NessieCatalogSpecification ->
NessieCatalogConfiguration(
(catalogType as NessieCatalogSpecification).serverUri,
(catalogType as NessieCatalogSpecification).accessToken
(catalogType as NessieCatalogSpecification).accessToken,
(catalogType as NessieCatalogSpecification).namespace,
)
}

@@ -163,7 +165,23 @@ class NessieCatalogSpecification(
"order":2
}""",
)
val accessToken: String?
val accessToken: String?,

/**
* The namespace to be used when building the Table identifier
*
* This namespace will only be used if the stream namespace is null, meaning when the
* `Destination Namespace` setting for the connection is set to `Destination-defined` or
* `Source-defined`
*/
@get:JsonSchemaTitle("Namespace")
@get:JsonPropertyDescription(
"""The Nessie namespace to be used in the Table identifier.
|This will ONLY be used if the `Destination Namespace` setting for the connection is set to
| `Destination-defined` or `Source-defined`"""
)
@get:JsonProperty("namespace")
val namespace: String?
) : CatalogType(catalogType)

/**
@@ -193,6 +211,20 @@ class GlueCatalogSpecification(
@JsonSchemaInject(json = """{"order":1}""")
val glueId: String,
override val roleArn: String? = null,

/**
* The name of the database to be used when building the Table identifier
*
* This database name will only be used if the stream namespace is null, meaning when the
* `Destination Namespace` setting for the connection is set to `Destination-defined` or
* `Source-defined`
*/
@get:JsonSchemaTitle("Database Name")
@get:JsonPropertyDescription(
"""The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`"""
)
@get:JsonProperty("database_name")
val databaseName: String?
) : CatalogType(catalogType), AWSArnRoleSpecification

/**
@@ -239,6 +271,11 @@ data class GlueCatalogConfiguration(
@JsonPropertyDescription("The AWS Account ID associated with the Glue service.")
val glueId: String,
override val awsArnRoleConfiguration: AWSArnRoleConfiguration,
@get:JsonSchemaTitle("Database Name")
@get:JsonPropertyDescription(
"""The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`"""
)
val databaseName: String?
) : CatalogConfiguration, AWSArnRoleConfigurationProvider

/**
@@ -254,7 +291,14 @@ data class NessieCatalogConfiguration(
val serverUri: String,
@JsonSchemaTitle("Nessie Access Token")
@JsonPropertyDescription("An optional token for authentication with the Nessie server.")
val accessToken: String?
val accessToken: String?,
@get:JsonSchemaTitle("Namespace")
@get:JsonPropertyDescription(
"""The Nessie namespace to be used in the Table identifier.
|This will ONLY be used if the `Destination Namespace` setting for the connection is set to
| `Destination-defined` or `Source-defined`"""
)
val namespace: String?
) : CatalogConfiguration

/**
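Both new catalog options are documented as fallbacks: the configured Glue database name or Nessie namespace only applies when the stream itself carries no namespace, i.e. when the connection's `Destination Namespace` setting is `Destination-defined` or `Source-defined`. A minimal sketch of that resolution rule (the helper is illustrative; the real table-identifier construction lives elsewhere in the connector):

```kotlin
// Illustrative helper: the configured value is only a fallback for a null stream namespace.
fun resolveNamespace(streamNamespace: String?, configuredNamespace: String?): String? =
    streamNamespace ?: configuredNamespace

fun main() {
    // Destination-defined / Source-defined namespace setting -> stream namespace is null.
    println(resolveNamespace(null, "analytics"))       // analytics
    // The stream carries its own namespace -> the configured value is ignored.
    println(resolveNamespace("raw_data", "analytics")) // raw_data
}
```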
@@ -374,7 +374,22 @@ class ObjectStoragePathFactory(

private fun getPathVariableToPattern(stream: DestinationStream): Map<String, String> {
return PATH_VARIABLES.associate {
it.variable to (it.pattern ?: it.provider(VariableContext(stream)))
it.variable to
(
// Only escape the pattern if
// A) it's not already provided
// B) the value from context is not blank
// This is to ensure stream names/namespaces with special characters (foo+1) match
// correctly,
// but that blank patterns are ignored completely.
it.pattern
?: (it.provider(VariableContext(stream)).let { s ->
if (s.isNotBlank()) {
Regex.escape(s)
} else {
s
}
}))
} +
FILENAME_VARIABLES.associate {
it.variable to
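The escaping added above makes a stream name containing regex metacharacters (such as the `foo+1` case mentioned in the comment) match itself literally once the path pattern is compiled, while blank values are left alone so they continue to be ignored. A standalone illustration with made-up path values (not part of the commit):

```kotlin
fun main() {
    val streamName = "foo+1"

    // Unescaped, '+' acts as a quantifier: "fooo1" matches, the literal name does not.
    println(Regex("$streamName/.*").matches("fooo1/file.avro"))  // true
    println(Regex("$streamName/.*").matches("foo+1/file.avro"))  // false

    // Escaped (as in the change above), the name only matches itself.
    val escaped = Regex.escape(streamName)
    println(Regex("$escaped/.*").matches("foo+1/file.avro"))     // true
    println(Regex("$escaped/.*").matches("fooo1/file.avro"))     // false
}
```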