From 9c94df3d0f8eb0781018d00f9423d14940ada8b1 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 8 Oct 2024 15:39:43 +0200 Subject: [PATCH] Remove last reflection based code path --- src/Elastic.Channels/BufferedChannelBase.cs | 14 ++++----- .../Diagnostics/NoopBufferedChannel.cs | 4 +-- .../ResponseItemsBufferedChannelBase.cs | 8 ++--- src/Elastic.Ingest.Apm/ApmChannel.cs | 2 +- .../DataStreams/DataStreamChannel.cs | 3 +- .../ElasticsearchChannelBase.Serialization.cs | 2 -- .../ElasticsearchChannelBase.cs | 30 ++++++------------- .../ElasticsearchChannelOptionsBase.cs | 6 ++++ .../Indices/IndexChannel.cs | 3 +- .../Serialization/BulkHeader.cs | 1 - .../CustomOtlpTraceExporter.cs | 2 +- .../TransportChannelBase.cs | 8 ++--- .../DataStreamChannelTests.cs | 11 ++++++- .../IndexChannelTests.cs | 10 ++++++- .../SubPathTests.cs | 10 ++++++- 15 files changed, 66 insertions(+), 48 deletions(-) diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 1deaf41..e340018 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -122,26 +122,26 @@ public abstract class BufferedChannelBase internal InboundBuffer InboundBuffer { get; } /// - protected BufferedChannelBase(TChannelOptions options) : this(options, null) { } + protected BufferedChannelBase(TChannelOptions options, string diagnosticsName) : this(options, null, diagnosticsName) { } /// - protected BufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) + protected BufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) { TokenSource = options.CancellationToken.HasValue ? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value) : new CancellationTokenSource(); Options = options; - var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray(); + var listeners = callbackListeners == null ? [Options] : callbackListeners.Concat([Options]).ToArray(); DiagnosticsListener = listeners - .Select(l => (l is IChannelDiagnosticsListener c) ? c : null) - .FirstOrDefault(e => e != null); + .OfType() + .FirstOrDefault(); if (DiagnosticsListener == null && !options.DisableDiagnostics) { // if no debug listener was already provided but was requested explicitly create one. - var l = new ChannelDiagnosticsListener(GetType().Name); + var l = new ChannelDiagnosticsListener(diagnosticsName); DiagnosticsListener = l; - listeners = listeners.Concat(new[] { l }).ToArray(); + listeners = listeners.Concat([l]).ToArray(); } _callbacks = new ChannelCallbackInvoker(listeners); diff --git a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs index 4c461bf..c2adaeb 100644 --- a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs @@ -38,14 +38,14 @@ public class NoopChannelOptions : ChannelOptionsBase public NoopBufferedChannel( NoopChannelOptions options, ICollection>? channelListeners = null - ) : base(options, channelListeners) { } + ) : base(options, channelListeners, nameof(NoopBufferedChannel)) { } /// public NoopBufferedChannel( BufferOptions options, ICollection>? channelListeners = null, bool observeConcurrency = false - ) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners) + ) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners, nameof(NoopBufferedChannel)) { } diff --git a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs index e480d10..54e76ea 100644 --- a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs +++ b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs @@ -32,12 +32,12 @@ public abstract class ResponseItemsBufferedChannelBase - protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected ResponseItemsBufferedChannelBase(TChannelOptions options) - : base(options) { } + protected ResponseItemsBufferedChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// Based on should return a bool indicating if retry is needed protected abstract bool Retry(TResponse response); diff --git a/src/Elastic.Ingest.Apm/ApmChannel.cs b/src/Elastic.Ingest.Apm/ApmChannel.cs index caf6077..0ee8477 100644 --- a/src/Elastic.Ingest.Apm/ApmChannel.cs +++ b/src/Elastic.Ingest.Apm/ApmChannel.cs @@ -40,7 +40,7 @@ internal static class ApmChannelStatics public class ApmChannel : TransportChannelBase { /// - public ApmChannel(ApmChannelOptions options) : base(options) { } + public ApmChannel(ApmChannelOptions options) : base(options, nameof(ApmChannel)) { } //retry if APM server returns 429 /// diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index ffa6315..44f0af7 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -19,7 +19,8 @@ public class DataStreamChannel : ElasticsearchChannelBase options) : this(options, null) { } /// - public DataStreamChannel(DataStreamChannelOptions options, ICollection>? callbackListeners) : base(options, callbackListeners) + public DataStreamChannel(DataStreamChannelOptions options, ICollection>? callbackListeners, string? diagnosticsName = null) + : base(options, callbackListeners, diagnosticsName ?? nameof(DataStreamChannel)) { var dataStream = Options.DataStream.ToString(); diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs index 953a360..8120783 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -10,7 +10,6 @@ using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Ingest.Elasticsearch.Serialization; -using Elastic.Ingest.Transport; namespace Elastic.Ingest.Elasticsearch; @@ -19,7 +18,6 @@ namespace Elastic.Ingest.Elasticsearch; /// Coordinates most of the sending to- and bootstrapping of Elasticsearch /// public abstract partial class ElasticsearchChannelBase - : TransportChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { private static ReadOnlySpan PlainIndexBytesSpan => """ diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs index 8c6d39e..8a7284c 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs @@ -30,12 +30,12 @@ public abstract partial class ElasticsearchChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { /// - protected ElasticsearchChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected ElasticsearchChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected ElasticsearchChannelBase(TChannelOptions options) - : base(options) { } + protected ElasticsearchChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// protected override bool Retry(BulkResponse response) @@ -70,34 +70,22 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) => protected override Task ExportAsync(ITransport transport, ArraySegment page, CancellationToken ctx = default) { ctx = ctx == default ? TokenSource.Token : ctx; -#if NETSTANDARD2_1 - // Option is obsolete to prevent external users to set it. -#pragma warning disable CS0618 -// if (Options.UseReadOnlyMemory) -// #pragma warning restore CS0618 -// { -// var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader); -// return transport.RequestAsync(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx); -// } -#endif -#pragma warning disable IDE0022 // Use expression body for method return transport.RequestAsync(HttpMethod.POST, BulkUrl, PostData.StreamHandler(page, (_, _) => { - /* NOT USED */ + /* Synchronous code path never called */ }, - async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, Options, ctx).ConfigureAwait(false); }) + async (b, stream, t) => await WriteBufferToStreamAsync(b, stream, Options, t).ConfigureAwait(false)) , RequestParams, ctx); -#pragma warning restore IDE0022 // Use expression body for method } /// - protected class HeadIndexTemplateResponse : ElasticsearchResponse { } + protected class HeadIndexTemplateResponse : ElasticsearchResponse; /// - protected class PutIndexTemplateResponse : ElasticsearchResponse { } + protected class PutIndexTemplateResponse : ElasticsearchResponse; /// - protected class PutComponentTemplateResponse : ElasticsearchResponse { } + protected class PutComponentTemplateResponse : ElasticsearchResponse; } diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs index 758deed..e77b02d 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs @@ -3,6 +3,8 @@ // See the LICENSE file in the project root for more information using System; +using System.Collections; +using System.Collections.Generic; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Ingest.Transport; using Elastic.Transport; @@ -22,4 +24,8 @@ protected ElasticsearchChannelOptionsBase(ITransport transport) : base(transport /// public IElasticsearchEventWriter? EventWriter { get; set; } + /// Optionally set dynamic templates for event + public Func>? DynamicTemplates { get; set; } + + } diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index 8c07ed1..9646965 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -22,7 +22,8 @@ public class IndexChannel : ElasticsearchChannelBase options) : this(options, null) { } /// - public IndexChannel(IndexChannelOptions options, ICollection>? callbackListeners) : base(options, callbackListeners) + public IndexChannel(IndexChannelOptions options, ICollection>? callbackListeners, string? diagnosticsName = null) + : base(options, callbackListeners, diagnosticsName ?? nameof(IndexChannel)) { _url = base.BulkUrl; diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs index 23a46f6..a5e429a 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs @@ -10,7 +10,6 @@ namespace Elastic.Ingest.Elasticsearch.Serialization; /// TODO public struct BulkHeader { - /// The index to write to, never set when writing using public string? Index { get; set; } diff --git a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs index 03d8538..7233794 100644 --- a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs +++ b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs @@ -64,7 +64,7 @@ public TraceChannel(TraceChannelOptions options) : this(options, null) { } /// public TraceChannel(TraceChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { + : base(options, callbackListeners, nameof(TraceChannel)) { var o = new OtlpExporterOptions { Endpoint = options.Endpoint, diff --git a/src/Elastic.Ingest.Transport/TransportChannelBase.cs b/src/Elastic.Ingest.Transport/TransportChannelBase.cs index 98515aa..2b33841 100644 --- a/src/Elastic.Ingest.Transport/TransportChannelBase.cs +++ b/src/Elastic.Ingest.Transport/TransportChannelBase.cs @@ -23,12 +23,12 @@ public abstract class TransportChannelBase - protected TransportChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected TransportChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected TransportChannelBase(TChannelOptions options) - : base(options) { } + protected TransportChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// Implement sending the current of the buffer to the output. /// diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs index f1dc0e5..3f7987e 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System; using System.IO; using System.Threading; using Elastic.Ingest.Elasticsearch.DataStreams; @@ -20,6 +21,7 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() var wait = new ManualResetEvent(false); + Exception exception = null; using var channel = new DataStreamChannel(new DataStreamChannelOptions(Transport) { BufferOptions = new() @@ -27,6 +29,11 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() OutboundBufferMaxSize = 1 }, DataStream = new("type"), + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -35,7 +42,9 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() }); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Uri.AbsolutePath.Should().Be("/type-generic-default/_bulk"); diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs index c44a8c7..7e1cbae 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs @@ -28,12 +28,18 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader var wait = new ManualResetEvent(false); + Exception exception = null; var options = new IndexChannelOptions(Transport) { BufferOptions = new() { OutboundBufferMaxSize = 1 }, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -50,7 +56,9 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader using var channel = new IndexChannel(options); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Should().NotBeNull(); callDetails.Uri.AbsolutePath.Should().Be(expectedUrl); diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs index cef3ae6..bb34c52 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs @@ -29,12 +29,18 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader ApiCallDetails callDetails = null; var wait = new ManualResetEvent(false); + Exception exception = null; var options = new IndexChannelOptions(Transport) { BufferOptions = new() { OutboundBufferMaxSize = 1 }, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -51,7 +57,9 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader using var channel = new IndexChannel(options); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Should().NotBeNull(); callDetails.Uri.AbsolutePath.Should().Be(expectedUrl);