Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added BatchSpanProcessor metrics on drop and export span #635

Merged
26 changes: 13 additions & 13 deletions Sources/OpenTelemetryApi/Metrics/Stable/DefaultStableMeter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class DefaultStableMeter : StableMeter {
}
}

private class NoopLongGaugeBuilder : LongGaugeBuilder {
private class NoopLongGaugeBuilder : LongGaugeBuilder {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongGauge {
NoopObservableLongGauge()
}
Expand Down Expand Up @@ -123,18 +123,18 @@ public class DefaultStableMeter : StableMeter {
func add(value: Int, attribute: [String : AttributeValue]) {}
}

private class NoopLongCounterBuilder : LongCounterBuilder {
func ofDoubles() -> DoubleCounterBuilder {
NoopDoubleCounterBuilder()
}

func build() -> LongCounter {
NoopLongCounter()
}

func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter {
NoopObservableLongCounter()
}
private class NoopLongCounterBuilder : LongCounterBuilder {
mamunto marked this conversation as resolved.
Show resolved Hide resolved
func ofDoubles() -> DoubleCounterBuilder {
nachoBonafonte marked this conversation as resolved.
Show resolved Hide resolved
NoopDoubleCounterBuilder()
}
func build() -> LongCounter {
NoopLongCounter()
}
func buildWithCallback(_ callback: @escaping (ObservableLongMeasurement) -> Void) -> ObservableLongCounter {
NoopObservableLongCounter()
}
}

private class NoopDoubleCounterBuilder : DoubleCounterBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,4 @@ public class LongGaugeBuilderSdk : LongGaugeBuilder, InstrumentBuilder {
public func buildWithCallback(_ callback: @escaping (OpenTelemetryApi.ObservableLongMeasurement) -> Void) -> OpenTelemetryApi.ObservableLongGauge {
registerLongAsynchronousInstrument(type: type, updater: callback)
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,36 @@ import OpenTelemetryApi
/// exports the spans to wake up and start a new export cycle.
/// This batchSpanProcessor can cause high contention in a very high traffic service.
public struct BatchSpanProcessor: SpanProcessor {
fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType"
fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped"
fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name

fileprivate var worker: BatchWorker


fileprivate var worker: BatchWorker

public init(spanExporter: SpanExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [SpanData]) -> Void)? = nil)
{
worker = BatchWorker(spanExporter: spanExporter,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback)
worker.start()
}
public static var name: String {
String(describing: Self.self)
}

public init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval = 5,
exportTimeout: TimeInterval = 30,
maxQueueSize: Int = 2048,
maxExportBatchSize: Int = 512,
willExportCallback: ((inout [SpanData]) -> Void)? = nil
) {
worker = BatchWorker(
spanExporter: spanExporter,
meterProvider: meterProvider,
scheduleDelay: scheduleDelay,
exportTimeout: exportTimeout,
maxQueueSize: maxQueueSize,
maxExportBatchSize: maxExportBatchSize,
willExportCallback: willExportCallback
)
worker.start()
}

public let isStartRequired = false
public let isEndRequired = true
Expand Down Expand Up @@ -57,40 +72,107 @@ public struct BatchSpanProcessor: SpanProcessor {
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
let spanExporter: SpanExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

init(spanExporter: SpanExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [SpanData]) -> Void)?) {
self.spanExporter = spanExporter
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
}
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider
let scheduleDelay: TimeInterval
let maxQueueSize: Int
let exportTimeout: TimeInterval
let maxExportBatchSize: Int
let willExportCallback: ((inout [SpanData]) -> Void)?
let halfMaxQueueSize: Int
private let cond = NSCondition()
var spanList = [ReadableSpan]()
var queue: OperationQueue

private var queueSizeGauge: ObservableLongGauge?
private var spanGaugeObserver: ObservableLongGauge?

private var processedSpansCounter: LongCounter?
private let droppedAttrs: [String: AttributeValue]
private let exportedAttrs: [String: AttributeValue]
private let spanGaugeBuilder: LongGaugeBuilder
init(
spanExporter: SpanExporter,
meterProvider: StableMeterProvider,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [SpanData]) -> Void)?
) {
self.spanExporter = spanExporter
self.meterProvider = meterProvider
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
self.maxQueueSize = maxQueueSize
halfMaxQueueSize = maxQueueSize >> 1
self.maxExportBatchSize = maxExportBatchSize
self.willExportCallback = willExportCallback
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1

let meter = meterProvider.meterBuilder(name: "io.opentelemetry.sdk.trace").build()

var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk
longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued")
longGaugeSdk = longGaugeSdk?.setUnit("1")
self.queueSizeGauge = longGaugeSdk?.buildWithCallback { result in
nachoBonafonte marked this conversation as resolved.
Show resolved Hide resolved
result.record(
value: maxQueueSize,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

self.spanGaugeBuilder = meter.gaugeBuilder(name: "spanSize")
.ofLongs()

var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk
longCounterSdk = longCounterSdk?.setUnit("1")
longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]")
processedSpansCounter = longCounterSdk?.build()
nachoBonafonte marked this conversation as resolved.
Show resolved Hide resolved

droppedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(true)
]
exportedAttrs = [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE),
BatchSpanProcessor.SPAN_PROCESSOR_DROPPED_LABEL: .bool(false)
]
}

