Skip to content

Commit

Permalink
send hendelser i tx
Browse files Browse the repository at this point in the history
Uten dette har vi et potensielt case hvor meldinger blir mistet.
Siden man gjør delete returning så er det viktig at vi sender alle meldinger på kafka før commit.
Dersom det blir en feil vil det bli muligens bli sendt dobbelt opp, men det er bedre enn ingen.
  • Loading branch information
kenglxn committed Jan 14, 2025
1 parent 7645422 commit eb9ad2e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class Database private constructor(

suspend fun <T> transaction(
rollback: (e: Exception) -> T = { throw it },
body: Transaction.() -> T,
body: suspend Transaction.() -> T,
): T =
dataSource.withConnection { connection ->
val savedAutoCommit = connection.autoCommit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,49 +173,64 @@ class SkedulertUtgåttRepository(
}
}

suspend fun hentOgFjernAlleUtgåtteOppgaver(localDateNow: LocalDate) = database.nonTransactionalExecuteQuery(
"""
suspend fun hentOgFjernAlleUtgåtteOppgaver(
localDateNow: LocalDate,
action: suspend (Oppgave) -> Unit
) = database.transaction {
executeQuery(
"""
delete from skedulert_utgatt as s
using oppgave as o
where o.oppgave_id = s.aggregat_id
and s.aggregat_type = ? and o.frist < ?
returning *
""", {
text(AggregatType.OPPGAVE.name)
localDateAsText(localDateNow)
}) {
Oppgave(
oppgaveId = getUUID("oppgave_id"),
frist = getLocalDate("frist"),
virksomhetsnummer = getString("virksomhetsnummer"),
produsentId = getString("produsent_id"),
)
text(AggregatType.OPPGAVE.name)
localDateAsText(localDateNow)
}) {
Oppgave(
oppgaveId = getUUID("oppgave_id"),
frist = getLocalDate("frist"),
virksomhetsnummer = getString("virksomhetsnummer"),
produsentId = getString("produsent_id"),
)
}.forEach {
action(it)
}
}

suspend fun hentOgFjernAlleAvholdteKalenderavtaler(localDateTimeNow: LocalDateTime) =
database.nonTransactionalExecuteQuery(
"""

suspend fun hentOgFjernAlleAvholdteKalenderavtaler(
localDateTimeNow: LocalDateTime,
action: suspend (Kalenderavtale) -> Unit
) =
database.transaction {
executeQuery(
"""
delete from skedulert_utgatt as s
using kalenderavtale as k
where k.kalenderavtale_id = s.aggregat_id
and s.aggregat_type = ? and k.start_tidspunkt < ?
returning *
""", {
text(AggregatType.KALENDERAVTALE.name)
localDateTimeAsText(localDateTimeNow)
}) {
Kalenderavtale(
kalenderavtaleId = getUUID("kalenderavtale_id"),
startTidspunkt = getLocalDateTime("start_tidspunkt"),
virksomhetsnummer = getString("virksomhetsnummer"),
tilstand = getString("tilstand").let { KalenderavtaleTilstand.valueOf(it) },
produsentId = getString("produsent_id"),
merkelapp = getString("merkelapp"),
grupperingsid = getString("grupperingsid"),
opprettetTidspunkt = Instant.parse(
getString("opprettet_tidspunkt"),
text(AggregatType.KALENDERAVTALE.name)
localDateTimeAsText(localDateTimeNow)
}) {
Kalenderavtale(
kalenderavtaleId = getUUID("kalenderavtale_id"),
startTidspunkt = getLocalDateTime("start_tidspunkt"),
virksomhetsnummer = getString("virksomhetsnummer"),
tilstand = getString("tilstand").let { KalenderavtaleTilstand.valueOf(it) },
produsentId = getString("produsent_id"),
merkelapp = getString("merkelapp"),
grupperingsid = getString("grupperingsid"),
opprettetTidspunkt = Instant.parse(
getString("opprettet_tidspunkt"),
)
)
)
}.forEach {
action(it)
}
}

private suspend fun upsertSkedulertUtgåttOppgave(oppgave: Oppgave) = database.transaction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ class SkedulertUtgåttService(
private val osloTid: OsloTid = OsloTidImpl
) {
suspend fun settOppgaverUtgåttBasertPåFrist(now: LocalDate = osloTid.localDateNow()) {
val utgåttFrist = repository.hentOgFjernAlleUtgåtteOppgaver(now)
utgåttFrist.forEach { utgått ->
repository.hentOgFjernAlleUtgåtteOppgaver(now) { utgått ->
val fristLocalDateTime = LocalDateTime.of(utgått.frist, LocalTime.MAX)
hendelseProdusent.send(HendelseModel.OppgaveUtgått(
virksomhetsnummer = utgått.virksomhetsnummer,
Expand All @@ -36,7 +35,7 @@ class SkedulertUtgåttService(
}

suspend fun settKalenderavtalerAvholdtBasertPåTidspunkt(now: LocalDateTime = osloTid.localDateTimeNow()) {
repository.hentOgFjernAlleAvholdteKalenderavtaler(now).forEach { avholdt ->
repository.hentOgFjernAlleAvholdteKalenderavtaler(now) { avholdt ->
hendelseProdusent.send(HendelseModel.KalenderavtaleOppdatert(
virksomhetsnummer = avholdt.virksomhetsnummer,
notifikasjonId = avholdt.kalenderavtaleId,
Expand Down

0 comments on commit eb9ad2e

Please sign in to comment.