Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config): Make the packet timeline configurable. #2163

Merged
merged 8 commits into from
Jun 10, 2024
59 changes: 38 additions & 21 deletions jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@
package org.jitsi.nlj

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import org.jitsi.config.JitsiConfig
import org.jitsi.metaconfig.config
import org.jitsi.rtp.Packet
import org.jitsi.utils.logging2.createLogger
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.Collections
import java.util.concurrent.ConcurrentLinkedQueue

@SuppressFBWarnings("CN_IMPLEMENTS_CLONE_BUT_NOT_CLONEABLE")
class EventTimeline(
private val timeline: MutableList<Pair<String, Duration>> = mutableListOf(),
/**
* We want this thread safe, because while PacketInfo objects are only handled by a single thread at a time,
* [StatsKeepingNode] may add an "exit" event after the packets has been added to another queue potentially handled
* by a different thread. This is not critical as it only affects the timeline and the result is just some "exit"
* events missing from the trace logs.
*/
private val timeline: ConcurrentLinkedQueue<Pair<String, Duration>> = ConcurrentLinkedQueue(),
bgrozev marked this conversation as resolved.
Show resolved Hide resolved
private val clock: Clock = Clock.systemUTC()
) : Iterable<Pair<String, Duration>> {
/**
Expand All @@ -34,6 +44,9 @@ class EventTimeline(
*/
var referenceTime: Instant? = null

val size: Int
get() = timeline.size

fun addEvent(desc: String) {
val now = clock.instant()
if (referenceTime == null) {
Expand All @@ -43,7 +56,7 @@ class EventTimeline(
}

fun clone(): EventTimeline {
val clone = EventTimeline(timeline.toMutableList())
val clone = EventTimeline(ConcurrentLinkedQueue(timeline))
clone.referenceTime = referenceTime
return clone
}
Expand Down Expand Up @@ -84,7 +97,7 @@ open class PacketInfo @JvmOverloads constructor(
var packet: Packet,
/** The original length of the packet, i.e. before decryption. Stays unchanged even if the packet is updated. */
val originalLength: Int = packet.length,
val timeline: EventTimeline = EventTimeline()
val timeline: EventTimeline? = if (enableTimeline) EventTimeline() else null
) {
/**
* An explicit tag for when this packet was originally received (assuming it
Expand All @@ -93,7 +106,7 @@ open class PacketInfo @JvmOverloads constructor(
var receivedTime: Instant? = null
set(value) {
field = value
if (ENABLE_TIMELINE && timeline.referenceTime == null) {
if (timeline != null && timeline.referenceTime == null) {
timeline.referenceTime = value
}
}
Expand Down Expand Up @@ -121,15 +134,15 @@ open class PacketInfo @JvmOverloads constructor(
* The payload verification string for the packet, or 'null' if payload verification is disabled. Calculating the
* it is expensive, thus we only do it when the flag is enabled.
*/
var payloadVerification = if (ENABLE_PAYLOAD_VERIFICATION) packet.payloadVerification else null
var payloadVerification = if (enablePayloadVerification) packet.payloadVerification else null

/**
* Re-calculates the expected payload verification string. This should be called any time that the code
* intentionally modifies the packet in a way that could change the verification string (for example, re-creates
* it with a new type (parsing), or intentionally modifies the payload (SRTP)).
*/
fun resetPayloadVerification() {
payloadVerification = if (ENABLE_PAYLOAD_VERIFICATION) packet.payloadVerification else null
payloadVerification = if (enablePayloadVerification) packet.payloadVerification else null
}

/**
Expand All @@ -145,13 +158,7 @@ open class PacketInfo @JvmOverloads constructor(
* will be copied for the cloned PacketInfo).
*/
fun clone(): PacketInfo {
val clone = if (ENABLE_TIMELINE) {
PacketInfo(packet.clone(), originalLength, timeline.clone())
} else {
// If the timeline isn't enabled, we can just share the same one.
// (This would change if we allowed enabling the timeline at runtime)
PacketInfo(packet.clone(), originalLength, timeline)
}
val clone = PacketInfo(packet.clone(), originalLength, timeline?.clone())
clone.receivedTime = receivedTime
clone.originalHadCryptex = originalHadCryptex
clone.shouldDiscard = shouldDiscard
Expand All @@ -163,11 +170,7 @@ open class PacketInfo @JvmOverloads constructor(
return clone
}

fun addEvent(desc: String) {
if (ENABLE_TIMELINE) {
timeline.addEvent(desc)
}
}
fun addEvent(desc: String) = timeline?.addEvent(desc)

/**
* The list of pending actions, or [null] if none.
Expand Down Expand Up @@ -209,14 +212,28 @@ open class PacketInfo @JvmOverloads constructor(
}

companion object {
// TODO: we could make this a public var to allow changing this at runtime
private const val ENABLE_TIMELINE = false
private val enableTimeline: Boolean by config {
"jmt.debug.packet-timeline.enabled".from(JitsiConfig.newConfig)
}

private val enablePayloadVerificationDefault: Boolean by config {
"jmt.debug.payload-verification.enabled".from(JitsiConfig.newConfig)
}

/**
* If this is enabled all [Node]s will verify that the payload didn't unexpectedly change. This is expensive.
*/
@field:Suppress("ktlint:standard:property-naming")
var ENABLE_PAYLOAD_VERIFICATION = false
var enablePayloadVerification = enablePayloadVerificationDefault

init {
if (enableTimeline) {
createLogger().info("Packet timeline is enabled.")
}
if (enablePayloadVerification) {
createLogger().info("Payload verification is enabled.")
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,15 @@ sealed class Node(
if (enable) {
PLUGINS_ENABLED = true
plugins.add(PayloadVerificationPlugin)
PacketInfo.ENABLE_PAYLOAD_VERIFICATION = true
PacketInfo.enablePayloadVerification = true
} else {
plugins.remove(PayloadVerificationPlugin)
PLUGINS_ENABLED = plugins.isNotEmpty()
PacketInfo.ENABLE_PAYLOAD_VERIFICATION = false
PacketInfo.enablePayloadVerification = false
}
}

fun isPayloadVerificationEnabled(): Boolean = PacketInfo.ENABLE_PAYLOAD_VERIFICATION
fun isPayloadVerificationEnabled(): Boolean = PacketInfo.enablePayloadVerification

fun enableNodeTracing(enable: Boolean) {
TRACE_ENABLED = enable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class PayloadVerificationPlugin {
fun getStatsJson() = JSONObject().apply { this["num_payload_verification_failures"] = numFailures.get() }

override fun observe(after: Node, packetInfo: PacketInfo) {
if (PacketInfo.ENABLE_PAYLOAD_VERIFICATION &&
if (PacketInfo.enablePayloadVerification &&
packetInfo.payloadVerification != null
) {
val expected = packetInfo.payloadVerification
Expand Down
14 changes: 12 additions & 2 deletions jitsi-media-transform/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ jmt {
pcap {
// Whether to permit the API to dynamically enable the capture of
// unencrypted PCAP files of media traffic.
enabled=false
enabled = false
// The directory in which to place captured PCAP files.
directory="/tmp"
directory = "/tmp"
}
packet-loss {
// Artificial loss to introduce in the receive pipeline.
Expand All @@ -161,5 +161,15 @@ jmt {
burst-interval = 0
}
}
packet-timeline {
// Whether to enable the packet timeline. This is an expensive option used for debugging.
enabled = false
// Log a packet timeline for every one out of [log-fraction] packets.
log-fraction = 10000
}
payload-verification {
// Whether to enable payload verification on startup. This is a very expensive option only used for debugging.
enabled = false
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright @ 2024 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.nlj

import io.kotest.core.spec.IsolationMode
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.shouldBe

class PacketInfoTest : ShouldSpec() {
override fun isolationMode(): IsolationMode = IsolationMode.InstancePerLeaf

init {
context("EventTimeline test") {
val timeline = EventTimeline().apply {
addEvent("A")
addEvent("B")
}
val clone = timeline.clone()
timeline.size shouldBe 2
clone.size shouldBe 2

timeline.addEvent("original")
timeline.size shouldBe 3
clone.size shouldBe 2

clone.addEvent("clone")
clone.addEvent("clone2")
timeline.size shouldBe 3
clone.size shouldBe 4
}
}
}
12 changes: 7 additions & 5 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -512,14 +512,14 @@ class Endpoint @JvmOverloads constructor(

private fun doSendSrtp(packetInfo: PacketInfo): Boolean {
packetInfo.addEvent(SRTP_QUEUE_EXIT_EVENT)
PacketTransitStats.packetSent(packetInfo)

iceTransport.send(packetInfo.packet.buffer, packetInfo.packet.offset, packetInfo.packet.length)
PacketTransitStats.packetSent(packetInfo)
ByteBufferPool.returnBuffer(packetInfo.packet.buffer)
packetInfo.sent()
if (timelineLogger.isTraceEnabled && logTimeline()) {
timelineLogger.trace { packetInfo.timeline.toString() }
}
iceTransport.send(packetInfo.packet.buffer, packetInfo.packet.offset, packetInfo.packet.length)
ByteBufferPool.returnBuffer(packetInfo.packet.buffer)
return true
}

Expand Down Expand Up @@ -1176,9 +1176,11 @@ class Endpoint @JvmOverloads constructor(
private val epTimeout = 2.mins

private val timelineCounter = AtomicLong()
private val TIMELINE_FRACTION = 10000L
private val timelineFraction: Long by config {
"jmt.debug.packet-timeline.log-fraction".from(JitsiConfig.newConfig)
}

fun logTimeline() = timelineCounter.getAndIncrement() % TIMELINE_FRACTION == 0L
fun logTimeline() = timelineCounter.getAndIncrement() % timelineFraction == 0L

private const val SRTP_QUEUE_ENTRY_EVENT = "Entered Endpoint SRTP sender outgoing queue"
private const val SRTP_QUEUE_EXIT_EVENT = "Exited Endpoint SRTP sender outgoing queue"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package org.jitsi.videobridge.cc.allocation

import org.jitsi.nlj.MediaSourceDesc
import org.jitsi.nlj.PacketInfo
import org.jitsi.nlj.PacketInfo.Companion.ENABLE_PAYLOAD_VERIFICATION
import org.jitsi.nlj.PacketInfo.Companion.enablePayloadVerification
import org.jitsi.nlj.RtpLayerDesc
import org.jitsi.nlj.rtp.VideoRtpPacket
import org.jitsi.rtp.rtcp.RtcpSrPacket
Expand Down Expand Up @@ -83,7 +83,7 @@ internal class PacketHandler(
adaptiveSourceProjection.rewriteRtp(packetInfo)

// The rewriteRtp operation must not modify the VP8 payload.
if (ENABLE_PAYLOAD_VERIFICATION) {
if (enablePayloadVerification) {
val expected = packetInfo.payloadVerification
val actual = videoPacket.payloadVerification
if ("" != expected && expected != actual) {
Expand Down
Loading