deinit {
// Cleanup all gauge observer
self.queueSizeGauge?.close()
self.spanGaugeObserver?.close()
}
Comment on lines +158 to +162
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up here cc: @bryce-b


func addSpan(span: ReadableSpan) {
cond.lock()
defer { cond.unlock() }

if spanList.count == maxQueueSize {
// TODO: Record a counter for dropped spans.
processedSpansCounter?.add(value: 1, attribute: droppedAttrs)
return
}
// TODO: Record a gauge for referenced spans.
spanList.append(span)

// If there is any observer before, let's close it
self.spanGaugeObserver?.close()
mamunto marked this conversation as resolved.
Show resolved Hide resolved
// Subscribe to new gauge observer
self.spanGaugeObserver = self.spanGaugeBuilder
.buildWithCallback { [count = spanList.count] result in
result.record(
value: count,
attributes: [
BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE)
]
)
}

// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if spanList.count >= halfMaxQueueSize {
Expand Down Expand Up @@ -148,11 +230,16 @@ private class BatchWorker: Thread {
timeoutTimer.cancel()
}

private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach {
var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }
willExportCallback?(&spansToExport)
let result = spanExporter.export(spans: spansToExport, explicitTimeout: explicitTimeout)
if result == .success {
cond.lock()
processedSpansCounter?.add(value: spanList.count, attribute: exportedAttrs)
cond.unlock()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,22 @@ class BatchSpansProcessorTests: XCTestCase {
}

func testStartEndRequirements() {
let spansProcessor = BatchSpanProcessor(spanExporter: WaitingSpanExporter(numberToWaitFor: 0))
let spansProcessor = BatchSpanProcessor(
spanExporter: WaitingSpanExporter(numberToWaitFor: 0),
meterProvider: DefaultStableMeterProvider.instance
)
XCTAssertFalse(spansProcessor.isStartRequired)
XCTAssertTrue(spansProcessor.isEndRequired)
}

func testExportDifferentSampledSpans() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay)
)
let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName2)
let exported = waitingSpanExporter.waitForExport()
Expand All @@ -63,7 +70,12 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportMoreSpansThanTheBufferSize() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 6)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay, maxQueueSize: 6, maxExportBatchSize: 2))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay,
maxQueueSize: 6, maxExportBatchSize: 2)
)

let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName1)
Expand All @@ -82,7 +94,13 @@ class BatchSpansProcessorTests: XCTestCase {

func testForceExport() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)
let batchSpansProcessor = BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 10, maxQueueSize: 10000, maxExportBatchSize: 2000)
let batchSpansProcessor = BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: 10,
maxQueueSize: 10000,
maxExportBatchSize: 2000
)
tracerSdkFactory.addSpanProcessor(batchSpansProcessor)

for _ in 0 ..< 100 {
Expand All @@ -96,7 +114,10 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportSpansToMultipleServices() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)
let waitingSpanExporter2 = WaitingSpanExporter(numberToWaitFor: 2)
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]), scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, waitingSpanExporter2]),
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay))

let span1 = createSampledEndedSpan(spanName: spanName1)
let span2 = createSampledEndedSpan(spanName: spanName2)
Expand All @@ -110,7 +131,13 @@ class BatchSpansProcessorTests: XCTestCase {
let maxQueuedSpans = 8
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: maxQueuedSpans)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]), scheduleDelay: maxScheduleDelay, maxQueueSize: maxQueuedSpans, maxExportBatchSize: maxQueuedSpans / 2))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: MultiSpanExporter(spanExporters: [waitingSpanExporter, blockingSpanExporter]),
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay,
maxQueueSize: maxQueuedSpans,
maxExportBatchSize: maxQueuedSpans / 2)
)

var spansToExport = [SpanData]()
// Wait to block the worker thread in the BatchSampledSpansProcessor. This ensures that no items
Expand Down Expand Up @@ -162,7 +189,11 @@ class BatchSpansProcessorTests: XCTestCase {
func testExportNotSampledSpans() {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)

tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: maxScheduleDelay))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: maxScheduleDelay)
)

createNotSampledEndedSpan(spanName: spanName1)
createNotSampledEndedSpan(spanName: spanName2)
Expand All @@ -181,7 +212,11 @@ class BatchSpansProcessorTests: XCTestCase {
let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 1)

// Set the export delay to zero, for no timeout, in order to confirm the #flush() below works
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(spanExporter: waitingSpanExporter, scheduleDelay: 0.1))
tracerSdkFactory.addSpanProcessor(BatchSpanProcessor(
spanExporter: waitingSpanExporter,
meterProvider: DefaultStableMeterProvider.instance,
scheduleDelay: 0.1)
)

let span2 = createSampledEndedSpan(spanName: spanName2)

Expand Down
Loading