diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt index f23e064045..c943f9325c 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt @@ -16,7 +16,10 @@ package org.jitsi.nlj import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import org.jitsi.config.JitsiConfig +import org.jitsi.metaconfig.config import org.jitsi.rtp.Packet +import org.jitsi.utils.logging2.createLogger import java.time.Clock import java.time.Duration import java.time.Instant @@ -24,9 +27,18 @@ import java.util.Collections @SuppressFBWarnings("CN_IMPLEMENTS_CLONE_BUT_NOT_CLONEABLE") class EventTimeline( - private val timeline: MutableList> = mutableListOf(), + /** + * We want this thread safe, because while PacketInfo objects are only handled by a single thread at a time, + * [StatsKeepingNode] may add an "exit" event after the packets has been added to another queue potentially handled + * by a different thread. This is not critical as it only affects the timeline and the result is just some "exit" + * events missing from the trace logs. + */ + timelineArg: MutableList> = mutableListOf(), private val clock: Clock = Clock.systemUTC() ) : Iterable> { + + private val timeline = Collections.synchronizedList(timelineArg) + /** * The [referenceTime] refers to the first timestamp we have * in the timeline. In the timeline this is used as time "0" and @@ -34,6 +46,9 @@ class EventTimeline( */ var referenceTime: Instant? = null + val size: Int + get() = timeline.size + fun addEvent(desc: String) { val now = clock.instant() if (referenceTime == null) { @@ -43,7 +58,7 @@ class EventTimeline( } fun clone(): EventTimeline { - val clone = EventTimeline(timeline.toMutableList()) + val clone = EventTimeline(timeline.toMutableList(), clock) clone.referenceTime = referenceTime return clone } @@ -64,7 +79,9 @@ class EventTimeline( return with(StringBuffer()) { referenceTime?.let { append("Reference time: $referenceTime; ") - append(timeline.joinToString(separator = "; ")) + synchronized(timeline) { + append(timeline.joinToString(separator = "; ")) + } } ?: run { append("[No timeline]") } @@ -84,7 +101,7 @@ open class PacketInfo @JvmOverloads constructor( var packet: Packet, /** The original length of the packet, i.e. before decryption. Stays unchanged even if the packet is updated. */ val originalLength: Int = packet.length, - val timeline: EventTimeline = EventTimeline() + val timeline: EventTimeline? = if (enableTimeline) EventTimeline() else null ) { /** * An explicit tag for when this packet was originally received (assuming it @@ -93,7 +110,7 @@ open class PacketInfo @JvmOverloads constructor( var receivedTime: Instant? = null set(value) { field = value - if (ENABLE_TIMELINE && timeline.referenceTime == null) { + if (timeline != null && timeline.referenceTime == null) { timeline.referenceTime = value } } @@ -121,7 +138,7 @@ open class PacketInfo @JvmOverloads constructor( * The payload verification string for the packet, or 'null' if payload verification is disabled. Calculating the * it is expensive, thus we only do it when the flag is enabled. */ - var payloadVerification = if (ENABLE_PAYLOAD_VERIFICATION) packet.payloadVerification else null + var payloadVerification = if (enablePayloadVerification) packet.payloadVerification else null /** * Re-calculates the expected payload verification string. This should be called any time that the code @@ -129,7 +146,7 @@ open class PacketInfo @JvmOverloads constructor( * it with a new type (parsing), or intentionally modifies the payload (SRTP)). */ fun resetPayloadVerification() { - payloadVerification = if (ENABLE_PAYLOAD_VERIFICATION) packet.payloadVerification else null + payloadVerification = if (enablePayloadVerification) packet.payloadVerification else null } /** @@ -145,13 +162,7 @@ open class PacketInfo @JvmOverloads constructor( * will be copied for the cloned PacketInfo). */ fun clone(): PacketInfo { - val clone = if (ENABLE_TIMELINE) { - PacketInfo(packet.clone(), originalLength, timeline.clone()) - } else { - // If the timeline isn't enabled, we can just share the same one. - // (This would change if we allowed enabling the timeline at runtime) - PacketInfo(packet.clone(), originalLength, timeline) - } + val clone = PacketInfo(packet.clone(), originalLength, timeline?.clone()) clone.receivedTime = receivedTime clone.originalHadCryptex = originalHadCryptex clone.shouldDiscard = shouldDiscard @@ -163,11 +174,7 @@ open class PacketInfo @JvmOverloads constructor( return clone } - fun addEvent(desc: String) { - if (ENABLE_TIMELINE) { - timeline.addEvent(desc) - } - } + fun addEvent(desc: String) = timeline?.addEvent(desc) /** * The list of pending actions, or [null] if none. @@ -209,14 +216,28 @@ open class PacketInfo @JvmOverloads constructor( } companion object { - // TODO: we could make this a public var to allow changing this at runtime - private const val ENABLE_TIMELINE = false + private val enableTimeline: Boolean by config { + "jmt.debug.packet-timeline.enabled".from(JitsiConfig.newConfig) + } + + private val enablePayloadVerificationDefault: Boolean by config { + "jmt.debug.payload-verification.enabled".from(JitsiConfig.newConfig) + } /** * If this is enabled all [Node]s will verify that the payload didn't unexpectedly change. This is expensive. */ @field:Suppress("ktlint:standard:property-naming") - var ENABLE_PAYLOAD_VERIFICATION = false + var enablePayloadVerification = enablePayloadVerificationDefault + + init { + if (enableTimeline) { + createLogger().info("Packet timeline is enabled.") + } + if (enablePayloadVerification) { + createLogger().info("Payload verification is enabled.") + } + } } } diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/Node.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/Node.kt index a2ece3eacd..5a9ea245d5 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/Node.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/Node.kt @@ -144,15 +144,15 @@ sealed class Node( if (enable) { PLUGINS_ENABLED = true plugins.add(PayloadVerificationPlugin) - PacketInfo.ENABLE_PAYLOAD_VERIFICATION = true + PacketInfo.enablePayloadVerification = true } else { plugins.remove(PayloadVerificationPlugin) PLUGINS_ENABLED = plugins.isNotEmpty() - PacketInfo.ENABLE_PAYLOAD_VERIFICATION = false + PacketInfo.enablePayloadVerification = false } } - fun isPayloadVerificationEnabled(): Boolean = PacketInfo.ENABLE_PAYLOAD_VERIFICATION + fun isPayloadVerificationEnabled(): Boolean = PacketInfo.enablePayloadVerification fun enableNodeTracing(enable: Boolean) { TRACE_ENABLED = enable diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/debug/PayloadVerificationPlugin.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/debug/PayloadVerificationPlugin.kt index 952ffb5093..2a37a862fb 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/debug/PayloadVerificationPlugin.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/debug/PayloadVerificationPlugin.kt @@ -38,7 +38,7 @@ class PayloadVerificationPlugin { fun getStatsJson() = JSONObject().apply { this["num_payload_verification_failures"] = numFailures.get() } override fun observe(after: Node, packetInfo: PacketInfo) { - if (PacketInfo.ENABLE_PAYLOAD_VERIFICATION && + if (PacketInfo.enablePayloadVerification && packetInfo.payloadVerification != null ) { val expected = packetInfo.payloadVerification diff --git a/jitsi-media-transform/src/main/resources/reference.conf b/jitsi-media-transform/src/main/resources/reference.conf index 3c3e1438cc..52ee7c5e21 100644 --- a/jitsi-media-transform/src/main/resources/reference.conf +++ b/jitsi-media-transform/src/main/resources/reference.conf @@ -137,9 +137,9 @@ jmt { pcap { // Whether to permit the API to dynamically enable the capture of // unencrypted PCAP files of media traffic. - enabled=false + enabled = false // The directory in which to place captured PCAP files. - directory="/tmp" + directory = "/tmp" } packet-loss { // Artificial loss to introduce in the receive pipeline. @@ -161,5 +161,15 @@ jmt { burst-interval = 0 } } + packet-timeline { + // Whether to enable the packet timeline. This is an expensive option used for debugging. + enabled = false + // Log a packet timeline for every one out of [log-fraction] packets. + log-fraction = 10000 + } + payload-verification { + // Whether to enable payload verification on startup. This is a very expensive option only used for debugging. + enabled = false + } } } diff --git a/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/PacketInfoTest.kt b/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/PacketInfoTest.kt new file mode 100644 index 0000000000..99f4959130 --- /dev/null +++ b/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/PacketInfoTest.kt @@ -0,0 +1,45 @@ +/* + * Copyright @ 2024 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.nlj + +import io.kotest.core.spec.IsolationMode +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe + +class PacketInfoTest : ShouldSpec() { + override fun isolationMode(): IsolationMode = IsolationMode.InstancePerLeaf + + init { + context("EventTimeline test") { + val timeline = EventTimeline().apply { + addEvent("A") + addEvent("B") + } + val clone = timeline.clone() + timeline.size shouldBe 2 + clone.size shouldBe 2 + + timeline.addEvent("original") + timeline.size shouldBe 3 + clone.size shouldBe 2 + + clone.addEvent("clone") + clone.addEvent("clone2") + timeline.size shouldBe 3 + clone.size shouldBe 4 + } + } +} diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt index ae893252ed..21103e37da 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt @@ -512,14 +512,14 @@ class Endpoint @JvmOverloads constructor( private fun doSendSrtp(packetInfo: PacketInfo): Boolean { packetInfo.addEvent(SRTP_QUEUE_EXIT_EVENT) - PacketTransitStats.packetSent(packetInfo) + iceTransport.send(packetInfo.packet.buffer, packetInfo.packet.offset, packetInfo.packet.length) + PacketTransitStats.packetSent(packetInfo) + ByteBufferPool.returnBuffer(packetInfo.packet.buffer) packetInfo.sent() if (timelineLogger.isTraceEnabled && logTimeline()) { timelineLogger.trace { packetInfo.timeline.toString() } } - iceTransport.send(packetInfo.packet.buffer, packetInfo.packet.offset, packetInfo.packet.length) - ByteBufferPool.returnBuffer(packetInfo.packet.buffer) return true } @@ -1176,9 +1176,11 @@ class Endpoint @JvmOverloads constructor( private val epTimeout = 2.mins private val timelineCounter = AtomicLong() - private val TIMELINE_FRACTION = 10000L + private val timelineFraction: Long by config { + "jmt.debug.packet-timeline.log-fraction".from(JitsiConfig.newConfig) + } - fun logTimeline() = timelineCounter.getAndIncrement() % TIMELINE_FRACTION == 0L + fun logTimeline() = timelineCounter.getAndIncrement() % timelineFraction == 0L private const val SRTP_QUEUE_ENTRY_EVENT = "Entered Endpoint SRTP sender outgoing queue" private const val SRTP_QUEUE_EXIT_EVENT = "Exited Endpoint SRTP sender outgoing queue" diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/cc/allocation/PacketHandler.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/cc/allocation/PacketHandler.kt index 3cff98225c..6fd407dd43 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/cc/allocation/PacketHandler.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/cc/allocation/PacketHandler.kt @@ -17,7 +17,7 @@ package org.jitsi.videobridge.cc.allocation import org.jitsi.nlj.MediaSourceDesc import org.jitsi.nlj.PacketInfo -import org.jitsi.nlj.PacketInfo.Companion.ENABLE_PAYLOAD_VERIFICATION +import org.jitsi.nlj.PacketInfo.Companion.enablePayloadVerification import org.jitsi.nlj.RtpLayerDesc import org.jitsi.nlj.rtp.VideoRtpPacket import org.jitsi.rtp.rtcp.RtcpSrPacket @@ -83,7 +83,7 @@ internal class PacketHandler( adaptiveSourceProjection.rewriteRtp(packetInfo) // The rewriteRtp operation must not modify the VP8 payload. - if (ENABLE_PAYLOAD_VERIFICATION) { + if (enablePayloadVerification) { val expected = packetInfo.payloadVerification val actual = videoPacket.payloadVerification if ("" != expected && expected != actual) { diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt index 4badda98d7..2199ac57f6 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt @@ -766,16 +766,14 @@ class Relay @JvmOverloads constructor( fun doSendSrtp(packetInfo: PacketInfo): Boolean { packetInfo.addEvent(SRTP_QUEUE_EXIT_EVENT) - PacketTransitStats.packetSent(packetInfo) + iceTransport.send(packetInfo.packet.buffer, packetInfo.packet.offset, packetInfo.packet.length) + PacketTransitStats.packetSent(packetInfo) + ByteBufferPool.returnBuffer(packetInfo.packet.buffer) packetInfo.sent() - if (timelineLogger.isTraceEnabled && Endpoint.logTimeline()) { timelineLogger.trace { packetInfo.timeline.toString() } } - - iceTransport.send(packetInfo.packet.buffer, packetInfo.packet.offset, packetInfo.packet.length) - ByteBufferPool.returnBuffer(packetInfo.packet.buffer) return true }