Skip to content

Commit

Permalink
Merge pull request #199 from RADAR-base/release-0.5.12
Browse files Browse the repository at this point in the history
Release 0.5.12
  • Loading branch information
blootsvoets authored Aug 22, 2022
2 parents 3ff2f78 + b059a1b commit 7da101d
Show file tree
Hide file tree
Showing 54 changed files with 678 additions and 582 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ plugins {

allprojects {
group = "org.radarbase"
version = "0.5.11"
version = "0.5.12"
}

subprojects {
Expand Down Expand Up @@ -72,5 +72,5 @@ tasks.withType<DependencyUpdatesTask> {
}

tasks.wrapper {
gradleVersion = "7.4.2"
gradleVersion = "7.5.1"
}
25 changes: 13 additions & 12 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
kotlinVersion=1.6.21
kotlinApiVersion=1.6
kotlinVersion=1.7.10
kotlinApiVersion=1.7
kotlin.code.style=official
dockerComposeStopContainers=true
dockerComposeVersion=0.16.8

kafkaVersion=3.2.0
okhttpVersion=4.9.3
jacksonVersion=2.12.7
kafkaVersion=3.2.1
okhttpVersion=4.10.0
jacksonVersion=2.13.3
openCsvVersion=5.6
confluentVersion=7.1.1
confluentVersion=7.2.1
radarSchemaVersion=0.7.9
slf4jVersion=1.7.36
minioVersion=8.3.7
minioVersion=8.4.3
jschVersion=0.1.55
radarJerseyVersion=0.8.3
radarJerseyVersion=0.9.0
radarCommonsVersion=0.15.0
jerseyVersion=3.0.4
hsqldbVersion=2.6.1
junitVersion=5.8.2
jerseyVersion=3.0.6
hsqldbVersion=2.7.0
junitVersion=5.9.0
mockitoKotlinVersion=4.0.0
hamcrestVersion=2.2
log4j2Version=2.17.2
log4j2Version=2.18.0
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
6 changes: 6 additions & 0 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ set -- \
org.gradle.wrapper.GradleWrapperMain \
"$@"

# Stop when "xargs" is not available.
if ! command -v xargs >/dev/null 2>&1
then
die "xargs is not available"
fi

# Use "xargs" to parse quoted args.
#
# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
Expand Down
14 changes: 8 additions & 6 deletions gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
@rem limitations under the License.
@rem

@if "%DEBUG%" == "" @echo off
@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
Expand All @@ -25,7 +25,7 @@
if "%OS%"=="Windows_NT" setlocal

set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
if "%DIRNAME%"=="" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

Expand All @@ -40,7 +40,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome

set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
if %ERRORLEVEL% equ 0 goto execute

echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Expand Down Expand Up @@ -75,13 +75,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar

:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
if %ERRORLEVEL% equ 0 goto mainEnd

:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
set EXIT_CODE=%ERRORLEVEL%
if %EXIT_CODE% equ 0 set EXIT_CODE=1
if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
exit /b %EXIT_CODE%

:mainEnd
if "%OS%"=="Windows_NT" endlocal
Expand Down
15 changes: 9 additions & 6 deletions integration-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.time.Duration
plugins {
java
kotlin("jvm")
id("com.avast.gradle.docker-compose") version "0.16.4"
id("com.avast.gradle.docker-compose")
}

sourceSets {
Expand All @@ -22,13 +22,16 @@ sourceSets {

dependencies {
implementation(kotlin("stdlib-jdk8"))

val confluentVersion: String by project
testImplementation("io.confluent:kafka-connect-avro-converter:$confluentVersion")
val jacksonVersion: String by project
testImplementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion")
testImplementation(platform("com.fasterxml.jackson:jackson-bom:$jacksonVersion"))
val okhttpVersion: String by project
testImplementation("com.squareup.okhttp3:okhttp:$okhttpVersion")
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion")
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion")
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion")
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin")
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8")

testImplementation(project(":radar-upload-backend"))
testImplementation(project(":kafka-connect-upload-source"))
Expand All @@ -51,7 +54,7 @@ task<Test>("integrationTest") {
}

tasks.withType<KotlinCompile> {
kotlinOptions.jvmTarget = "11"
kotlinOptions.jvmTarget = "17"
}

dockerCompose {
Expand Down
4 changes: 3 additions & 1 deletion integration-test/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
version: '3.2'
version: '3.9'

networks:
management:
Expand All @@ -17,6 +17,7 @@ services:
networks:
- default
- management
platform: linux/amd64
depends_on:
- radarbase-postgresql
ports:
Expand All @@ -40,6 +41,7 @@ services:
- ../etc/mp-config/:/mp-includes/config
healthcheck:
test: "curl -f http://localhost:8080/oauth/token -d grant_type=client_credentials -d client_id=radar_upload_test_client -d client_secret=test"
start_period: 600s

radarbase-postgresql:
image: postgres:14
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.kotlin.*
import jakarta.ws.rs.core.Response
import okhttp3.Credentials
import okhttp3.FormBody
Expand Down Expand Up @@ -162,11 +162,12 @@ object TestBase {
contentTypes = setOf("application/zip")
)

private val mapper: ObjectMapper = ObjectMapper(JsonFactory())
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.registerModule(KotlinModule())
.registerModule(JavaTimeModule())
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
private val mapper: ObjectMapper = jsonMapper {
disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
addModule(kotlinModule())
addModule(JavaTimeModule())
}

val clientCredentialsAuthorizer = OAuthClientCredentialsInterceptor(
httpClient,
Expand Down
4 changes: 2 additions & 2 deletions kafka-connect-upload-source/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM gradle:7.4-jdk11 as builder
FROM --platform=$BUILDPLATFORM gradle:7.5-jdk11 as builder

RUN mkdir /code
WORKDIR /code
Expand All @@ -14,7 +14,7 @@ COPY ./kafka-connect-upload-source/src/main/java /code/kafka-connect-upload-sour

RUN gradle jar --no-watch-fs

FROM confluentinc/cp-kafka-connect-base:7.1.1
FROM confluentinc/cp-kafka-connect-base:7.2.1

MAINTAINER @nivemaham @blootsvoets

Expand Down
2 changes: 2 additions & 0 deletions kafka-connect-upload-source/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ dependencies {
val junitVersion: String by project
testImplementation("org.junit.jupiter:junit-jupiter:$junitVersion")

testImplementation("io.confluent:kafka-connect-avro-converter:$confluentVersion")

val hamcrestVersion: String by project
testImplementation("org.hamcrest:hamcrest:$hamcrestVersion")
testImplementation("org.apache.kafka:connect-api:$kafkaVersion")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.radarbase.connect.upload.exception.ConversionTemporarilyFailedExcepti
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.time.Duration
import java.time.Instant
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
Expand All @@ -22,7 +23,7 @@ class ConverterManager(
private val converters: Map<String, ConverterFactory.Converter>,
private val uploadClient: UploadBackendClient,
private val logRepository: LogRepository,
pollDuration: Duration,
private val pollDuration: Duration,
): Closeable {
private val executor = Executors.newSingleThreadScheduledExecutor()

Expand All @@ -38,11 +39,20 @@ class ConverterManager(

private fun poll() {
logger.info("Polling new records...")
val intervalEnd = Instant.now() + pollDuration
val pollSize = 1

do {
val processedRecords = makePoll(pollSize)
} while (processedRecords == pollSize && Instant.now() < intervalEnd)
}

private fun makePoll(numberOfRecords: Int): Int {
val records = try {
uploadClient.pollRecords(PollDTO(1, converters.keys)).records
uploadClient.pollRecords(PollDTO(numberOfRecords, converters.keys)).records
} catch (exe: Throwable) {
logger.error("Could not successfully poll records. Waiting for next polling...", exe)
return
return numberOfRecords
}

try {
Expand All @@ -56,7 +66,9 @@ class ConverterManager(
}
} catch (ex: Throwable) {
logger.error("Failed to process records", ex)
return numberOfRecords
}
return records.size
}

private fun processRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinFeature
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.kotlin.jsonMapper
import com.fasterxml.jackson.module.kotlin.kotlinModule
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import org.apache.kafka.common.config.AbstractConfig
Expand Down Expand Up @@ -165,11 +168,12 @@ class UploadSourceConnectorConfig(config: ConfigDef, parsedConfig: Map<String, S
.readTimeout(30, TimeUnit.SECONDS)
.build()

var mapper: ObjectMapper = ObjectMapper()
.registerModule(KotlinModule())
.registerModule(JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
var mapper: ObjectMapper = jsonMapper {
addModule(kotlinModule())
addModule(JavaTimeModule())
disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
}

fun conf(): ConfigDef {
val groupName = "upload"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ class UploadSourceTask : SourceTask() {

// init converters if configured
converters = connectConfig.converterClasses
.map { className -> ConverterFactory.createConverter(className, props, uploadClient, logRepository)
.let { it.sourceType to it }}
.toMap()
.map { className -> ConverterFactory.createConverter(className, props, uploadClient, logRepository) }
.associateBy { it.sourceType }

val pollIntervalMs = connectConfig.getLong(SOURCE_POLL_INTERVAL_CONFIG)
pollInterval = Duration.ofMillis(pollIntervalMs)
Expand All @@ -97,27 +96,31 @@ class UploadSourceTask : SourceTask() {

override fun stop() {
logger.debug("Stopping source task")
converterManager.close()
uploadClient.close()
commitTimer.cancel()
converters.values.forEach(Converter::close)
if (this::converterManager.isInitialized) {
converterManager.close()
}
if (this::uploadClient.isInitialized) {
uploadClient.close()
}
if (this::commitTimer.isInitialized) {
commitTimer.cancel()
}
if (this::converters.isInitialized) {
converters.values.forEach(Converter::close)
}
}

override fun version(): String = VersionUtil.getVersion()

override fun poll(): List<SourceRecord>? {
val records = generateSequence { queue.poll() } // this will retrieve all non-blocking elements
override fun poll(): List<SourceRecord>? =
generateSequence { queue.poll() } // this will retrieve all non-blocking elements
.take(queueSize) // don't process more than queueSize records at once
.toList()

return if (records.isNotEmpty()) {
records
} else {
// if no non-blocking elements are available, it's ok to wait for them for a bit.
queue.poll(pollInterval.toMillis(), TimeUnit.MILLISECONDS)
?.let { listOf(it) }
}
}
.ifEmpty {
// if no non-blocking elements are available, it's ok to wait for them for a bit.
val lateElement = queue.poll(pollInterval.toMillis(), TimeUnit.MILLISECONDS)
if (lateElement != null) listOf(lateElement) else null
}

override fun commitRecord(record: SourceRecord?, recordMetadata: RecordMetadata?) {
record ?: return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ data class OAuthToken(
@JsonProperty("expires_in") var expiresIn: Long,
@JsonProperty("iat") var issuedAt: Long,
@JsonProperty("scope") var scope: String,
@JsonProperty("sub") var sub: String,
// @JsonProperty("sub") var sub: String,
) {
@get:JsonIgnore
val isExpired: Boolean
Expand Down
Loading

0 comments on commit 7da101d

Please sign in to comment.