diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt index 1b6a3c7bab..40a7df8278 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt @@ -1115,7 +1115,7 @@ class Endpoint @JvmOverloads constructor( sctpHandler?.stop() usrSctpHandler?.stop() sctpManager?.closeConnection() - sctpTransport?.socket?.close() + sctpTransport?.stop() } catch (t: Throwable) { logger.error("Exception while expiring: ", t) } @@ -1240,7 +1240,7 @@ class Endpoint @JvmOverloads constructor( } override fun OnAborted(error: ErrorKind, message: String) { - logger.warn("SCTP aborted with error $error: $message") + logger.info("SCTP aborted with error $error: $message") } override fun OnConnected() { @@ -1249,7 +1249,7 @@ class Endpoint @JvmOverloads constructor( val dataChannelStack = DataChannelStack( { data, sid, ppid -> val message = DcSctpMessage(sid.toShort(), ppid, data.array()) - val status = sctpTransport?.socket?.send(message, DcSctpTransport.DEFAULT_SEND_OPTIONS) + val status = sctpTransport?.send(message, DcSctpTransport.DEFAULT_SEND_OPTIONS) return@DataChannelStack if (status == SendStatus.kSuccess) { 0 } else { diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/dcsctp/DcSctpTransport.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/dcsctp/DcSctpTransport.kt index 07d64f9343..0435f03a05 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/dcsctp/DcSctpTransport.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/dcsctp/DcSctpTransport.kt @@ -15,11 +15,13 @@ */ package org.jitsi.videobridge.dcsctp +import org.jitsi.dcsctp4j.DcSctpMessage import org.jitsi.dcsctp4j.DcSctpOptions import org.jitsi.dcsctp4j.DcSctpSocketCallbacks import org.jitsi.dcsctp4j.DcSctpSocketFactory import org.jitsi.dcsctp4j.DcSctpSocketInterface import org.jitsi.dcsctp4j.SendOptions +import org.jitsi.dcsctp4j.SendStatus import org.jitsi.dcsctp4j.Timeout import org.jitsi.nlj.PacketInfo import org.jitsi.utils.OrderedJsonObject @@ -40,19 +42,51 @@ class DcSctpTransport( parentLogger: Logger ) { val logger = createChildLogger(parentLogger) - lateinit var socket: DcSctpSocketInterface + private val lock = Any() + private var socket: DcSctpSocketInterface? = null fun start(callbacks: DcSctpSocketCallbacks, options: DcSctpOptions = DEFAULT_SOCKET_OPTIONS) { - socket = factory.create(name, callbacks, null, options) + synchronized(lock) { + socket = factory.create(name, callbacks, null, options) + } } fun handleIncomingSctp(packetInfo: PacketInfo) { val packet = packetInfo.packet - socket.receivePacket(packet.getBuffer(), packet.getOffset(), packet.getLength()) + synchronized(lock) { + socket?.receivePacket(packet.getBuffer(), packet.getOffset(), packet.getLength()) + } + } + + fun stop() { + synchronized(lock) { + socket?.close() + socket = null + } + } + + fun connect() { + synchronized(lock) { + socket?.connect() + } + } + + fun send(message: DcSctpMessage, options: SendOptions): SendStatus { + synchronized(lock) { + return socket?.send(message, options) ?: SendStatus.kErrorShuttingDown + } + } + + fun handleTimeout(timeoutId: Long) { + synchronized(lock) { + socket?.handleTimeout(timeoutId) + } } fun getDebugState(): OrderedJsonObject { - val metrics = socket.metrics + val metrics = synchronized(lock) { + socket?.metrics + } return OrderedJsonObject().apply { if (metrics != null) { put("tx_packets_count", metrics.txPacketsCount) @@ -157,7 +191,7 @@ abstract class DcSctpBaseCallbacks( scheduledFuture = TaskPools.SCHEDULED_POOL.schedule({ /* Execute it on the IO_POOL, because a timer may trigger sending new SCTP packets. */ future = TaskPools.IO_POOL.submit { - transport?.socket?.handleTimeout(timeoutId) + transport?.handleTimeout(timeoutId) } }, duration, TimeUnit.MILLISECONDS) } catch (e: Throwable) { diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt index 2199ac57f6..3f1052bee1 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt @@ -439,7 +439,7 @@ class Relay @JvmOverloads constructor( scheduleRelayMessageTransportTimeout() } else if (sctpConfig.enabled) { if (sctpRole == Sctp.Role.CLIENT) { - sctpTransport!!.socket.connect() + sctpTransport!!.connect() } } } @@ -498,7 +498,7 @@ class Relay @JvmOverloads constructor( it.start(SctpCallbacks(it)) sctpHandler!!.setSctpTransport(it) if (dtlsTransport.isConnected && sctpDesc.role == Sctp.Role.CLIENT) { - it.socket.connect() + it.connect() } } } @@ -1172,7 +1172,7 @@ class Relay @JvmOverloads constructor( sctpHandler?.stop() usrSctpHandler?.stop() sctpManager?.closeConnection() - sctpTransport?.socket?.close() + sctpTransport?.stop() } catch (t: Throwable) { logger.error("Exception while expiring: ", t) } @@ -1266,7 +1266,7 @@ class Relay @JvmOverloads constructor( } override fun OnAborted(error: ErrorKind, message: String) { - logger.warn("SCTP aborted with error $error: $message") + logger.info("SCTP aborted with error $error: $message") } override fun OnConnected() { @@ -1275,7 +1275,7 @@ class Relay @JvmOverloads constructor( val dataChannelStack = DataChannelStack( { data, sid, ppid -> val message = DcSctpMessage(sid.toShort(), ppid, data.array()) - val status = sctpTransport?.socket?.send(message, DcSctpTransport.DEFAULT_SEND_OPTIONS) + val status = sctpTransport?.send(message, DcSctpTransport.DEFAULT_SEND_OPTIONS) return@DataChannelStack if (status == SendStatus.kSuccess) { 0 } else {