Skip to content

Commit

Permalink
fjerner avhengighet til egen database
Browse files Browse the repository at this point in the history
spre-subsumsjon henter dokumenter fra spedisjon

Co-authored-by: Simen Ullern <[email protected]>
Co-authored-by: Håkon Arneng Holmstedt <[email protected]>
  • Loading branch information
3 people committed Nov 25, 2024
1 parent fc8447e commit f96d18b
Show file tree
Hide file tree
Showing 29 changed files with 13 additions and 967 deletions.
4 changes: 0 additions & 4 deletions config/subsumsjon/dev-gcp.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
gcp:
type: POSTGRES_12
databaseName: spre-subsumsjon
sqlInstanceTier: db-f1-micro
azure:
enabled: true
tenant: trygdeetaten.no
Expand Down
3 changes: 0 additions & 3 deletions config/subsumsjon/prod-gcp.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
gcp:
type: POSTGRES_15
databaseName: spre-subsumsjon
azure:
enabled: true
tenant: nav.no
Expand Down
10 changes: 0 additions & 10 deletions subsumsjon/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,24 +1,14 @@
val hikariCPVersion: String by project
val postgresqlVersion: String by project
val kotliqueryVersion: String by project
val flywayCoreVersion: String by project
val tbdLibsVersion: String by project
val mockkVersion: String by project
val jsonSchemaValidatorVersion = "1.0.65"
val kotestAssertionsCoreVersion = "5.1.0"

dependencies {
implementation("com.zaxxer:HikariCP:$hikariCPVersion")
implementation("org.postgresql:postgresql:$postgresqlVersion")
implementation("org.flywaydb:flyway-database-postgresql:$flywayCoreVersion")
implementation("com.github.seratch:kotliquery:$kotliqueryVersion")

implementation("com.github.navikt.tbd-libs:azure-token-client-default:$tbdLibsVersion")
implementation("com.github.navikt.tbd-libs:retry:$tbdLibsVersion")
implementation("com.github.navikt.tbd-libs:spedisjon-client:$tbdLibsVersion")

testImplementation("com.github.navikt.tbd-libs:rapids-and-rivers-test:$tbdLibsVersion")
testImplementation("com.github.navikt.tbd-libs:postgres-testdatabaser:$tbdLibsVersion")
testImplementation("com.networknt:json-schema-validator:$jsonSchemaValidatorVersion")
testImplementation("io.kotest:kotest-assertions-core:$kotestAssertionsCoreVersion")
testImplementation("io.mockk:mockk:$mockkVersion")
Expand Down
53 changes: 8 additions & 45 deletions subsumsjon/src/main/kotlin/no/nav/helse/spre/subsumsjon/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,27 @@ import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.github.navikt.tbd_libs.azure.createAzureTokenClientFromEnvironment
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import com.github.navikt.tbd_libs.kafka.AivenConfig
import com.github.navikt.tbd_libs.kafka.ConsumerProducerFactory
import com.github.navikt.tbd_libs.spedisjon.SpedisjonClient
import no.nav.helse.rapids_rivers.*
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import no.nav.helse.rapids_rivers.RapidApplication
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.errors.*
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.net.http.HttpClient
import java.util.*

internal val log = LoggerFactory.getLogger("spre-subsumsjoner")
internal val sikkerLogg: Logger = LoggerFactory.getLogger("tjenestekall")

fun main() {
val env = System.getenv()
val kafkaProducer = createProducer(env)
val config = Config.fromEnv()
val dataSourceBuilder = DataSourceBuilder(config.jdbcUrl, config.username, config.password)
val mappingDao = MappingDao(dataSourceBuilder.datasource())
val rapid = RapidApplication.create(env)
val factory = ConsumerProducerFactory(AivenConfig.default)
val kafkaProducer = factory.createProducer()
val rapid = RapidApplication.create(env, factory)
val subsumsjonTopic = requireNotNull(env["SUBSUMSJON_TOPIC"]) { " SUBSUMSJON_TOPIC is required config " }
val publisher = { key: String, value: String ->
kafkaProducer.send(ProducerRecord(config.subsumsjonTopic, key, value)) { _, err ->
kafkaProducer.send(ProducerRecord(subsumsjonTopic, key, value)) { _, err ->
if (err == null || !isFatalError(err)) return@send
log.error("Shutting down due to fatal error in subsumsjon-producer: ${err.message}", err)
rapid.stop()
Expand All @@ -46,43 +39,13 @@ fun main() {
tokenProvider = azureClient
)

// Migrer databasen før vi starter å konsumere fra rapid
rapid.register(object : RapidsConnection.StatusListener {
override fun onStartup(rapidsConnection: RapidsConnection) {
dataSourceBuilder.migrate()
}
})

rapid.apply {
SubsumsjonRiver(this, spedisjonClient) { key, value -> publisher(key, value) }
SykemeldingRiver(this, mappingDao)
SøknadRiver(this, mappingDao)
DokumentAliasRiver(this, mappingDao)
InntektsmeldingRiver(this, mappingDao)
VedtakFattetRiver(this) { key, value -> publisher(key, value) }
VedtakForkastetRiver(this) { key, value -> publisher(key, value) }
}.start()
}

private fun createProducer(env: Map<String, String>): KafkaProducer<String, String> {
val properties = Properties().apply {
put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, env.getValue("KAFKA_BROKERS"))
put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name)
put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "")
put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "jks")
put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12")
put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, env.getValue("KAFKA_TRUSTSTORE_PATH"))
put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, env.getValue("KAFKA_CREDSTORE_PASSWORD"))
put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, env.getValue("KAFKA_KEYSTORE_PATH"))
put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, env.getValue("KAFKA_CREDSTORE_PASSWORD"))
put(ProducerConfig.CLIENT_ID_CONFIG, env.getValue("KAFKA_CONSUMER_GROUP_ID"))
put(ProducerConfig.ACKS_CONFIG, "all")
put(ProducerConfig.LINGER_MS_CONFIG, "0")
put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
}
return KafkaProducer(properties, StringSerializer(), StringSerializer())
}

private fun isFatalError(err: Exception) = when (err) {
is InvalidTopicException,
is RecordBatchTooLargeException,
Expand Down
33 changes: 0 additions & 33 deletions subsumsjon/src/main/kotlin/no/nav/helse/spre/subsumsjon/Config.kt

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.nav.helse.spre.subsumsjon

import com.fasterxml.jackson.databind.JsonNode
import com.github.navikt.tbd_libs.rapids_and_rivers.JsonMessage
import com.github.navikt.tbd_libs.rapids_and_rivers.River
import com.github.navikt.tbd_libs.rapids_and_rivers.isMissingOrNull
Expand Down Expand Up @@ -130,3 +131,6 @@ internal class SubsumsjonRiver(
return objectMapper.writeValueAsString(subsumsjonsmelding).also { sikkerLogg.info("sender subsumsjon: $it") }
}
}

internal fun JsonNode.toUUID() = UUID.fromString(this.asText())
internal fun JsonNode.toUUIDs() = this.map { it.toUUID() }
Loading

0 comments on commit f96d18b

Please sign in to comment.