From e4f0769c8f8b9c37c84ab1fbf6cba1d317482951 Mon Sep 17 00:00:00 2001 From: Freya Arbjerg Date: Thu, 5 Oct 2023 12:57:44 +0200 Subject: [PATCH 1/4] Automatically reconnect on abnormal close frame --- .../lavakord/audio/internal/NodeImpl.kt | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt index 10d28430..23665465 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt @@ -22,6 +22,7 @@ import io.ktor.websocket.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow @@ -118,30 +119,37 @@ internal class NodeImpl( retry.reset() - LOG.debug { "Successfully connected to node: $name ($host)" } + LOG.info { "Successfully connected to node: $name ($host)" } while (!session.incoming.isClosedForReceive) { try { onEvent(session.receiveDeserialized()) - } catch (e: WebsocketDeserializeException) { - LOG.warn(e) { "An error occurred whilst decoding incoming websocket packet" } + } catch (e: ClosedReceiveChannelException) { + break + } catch (e: Exception) { + LOG.warn(e) { "An exception occurred whilst decoding incoming websocket packet" } } } - val reason = session.closeReason.await() - if (reason?.knownReason == CloseReason.Codes.NORMAL) return + available = false - LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" } - reconnect(resume = true) + val reason = session.closeReason.await() + val resumeAgain = resume && reason?.knownReason != CloseReason.Codes.NORMAL + if (resumeAgain) { + LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" } + } else { + LOG.warn { "Disconnected from websocket for: $reason. Not resuming." } + } + reconnect(resume = resumeAgain) } private suspend fun reconnect(e: Throwable? = null, resume: Boolean = false) { - LOG.error(e) { "Exception whilst trying to connect. Reconnecting" } if (retry.hasNext) { + LOG.error(e) { "Exception whilst trying to connect. Reconnecting" } retry.retry() connect(resume) } else { lavakord.removeNode(this) - error("Could not reconnect to websocket after to many attempts") + throw RuntimeException("Could not reconnect to websocket after to many attempts", e) } } From f85a398a87277709145a851376faac6ce740e32e Mon Sep 17 00:00:00 2001 From: Freya Arbjerg Date: Thu, 5 Oct 2023 14:44:03 +0200 Subject: [PATCH 2/4] Reconnect voice session upon reconnect --- .../dev/schlaubi/lavakord/audio/Link.kt | 6 +++++ .../audio/internal/AbstractLavakord.kt | 12 +++++++++- .../lavakord/audio/internal/AbstractLink.kt | 23 ++++++++++++++++++- .../lavakord/audio/internal/NodeImpl.kt | 11 ++++++++- 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/Link.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/Link.kt index 21f15af8..737c1b10 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/Link.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/Link.kt @@ -46,6 +46,12 @@ public interface Link { */ public suspend fun onDisconnected() + /** + * Called internally when this link is connected or reconnected to a new node without resuming, thereby creating a + * new session. + */ + public suspend fun onNewSession() + /** * Destroys this link (will no longer be usable). */ diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLavakord.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLavakord.kt index 9158b9ed..d624c4d8 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLavakord.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLavakord.kt @@ -172,11 +172,21 @@ public abstract class AbstractLavakord internal constructor( guildId: ULong, event: VoiceState ) { - link.node.updatePlayer(guildId, request = PlayerUpdate(voice = event.toOmissible())) + (link as AbstractLink).onVoiceServerUpdate(event) } /** * Abstract function to create a new [Link] for this [guild][guildId] using this [node]. */ protected abstract fun buildNewLink(guildId: ULong, node: Node): Link + + /** Called on websocket connect without resuming */ + internal suspend fun onNewSession(node: Node) { + if (!options.link.autoReconnect) return + linksMap.values.filter { it.node == node }.forEach { + launch { + it.onNewSession() + } + } + } } diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLink.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLink.kt index 88e50c28..c7c2216e 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLink.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/AbstractLink.kt @@ -1,22 +1,38 @@ package dev.schlaubi.lavakord.audio.internal +import dev.arbjerg.lavalink.protocol.v4.PlayerUpdate +import dev.arbjerg.lavalink.protocol.v4.VoiceState +import dev.arbjerg.lavalink.protocol.v4.toOmissible import dev.schlaubi.lavakord.audio.Link import dev.schlaubi.lavakord.audio.Node import dev.schlaubi.lavakord.audio.player.Player import dev.schlaubi.lavakord.rest.destroyPlayer +import dev.schlaubi.lavakord.rest.updatePlayer /** * Abstract implementation of [Link]. */ -public abstract class AbstractLink(final override val node: Node, final override val guildId: ULong) : Link { +public abstract class AbstractLink(node: Node, final override val guildId: ULong) : Link { + + final override var node: Node = node + private set + override val player: Player = WebsocketPlayer(node as NodeImpl, guildId) abstract override val lavakord: AbstractLavakord override var lastChannelId: ULong? = null override var state: Link.State = Link.State.NOT_CONNECTED + private var cachedVoiceState: VoiceState? = null override suspend fun onDisconnected() { state = Link.State.NOT_CONNECTED node.destroyPlayer(guildId) + cachedVoiceState = null + } + + override suspend fun onNewSession() { + cachedVoiceState?.let { + node.updatePlayer(guildId, request = PlayerUpdate(voice = it.toOmissible())) + } } override suspend fun destroy() { @@ -29,4 +45,9 @@ public abstract class AbstractLink(final override val node: Node, final override lavakord.removeDestroyedLink(this) state = Link.State.DESTROYED } + + internal suspend fun onVoiceServerUpdate(update: VoiceState) { + cachedVoiceState = update + node.updatePlayer(guildId, request = PlayerUpdate(voice = update.toOmissible())) + } } diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt index 23665465..c49ac882 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt @@ -144,7 +144,7 @@ internal class NodeImpl( private suspend fun reconnect(e: Throwable? = null, resume: Boolean = false) { if (retry.hasNext) { - LOG.error(e) { "Exception whilst trying to connect. Reconnecting" } + LOG.error { "Exception whilst trying to connect: '${e?.message}'. Reconnecting" } retry.retry() connect(resume) } else { @@ -181,6 +181,14 @@ internal class NodeImpl( is Message.PlayerUpdateEvent -> (lavakord.getLink(event.guildId).player as WebsocketPlayer) .provideState(event.state) + is Message.EmittedEvent.WebSocketClosedEvent -> { + // These codes represent an invalid session + // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes + if (event.code == 4004 || event.code == 4006 || event.code == 4009 || event.code == 4014) { + lavakord.getLink(event.guildId).onDisconnected() + } + } + is Message.StatsEvent -> { LOG.debug { "Received node statistics for $name: $event" } lastStatsEvent = event @@ -193,6 +201,7 @@ internal class NodeImpl( is Message.ReadyEvent -> { available = true sessionId = event.sessionId + lavakord.onNewSession(this) updateSession( SessionUpdate( resuming = true.toOmissible(), From 6581d8ff465b36751f829f7259afbc3cd3f834b6 Mon Sep 17 00:00:00 2001 From: Freya Arbjerg Date: Tue, 10 Oct 2023 15:46:42 +0200 Subject: [PATCH 3/4] Fix typo --- .../kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt index c49ac882..f63348d0 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt @@ -149,7 +149,7 @@ internal class NodeImpl( connect(resume) } else { lavakord.removeNode(this) - throw RuntimeException("Could not reconnect to websocket after to many attempts", e) + throw RuntimeException("Could not reconnect to websocket after too many attempts", e) } } From f73feb2bb9c5915bb70b8ff527d8f80d7f0646be Mon Sep 17 00:00:00 2001 From: Freya Arbjerg Date: Thu, 12 Oct 2023 16:04:54 +0200 Subject: [PATCH 4/4] Throw IllegalStateException instead of RuntimeException on too many reconnects --- .../kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt index f63348d0..272cbb02 100644 --- a/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt +++ b/core/src/commonMain/kotlin/dev/schlaubi/lavakord/audio/internal/NodeImpl.kt @@ -149,7 +149,7 @@ internal class NodeImpl( connect(resume) } else { lavakord.removeNode(this) - throw RuntimeException("Could not reconnect to websocket after too many attempts", e) + throw IllegalStateException("Could not reconnect to websocket after too many attempts", e) } }