diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt index 6a54546..649e794 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt @@ -1,6 +1,7 @@ package dev.arbjerg.lavalink.client import dev.arbjerg.lavalink.client.event.ClientEvent +import dev.arbjerg.lavalink.client.event.ResumeSynchronizationEvent import dev.arbjerg.lavalink.client.http.HttpBuilder import dev.arbjerg.lavalink.client.player.* import dev.arbjerg.lavalink.client.player.Track @@ -17,12 +18,14 @@ import kotlinx.serialization.serializer import okhttp3.Call import okhttp3.OkHttpClient import okhttp3.Response +import org.slf4j.LoggerFactory import reactor.core.Disposable import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import reactor.core.publisher.Sinks.Many import reactor.kotlin.core.publisher.toMono +import reactor.util.retry.Retry import java.io.Closeable import java.io.IOException import java.time.Duration @@ -38,6 +41,7 @@ class LavalinkNode( private val nodeOptions: NodeOptions, val lavalink: LavalinkClient ) : Disposable, Closeable { + private val logger = LoggerFactory.getLogger(LavalinkNode::class.java) // "safe" uri with all paths removed val baseUri = "${nodeOptions.serverUri.scheme}://${nodeOptions.serverUri.host}:${nodeOptions.serverUri.port}" @@ -238,7 +242,7 @@ class LavalinkNode( } /** - * Enables resuming. This causes Lavalink to continue playing for [duration], during which + * Enables resuming. This causes Lavalink to continue playing for [timeout] amount of time, during which * we can reconnect without losing our session data. */ fun enableResuming(timeout: Duration): Mono { return rest.patchSession(Session(resuming = true, timeout.seconds)).doOnSuccess { @@ -428,7 +432,9 @@ class LavalinkNode( } internal fun synchronizeAfterResume() { - getPlayers().subscribe { players -> + getPlayers() + .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1))) + .map { players -> val remoteGuildIds = players.map { it.guildId } players.forEach { player -> @@ -450,6 +456,13 @@ class LavalinkNode( val link = lavalink.getLinkIfCached(guildId) ?: return@forEach if (link.node == this) link.state = LinkState.DISCONNECTED } + + ResumeSynchronizationEvent(this, failureReason = null) + }.doOnError { + logger.error("Failure while attempting synchronization with $this", it) + sink.tryEmitNext(ResumeSynchronizationEvent(this, failureReason = it)) + }.subscribe { + sink.tryEmitNext(it) } } diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/event/events.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/event/events.kt index 2974e6c..2a71cb3 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/event/events.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/event/events.kt @@ -7,7 +7,7 @@ import dev.arbjerg.lavalink.client.player.toCustom import dev.arbjerg.lavalink.protocol.v4.* import dev.arbjerg.lavalink.protocol.v4.Message.EmittedEvent.TrackEndEvent.AudioTrackEndReason -internal fun Message.toClientEvent(node: LavalinkNode) = when (this) { +internal fun Message.toClientEvent(node: LavalinkNode): ClientEvent = when (this) { is Message.ReadyEvent -> ReadyEvent(node, resumed, sessionId) is Message.EmittedEvent.TrackEndEvent -> TrackEndEvent(node, guildId.toLong(), track.toCustom(), reason) is Message.EmittedEvent.TrackExceptionEvent -> TrackExceptionEvent(node, guildId.toLong(), track.toCustom(), exception.toCustom()) @@ -18,12 +18,19 @@ internal fun Message.toClientEvent(node: LavalinkNode) = when (this) { is Message.StatsEvent -> StatsEvent(node, frameStats, players, playingPlayers, uptime, memory, cpu) } -sealed class ClientEvent(open val node: LavalinkNode) +abstract class ClientEvent(open val node: LavalinkNode) // Normal events data class ReadyEvent(override val node: LavalinkNode, val resumed: Boolean, val sessionId: String) : ClientEvent(node) +/** + * Represents a successful or failed synchronization after a [ReadyEvent] with [ReadyEvent.resumed] set to true. + * + * Whether it is successful depends on whether [failureReason] is null. + */ +data class ResumeSynchronizationEvent(override val node: LavalinkNode, val failureReason: Throwable?) : ClientEvent(node) + data class PlayerUpdateEvent(override val node: LavalinkNode, val guildId: Long, val state: PlayerState) : ClientEvent(node)