From 6f1bfc212dfdaabb58c7998d5164378609fa1fb1 Mon Sep 17 00:00:00 2001 From: Bryce Buchanan Date: Fri, 22 Nov 2024 14:42:10 -0800 Subject: [PATCH 1/2] made meter provider optional in BatchSpanProcessor --- .../SpanProcessors/BatchSpanProcessor.swift | 72 +++++++++---------- 1 file changed, 32 insertions(+), 40 deletions(-) diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index c9906fb1..bf6ebb7b 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -27,7 +27,7 @@ public struct BatchSpanProcessor: SpanProcessor { public init( spanExporter: SpanExporter, - meterProvider: StableMeterProvider, + meterProvider: StableMeterProvider? = nil, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30, maxQueueSize: Int = 2048, @@ -73,7 +73,7 @@ public struct BatchSpanProcessor: SpanProcessor { /// The list of batched data is protected by a NSCondition which ensures full concurrency. private class BatchWorker: Thread { let spanExporter: SpanExporter - let meterProvider: StableMeterProvider + let meterProvider: StableMeterProvider? let scheduleDelay: TimeInterval let maxQueueSize: Int let exportTimeout: TimeInterval @@ -86,14 +86,11 @@ private class BatchWorker: Thread { private var queueSizeGauge: ObservableLongGauge? private var spanGaugeObserver: ObservableLongGauge? - private var processedSpansCounter: LongCounter? - private let droppedAttrs: [String: AttributeValue] - private let exportedAttrs: [String: AttributeValue] - private let spanGaugeBuilder: LongGaugeBuilder + init( spanExporter: SpanExporter, - meterProvider: StableMeterProvider, + meterProvider: StableMeterProvider? = nil, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, @@ -112,47 +109,37 @@ private class BatchWorker: Thread { queue.name = "BatchWorker Queue" queue.maxConcurrentOperationCount = 1 - let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build() - + if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() { + var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued") longGaugeSdk = longGaugeSdk?.setUnit("1") self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in - result.record( - value: maxQueueSize, - attributes: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) - ] - ) + result.record( + value: maxQueueSize, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) } - - self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize") - .ofLongs() - + var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk longCounterSdk = longCounterSdk?.setUnit("1") longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") processedSpansCounter = longCounterSdk?.build() - droppedAttrs = [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), - BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) - ] - exportedAttrs = [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), - BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false) - ] - // Subscribe to new gauge observer - self.spanGaugeObserver = self.spanGaugeBuilder - .buildWithCallback { [count = spanList.count] result in - result.record( - value: count, - attributes: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) - ] - ) - } + self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize") + .ofLongs() + .buildWithCallback { [count = spanList.count] result in + result.record( + value: count, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + } } deinit { @@ -166,7 +153,10 @@ private class BatchWorker: Thread { defer { cond.unlock() } if spanList.count == maxQueueSize { - processedSpansCounter?.add(value: 1, attribute: droppedAttrs) + processedSpansCounter?.add(value: 1, attribute: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) + ]) return } spanList.append(span) @@ -235,8 +225,10 @@ private class BatchWorker: Thread { let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) if result == .success { cond.lock() - processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs) - cond.unlock() + processedSpansCounter?.add(value: spanList.count, attribute: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)]) + cond.unlock(); } } } From 82d1bdd3ae2e2837454a9fb6a632df2b82dbc9b7 Mon Sep 17 00:00:00 2001 From: Bryce Buchanan Date: Fri, 22 Nov 2024 14:56:07 -0800 Subject: [PATCH 2/2] re-indented BatchSpanProcessor file. --- .../SpanProcessors/BatchSpanProcessor.swift | 252 +++++++++--------- 1 file changed, 126 insertions(+), 126 deletions(-) diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index bf6ebb7b..7c79d7b3 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -15,36 +15,36 @@ import OpenTelemetryApi /// exports the spans to wake up and start a new export cycle. /// This batchSpanProcessor can cause high contention in a very high traffic service. public struct BatchSpanProcessor: SpanProcessor { - fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType" - fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped" - fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name - - fileprivate var worker: BatchWorker + fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType" + fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped" + fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name - public static var name: String { - String(describing: Self.self) - } - - public init( - spanExporter: SpanExporter, - meterProvider: StableMeterProvider? = nil, - scheduleDelay: TimeInterval = 5, - exportTimeout: TimeInterval = 30, - maxQueueSize: Int = 2048, - maxExportBatchSize: Int = 512, - willExportCallback: ((inout [SpanData]) -> Void)? = nil - ) { - worker = BatchWorker( - spanExporter: spanExporter, - meterProvider: meterProvider, - scheduleDelay: scheduleDelay, - exportTimeout: exportTimeout, - maxQueueSize: maxQueueSize, - maxExportBatchSize: maxExportBatchSize, - willExportCallback: willExportCallback - ) - worker.start() - } + fileprivate var worker: BatchWorker + + public static var name: String { + String(describing: Self.self) + } + + public init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider? = nil, + scheduleDelay: TimeInterval = 5, + exportTimeout: TimeInterval = 30, + maxQueueSize: Int = 2048, + maxExportBatchSize: Int = 512, + willExportCallback: ((inout [SpanData]) -> Void)? = nil + ) { + worker = BatchWorker( + spanExporter: spanExporter, + meterProvider: meterProvider, + scheduleDelay: scheduleDelay, + exportTimeout: exportTimeout, + maxQueueSize: maxQueueSize, + maxExportBatchSize: maxExportBatchSize, + willExportCallback: willExportCallback + ) + worker.start() + } public let isStartRequired = false public let isEndRequired = true @@ -72,95 +72,95 @@ public struct BatchSpanProcessor: SpanProcessor { /// the data. /// The list of batched data is protected by a NSCondition which ensures full concurrency. private class BatchWorker: Thread { - let spanExporter: SpanExporter - let meterProvider: StableMeterProvider? - let scheduleDelay: TimeInterval - let maxQueueSize: Int - let exportTimeout: TimeInterval - let maxExportBatchSize: Int - let willExportCallback: ((inout [SpanData]) -> Void)? - let halfMaxQueueSize: Int - private let cond = NSCondition() - var spanList = [ReadableSpan]() - var queue: OperationQueue + let spanExporter: SpanExporter + let meterProvider: StableMeterProvider? + let scheduleDelay: TimeInterval + let maxQueueSize: Int + let exportTimeout: TimeInterval + let maxExportBatchSize: Int + let willExportCallback: ((inout [SpanData]) -> Void)? + let halfMaxQueueSize: Int + private let cond = NSCondition() + var spanList = [ReadableSpan]() + var queue: OperationQueue + + private var queueSizeGauge: ObservableLongGauge? + private var spanGaugeObserver: ObservableLongGauge? + private var processedSpansCounter: LongCounter? + + init( + spanExporter: SpanExporter, + meterProvider: StableMeterProvider? = nil, + scheduleDelay: TimeInterval, + exportTimeout: TimeInterval, + maxQueueSize: Int, + maxExportBatchSize: Int, + willExportCallback: ((inout [SpanData]) -> Void)? + ) { + self.spanExporter = spanExporter + self.meterProvider = meterProvider + self.scheduleDelay = scheduleDelay + self.exportTimeout = exportTimeout + self.maxQueueSize = maxQueueSize + halfMaxQueueSize = maxQueueSize >> 1 + self.maxExportBatchSize = maxExportBatchSize + self.willExportCallback = willExportCallback + queue = OperationQueue() + queue.name = "BatchWorker Queue" + queue.maxConcurrentOperationCount = 1 - private var queueSizeGauge: ObservableLongGauge? - private var spanGaugeObserver: ObservableLongGauge? - private var processedSpansCounter: LongCounter? - - init( - spanExporter: SpanExporter, - meterProvider: StableMeterProvider? = nil, - scheduleDelay: TimeInterval, - exportTimeout: TimeInterval, - maxQueueSize: Int, - maxExportBatchSize: Int, - willExportCallback: ((inout [SpanData]) -> Void)? - ) { - self.spanExporter = spanExporter - self.meterProvider = meterProvider - self.scheduleDelay = scheduleDelay - self.exportTimeout = exportTimeout - self.maxQueueSize = maxQueueSize - halfMaxQueueSize = maxQueueSize >> 1 - self.maxExportBatchSize = maxExportBatchSize - self.willExportCallback = willExportCallback - queue = OperationQueue() - queue.name = "BatchWorker Queue" - queue.maxConcurrentOperationCount = 1 - - if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() { - - var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk - longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued") - longGaugeSdk = longGaugeSdk?.setUnit("1") - self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in + if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() { + + var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk + longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued") + longGaugeSdk = longGaugeSdk?.setUnit("1") + self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in + result.record( + value: maxQueueSize, + attributes: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) + ] + ) + } + + var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk + longCounterSdk = longCounterSdk?.setUnit("1") + longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") + processedSpansCounter = longCounterSdk?.build() + + // Subscribe to new gauge observer + self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize") + .ofLongs() + .buildWithCallback { [count = spanList.count] result in result.record( - value: maxQueueSize, + value: count, attributes: [ BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) ] ) } - - var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk - longCounterSdk = longCounterSdk?.setUnit("1") - longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") - processedSpansCounter = longCounterSdk?.build() - - // Subscribe to new gauge observer - self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize") - .ofLongs() - .buildWithCallback { [count = spanList.count] result in - result.record( - value: count, - attributes: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE) - ] - ) - } - } } + } + + deinit { + // Cleanup all gauge observer + self.queueSizeGauge?.close() + self.spanGaugeObserver?.close() + } - deinit { - // Cleanup all gauge observer - self.queueSizeGauge?.close() - self.spanGaugeObserver?.close() - } - func addSpan(span: ReadableSpan) { cond.lock() defer { cond.unlock() } if spanList.count == maxQueueSize { - processedSpansCounter?.add(value: 1, attribute: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), - BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) - ]) + processedSpansCounter?.add(value: 1, attribute: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true) + ]) return } spanList.append(span) - + // Notify the worker thread that at half of the queue is available. It will take // time anyway for the thread to wake up. if spanList.count >= halfMaxQueueSize { @@ -170,18 +170,18 @@ private class BatchWorker: Thread { override func main() { repeat { - autoreleasepool { - var spansCopy: [ReadableSpan] - cond.lock() - if spanList.count < maxExportBatchSize { - repeat { - cond.wait(until: Date().addingTimeInterval(scheduleDelay)) - } while spanList.isEmpty && !self.isCancelled - } - spansCopy = spanList - spanList.removeAll() - cond.unlock() - self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout) + autoreleasepool { + var spansCopy: [ReadableSpan] + cond.lock() + if spanList.count < maxExportBatchSize { + repeat { + cond.wait(until: Date().addingTimeInterval(scheduleDelay)) + } while spanList.isEmpty && !self.isCancelled + } + spansCopy = spanList + spanList.removeAll() + cond.unlock() + self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout) } } while !self.isCancelled } @@ -218,18 +218,18 @@ private class BatchWorker: Thread { timeoutTimer.cancel() } - private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { - stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { - var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } - willExportCallback?(&spansToExport) - let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) - if result == .success { - cond.lock() - processedSpansCounter?.add(value: spanList.count, attribute: [ - BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), - BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)]) - cond.unlock(); - } - } + private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { + stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { + var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() } + willExportCallback?(&spansToExport) + let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout) + if result == .success { + cond.lock() + processedSpansCounter?.add(value: spanList.count, attribute: [ + BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), + BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)]) + cond.unlock(); + } } + } }