Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

made meter provider optional in BatchSpanProcessor #645

Merged
merged 2 commits into from
Nov 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 131 additions & 139 deletions Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,36 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

fileprivate var worker: BatchWorker
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}
fileprivate var worker: BatchWorker

public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider? = nil,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true
Expand Down Expand Up @@ -72,105 +72,95 @@ public struct BatchSpanProcessor: SpanProcessor {
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider?
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?
private var processedSpansCounter: LongCounter?

init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider? = nil,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

private var processedSpansCounter: LongCounter?
private let droppedAttrs: [String: AttributeValue]
private let exportedAttrs: [String: AttributeValue]
private let spanGaugeBuilder: LongGaugeBuilder
init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build()

var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk
longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued")
longGaugeSdk = longGaugeSdk?.setUnit("1")
self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() {

var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk
longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued")
longGaugeSdk = longGaugeSdk?.setUnit("1")
self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk
longCounterSdk = longCounterSdk?.setUnit("1")
longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
processedSpansCounter = longCounterSdk?.build()

// Subscribe to new gauge observer
self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize")
.ofLongs()
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize")
.ofLongs()

var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk
longCounterSdk = longCounterSdk?.setUnit("1")
longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
processedSpansCounter = longCounterSdk?.build()

droppedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
]
exportedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)
]

// Subscribe to new gauge observer
self.spanGaugeObserver = self.spanGaugeBuilder
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}
}
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
processedSpansCounter?.add(value: 1, attribute: droppedAttrs)
processedSpansCounter?.add(value: 1, attribute: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
])
return
}
spanList.append(span)

// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
Expand All @@ -180,18 +170,18 @@ private class BatchWorker: Thread {

override func main() {
repeat {
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty && !self.isCancelled
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty && !self.isCancelled
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
}
} while !self.isCancelled
}
Expand Down Expand Up @@ -228,16 +218,18 @@ private class BatchWorker: Thread {
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs)
cond.unlock()
}
}
private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)])
cond.unlock();
}
}
}
}
Loading