Skip to content

Commit

Permalink
Add ResumeSynchronizationEvent
Browse files Browse the repository at this point in the history
NB: This is potentially breaking as it replaces a `sealed` attribute
  • Loading branch information
freyacodes committed Feb 3, 2025
1 parent ad9a687 commit f584b23
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
17 changes: 15 additions & 2 deletions src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.arbjerg.lavalink.client

import dev.arbjerg.lavalink.client.event.ClientEvent
import dev.arbjerg.lavalink.client.event.ResumeSynchronizationEvent
import dev.arbjerg.lavalink.client.http.HttpBuilder
import dev.arbjerg.lavalink.client.player.*
import dev.arbjerg.lavalink.client.player.Track
Expand All @@ -17,12 +18,14 @@ import kotlinx.serialization.serializer
import okhttp3.Call
import okhttp3.OkHttpClient
import okhttp3.Response
import org.slf4j.LoggerFactory
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.Many
import reactor.kotlin.core.publisher.toMono
import reactor.util.retry.Retry
import java.io.Closeable
import java.io.IOException
import java.time.Duration
Expand All @@ -38,6 +41,7 @@ class LavalinkNode(
private val nodeOptions: NodeOptions,
val lavalink: LavalinkClient
) : Disposable, Closeable {
private val logger = LoggerFactory.getLogger(LavalinkNode::class.java)
// "safe" uri with all paths removed
val baseUri = "${nodeOptions.serverUri.scheme}://${nodeOptions.serverUri.host}:${nodeOptions.serverUri.port}"

Expand Down Expand Up @@ -238,7 +242,7 @@ class LavalinkNode(
}

/**
* Enables resuming. This causes Lavalink to continue playing for [duration], during which
* Enables resuming. This causes Lavalink to continue playing for [timeout] amount of time, during which
* we can reconnect without losing our session data. */
fun enableResuming(timeout: Duration): Mono<Session> {
return rest.patchSession(Session(resuming = true, timeout.seconds)).doOnSuccess {
Expand Down Expand Up @@ -428,7 +432,9 @@ class LavalinkNode(
}

internal fun synchronizeAfterResume() {
getPlayers().subscribe { players ->
getPlayers()
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)))
.map { players ->
val remoteGuildIds = players.map { it.guildId }

players.forEach { player ->
Expand All @@ -450,6 +456,13 @@ class LavalinkNode(
val link = lavalink.getLinkIfCached(guildId) ?: return@forEach
if (link.node == this) link.state = LinkState.DISCONNECTED
}

ResumeSynchronizationEvent(this, failureReason = null)
}.doOnError {
logger.error("Failure while attempting synchronization with $this", it)
sink.tryEmitNext(ResumeSynchronizationEvent(this, failureReason = it))
}.subscribe {
sink.tryEmitNext(it)
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/main/kotlin/dev/arbjerg/lavalink/client/event/events.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import dev.arbjerg.lavalink.client.player.toCustom
import dev.arbjerg.lavalink.protocol.v4.*
import dev.arbjerg.lavalink.protocol.v4.Message.EmittedEvent.TrackEndEvent.AudioTrackEndReason

internal fun Message.toClientEvent(node: LavalinkNode) = when (this) {
internal fun Message.toClientEvent(node: LavalinkNode): ClientEvent = when (this) {
is Message.ReadyEvent -> ReadyEvent(node, resumed, sessionId)
is Message.EmittedEvent.TrackEndEvent -> TrackEndEvent(node, guildId.toLong(), track.toCustom(), reason)
is Message.EmittedEvent.TrackExceptionEvent -> TrackExceptionEvent(node, guildId.toLong(), track.toCustom(), exception.toCustom())
Expand All @@ -18,12 +18,19 @@ internal fun Message.toClientEvent(node: LavalinkNode) = when (this) {
is Message.StatsEvent -> StatsEvent(node, frameStats, players, playingPlayers, uptime, memory, cpu)
}

sealed class ClientEvent(open val node: LavalinkNode)
abstract class ClientEvent(open val node: LavalinkNode)

// Normal events
data class ReadyEvent(override val node: LavalinkNode, val resumed: Boolean, val sessionId: String)
: ClientEvent(node)

/**
* Represents a successful or failed synchronization after a [ReadyEvent] with [ReadyEvent.resumed] set to true.
*
* Whether it is successful depends on whether [failureReason] is null.
*/
data class ResumeSynchronizationEvent(override val node: LavalinkNode, val failureReason: Throwable?) : ClientEvent(node)

data class PlayerUpdateEvent(override val node: LavalinkNode, val guildId: Long, val state: PlayerState)
: ClientEvent(node)

Expand Down

0 comments on commit f584b23

Please sign in to comment.