Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix a leak of WebSocketClients. #2051

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import org.jitsi.videobridge.message.EndpointStats
import org.jitsi.videobridge.message.ServerHelloMessage
import org.jitsi.videobridge.message.SourceVideoTypeMessage
import org.jitsi.videobridge.message.VideoTypeMessage
import org.jitsi.videobridge.util.TaskPools
import org.jitsi.videobridge.websocket.ColibriWebSocket
import org.json.simple.JSONObject
import java.lang.ref.WeakReference
Expand Down Expand Up @@ -67,11 +66,6 @@ class RelayMessageTransport(
*/
private var url: String? = null

/**
* An active websocket client.
*/
private var outgoingWebsocket: WebSocketClient? = null

/**
* Use to synchronize access to [webSocket]
*/
Expand Down Expand Up @@ -113,15 +107,11 @@ class RelayMessageTransport(
webSocket = null
}

// this.webSocket should only be initialized when it has connected (via [webSocketConnected]).
val newWebSocket = ColibriWebSocket(relay.id, this)
outgoingWebsocket?.let {
logger.warn("Re-connecting while outgoingWebsocket != null, possible leak.")
it.stop()
}
outgoingWebsocket = WebSocketClient().also {
it.start()
it.connect(newWebSocket, URI(url), ClientUpgradeRequest())
ColibriWebSocket(relay.id, this).also {
webSocketClient.connect(it, URI(url), ClientUpgradeRequest())
synchronized(webSocketSyncRoot) {
webSocket = it
}
}
}

Expand Down Expand Up @@ -311,15 +301,15 @@ class RelayMessageTransport(
get() = getActiveTransportChannel() != null

val isActive: Boolean
get() = outgoingWebsocket != null
get() = url != null

/**
* {@inheritDoc}
*/
override fun webSocketConnected(ws: ColibriWebSocket) {
synchronized(webSocketSyncRoot) {
// If we already have a web-socket, discard it and use the new one.
if (ws != webSocket) {
if (ws != webSocket && webSocket != null) {
logger.info("Replacing an existing websocket.")
webSocket?.session?.close(CloseStatus.NORMAL, "replaced")
webSocketLastActive = true
Expand Down Expand Up @@ -355,10 +345,10 @@ class RelayMessageTransport(
logger.debug { "Web socket closed, statusCode $statusCode ( $reason)." }
}
}
outgoingWebsocket?.let {
// Try to reconnect. TODO: how to handle failures?
it.stop()
outgoingWebsocket = null

// This check avoids trying to establish a new WS when the closing of the existing WS races the signaling to
// expire the relay. 1001 with RELAY_CLOSED means that the remote side willingly closed the socket.
if (statusCode != 1001 || reason != RELAY_CLOSED) {
doConnect()
}
}
Expand All @@ -374,22 +364,11 @@ class RelayMessageTransport(
if (webSocket != null) {
// 410 Gone indicates that the resource requested is no longer
// available and will not be available again.
webSocket?.session?.close(CloseStatus.SHUTDOWN, "relay closed")
webSocket?.session?.close(CloseStatus.SHUTDOWN, RELAY_CLOSED)
webSocket = null
logger.debug { "Relay expired, closed colibri web-socket." }
}
}
outgoingWebsocket?.let {
// Stopping might block and we don't want to hold the thread processing signaling.
TaskPools.IO_POOL.submit {
try {
it.stop()
} catch (e: Exception) {
logger.warn("Error while stopping outgoing web socket", e)
}
}
}
outgoingWebsocket = null
}

/**
Expand Down Expand Up @@ -509,4 +488,16 @@ class RelayMessageTransport(
conference.sendMessageFromRelay(message, true, relay.meshId)
return null
}

companion object {
/**
* The single [WebSocketClient] instance that all [Relay]s use to initiate a web socket connection.
*/
val webSocketClient = WebSocketClient().apply { start() }

/**
* Reason to use when closing a WS due to the relay being expired.
*/
const val RELAY_CLOSED = "relay_closed"
}
}
Loading