Skip to content

Commit

Permalink
re-indented BatchSpanProcessor file.
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-b committed Nov 22, 2024
1 parent 6f1bfc2 commit 82d1bdd
Showing 1 changed file with 126 additions and 126 deletions.
252 changes: 126 additions & 126 deletions Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,36 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

fileprivate var worker: BatchWorker
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider? = nil,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}
fileprivate var worker: BatchWorker

public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider? = nil,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true
Expand Down Expand Up @@ -72,95 +72,95 @@ public struct BatchSpanProcessor: SpanProcessor {
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider?
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider?
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?
private var processedSpansCounter: LongCounter?

init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider? = nil,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?
private var processedSpansCounter: LongCounter?

init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider? = nil,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() {

var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk
longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued")
longGaugeSdk = longGaugeSdk?.setUnit("1")
self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in
if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() {

var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk
longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued")
longGaugeSdk = longGaugeSdk?.setUnit("1")
self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk
longCounterSdk = longCounterSdk?.setUnit("1")
longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
processedSpansCounter = longCounterSdk?.build()

// Subscribe to new gauge observer
self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize")
.ofLongs()
.buildWithCallback { [count = spanList.count] result in
result.record(
value: maxQueueSize,
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk
longCounterSdk = longCounterSdk?.setUnit("1")
longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
processedSpansCounter = longCounterSdk?.build()

// Subscribe to new gauge observer
self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize")
.ofLongs()
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}
}
}
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}

func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
processedSpansCounter?.add(value: 1, attribute: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
])
processedSpansCounter?.add(value: 1, attribute: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
])
return
}
spanList.append(span)

// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
Expand All @@ -170,18 +170,18 @@ private class BatchWorker: Thread {

override func main() {
repeat {
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty && !self.isCancelled
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
autoreleasepool {
var spansCopy: [ReadableSpan]
cond.lock()
if spanList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while spanList.isEmpty && !self.isCancelled
}
spansCopy = spanList
spanList.removeAll()
cond.unlock()
self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout)
}
} while !self.isCancelled
}
Expand Down Expand Up @@ -218,18 +218,18 @@ private class BatchWorker: Thread {
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)])
cond.unlock();
}
}
private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)])
cond.unlock();
}
}
}
}

0 comments on commit 82d1bdd

Please sign in to comment.