Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add non-resume reconnect handling #28

Merged
merged 4 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public interface Link {
*/
public suspend fun onDisconnected()

/**
* Called internally when this link is connected or reconnected to a new node without resuming, thereby creating a
* new session.
*/
public suspend fun onNewSession()

/**
* Destroys this link (will no longer be usable).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,21 @@ public abstract class AbstractLavakord internal constructor(
guildId: ULong,
event: VoiceState
) {
link.node.updatePlayer(guildId, request = PlayerUpdate(voice = event.toOmissible()))
(link as AbstractLink).onVoiceServerUpdate(event)
}

/**
* Abstract function to create a new [Link] for this [guild][guildId] using this [node].
*/
protected abstract fun buildNewLink(guildId: ULong, node: Node): Link

/** Called on websocket connect without resuming */
internal suspend fun onNewSession(node: Node) {
if (!options.link.autoReconnect) return
linksMap.values.filter { it.node == node }.forEach {
launch {
it.onNewSession()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
package dev.schlaubi.lavakord.audio.internal

import dev.arbjerg.lavalink.protocol.v4.PlayerUpdate
import dev.arbjerg.lavalink.protocol.v4.VoiceState
import dev.arbjerg.lavalink.protocol.v4.toOmissible
import dev.schlaubi.lavakord.audio.Link
import dev.schlaubi.lavakord.audio.Node
import dev.schlaubi.lavakord.audio.player.Player
import dev.schlaubi.lavakord.rest.destroyPlayer
import dev.schlaubi.lavakord.rest.updatePlayer

/**
* Abstract implementation of [Link].
*/
public abstract class AbstractLink(final override val node: Node, final override val guildId: ULong) : Link {
public abstract class AbstractLink(node: Node, final override val guildId: ULong) : Link {

final override var node: Node = node
private set

override val player: Player = WebsocketPlayer(node as NodeImpl, guildId)
abstract override val lavakord: AbstractLavakord
override var lastChannelId: ULong? = null
override var state: Link.State = Link.State.NOT_CONNECTED
private var cachedVoiceState: VoiceState? = null

override suspend fun onDisconnected() {
state = Link.State.NOT_CONNECTED
node.destroyPlayer(guildId)
cachedVoiceState = null
}

override suspend fun onNewSession() {
cachedVoiceState?.let {
node.updatePlayer(guildId, request = PlayerUpdate(voice = it.toOmissible()))
}
}

override suspend fun destroy() {
Expand All @@ -29,4 +45,9 @@ public abstract class AbstractLink(final override val node: Node, final override
lavakord.removeDestroyedLink(this)
state = Link.State.DESTROYED
}

internal suspend fun onVoiceServerUpdate(update: VoiceState) {
cachedVoiceState = update
node.updatePlayer(guildId, request = PlayerUpdate(voice = update.toOmissible()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.ktor.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
Expand Down Expand Up @@ -118,30 +119,37 @@ internal class NodeImpl(

retry.reset()

LOG.debug { "Successfully connected to node: $name ($host)" }
LOG.info { "Successfully connected to node: $name ($host)" }

while (!session.incoming.isClosedForReceive) {
try {
onEvent(session.receiveDeserialized())
} catch (e: WebsocketDeserializeException) {
LOG.warn(e) { "An error occurred whilst decoding incoming websocket packet" }
} catch (e: ClosedReceiveChannelException) {
break
} catch (e: Exception) {
LOG.warn(e) { "An exception occurred whilst decoding incoming websocket packet" }
}
}
val reason = session.closeReason.await()
if (reason?.knownReason == CloseReason.Codes.NORMAL) return

available = false
LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" }
reconnect(resume = true)
val reason = session.closeReason.await()
val resumeAgain = resume && reason?.knownReason != CloseReason.Codes.NORMAL
if (resumeAgain) {
LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" }
} else {
LOG.warn { "Disconnected from websocket for: $reason. Not resuming." }
}
reconnect(resume = resumeAgain)
}

private suspend fun reconnect(e: Throwable? = null, resume: Boolean = false) {
LOG.error(e) { "Exception whilst trying to connect. Reconnecting" }
if (retry.hasNext) {
LOG.error { "Exception whilst trying to connect: '${e?.message}'. Reconnecting" }
retry.retry()
connect(resume)
} else {
lavakord.removeNode(this)
error("Could not reconnect to websocket after to many attempts")
throw IllegalStateException("Could not reconnect to websocket after too many attempts", e)
}
}

Expand Down Expand Up @@ -173,6 +181,14 @@ internal class NodeImpl(
is Message.PlayerUpdateEvent -> (lavakord.getLink(event.guildId).player as WebsocketPlayer)
.provideState(event.state)

is Message.EmittedEvent.WebSocketClosedEvent -> {
// These codes represent an invalid session
// See https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes
if (event.code == 4004 || event.code == 4006 || event.code == 4009 || event.code == 4014) {
lavakord.getLink(event.guildId).onDisconnected()
}
}

is Message.StatsEvent -> {
LOG.debug { "Received node statistics for $name: $event" }
lastStatsEvent = event
Expand All @@ -185,6 +201,7 @@ internal class NodeImpl(
is Message.ReadyEvent -> {
available = true
sessionId = event.sessionId
lavakord.onNewSession(this)
updateSession(
SessionUpdate(
resuming = true.toOmissible(),
Expand Down