Skip to content

Commit

Permalink
Remove last reflection based code path
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Oct 8, 2024
1 parent 9991c38 commit 9c94df3
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 48 deletions.
14 changes: 7 additions & 7 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,26 +122,26 @@ public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
internal InboundBuffer<TEvent> InboundBuffer { get; }

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options) : this(options, null) { }
protected BufferedChannelBase(TChannelOptions options, string diagnosticsName) : this(options, null, diagnosticsName) { }

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners, string diagnosticsName)
{
TokenSource = options.CancellationToken.HasValue
? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value)
: new CancellationTokenSource();
Options = options;

var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray();
var listeners = callbackListeners == null ? [Options] : callbackListeners.Concat([Options]).ToArray();
DiagnosticsListener = listeners
.Select(l => (l is IChannelDiagnosticsListener c) ? c : null)
.FirstOrDefault(e => e != null);
.OfType<IChannelDiagnosticsListener?>()
.FirstOrDefault();
if (DiagnosticsListener == null && !options.DisableDiagnostics)
{
// if no debug listener was already provided but was requested explicitly create one.
var l = new ChannelDiagnosticsListener<TEvent, TResponse>(GetType().Name);
var l = new ChannelDiagnosticsListener<TEvent, TResponse>(diagnosticsName);
DiagnosticsListener = l;
listeners = listeners.Concat(new[] { l }).ToArray();
listeners = listeners.Concat([l]).ToArray();
}
_callbacks = new ChannelCallbackInvoker<TEvent, TResponse>(listeners);

Expand Down
4 changes: 2 additions & 2 deletions src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public class NoopChannelOptions : ChannelOptionsBase<NoopEvent, NoopResponse>
public NoopBufferedChannel(
NoopChannelOptions options,
ICollection<IChannelCallbacks<NoopEvent, NoopResponse>>? channelListeners = null
) : base(options, channelListeners) { }
) : base(options, channelListeners, nameof(NoopBufferedChannel)) { }

/// <inheritdoc cref="NoopBufferedChannel"/>
public NoopBufferedChannel(
BufferOptions options,
ICollection<IChannelCallbacks<NoopEvent, NoopResponse>>? channelListeners = null,
bool observeConcurrency = false
) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners)
) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners, nameof(NoopBufferedChannel))
{

}
Expand Down
8 changes: 4 additions & 4 deletions src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public abstract class ResponseItemsBufferedChannelBase<TChannelOptions, TEvent,
where TResponse : class, new()
{
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}"/>
protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
: base(options, callbackListeners) { }
protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners, string diagnosticsName)
: base(options, callbackListeners, diagnosticsName) { }

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}"/>
protected ResponseItemsBufferedChannelBase(TChannelOptions options)
: base(options) { }
protected ResponseItemsBufferedChannelBase(TChannelOptions options, string diagnosticsName)
: base(options, diagnosticsName) { }

/// <summary> Based on <typeparamref name="TResponse"/> should return a bool indicating if retry is needed</summary>
protected abstract bool Retry(TResponse response);
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Ingest.Apm/ApmChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal static class ApmChannelStatics
public class ApmChannel : TransportChannelBase<ApmChannelOptions, IIntakeObject, EventIntakeResponse, IntakeErrorItem>
{
/// <inheritdoc cref="ApmChannel"/>
public ApmChannel(ApmChannelOptions options) : base(options) { }
public ApmChannel(ApmChannelOptions options) : base(options, nameof(ApmChannel)) { }

//retry if APM server returns 429
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Retry"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class DataStreamChannel<TEvent> : ElasticsearchChannelBase<TEvent, DataSt
public DataStreamChannel(DataStreamChannelOptions<TEvent> options) : this(options, null) { }

/// <inheritdoc cref="DataStreamChannel{TEvent}"/>
public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners) : base(options, callbackListeners)
public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners, string? diagnosticsName = null)
: base(options, callbackListeners, diagnosticsName ?? nameof(DataStreamChannel<TEvent>))
{
var dataStream = Options.DataStream.ToString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;

namespace Elastic.Ingest.Elasticsearch;

Expand All @@ -19,7 +18,6 @@ namespace Elastic.Ingest.Elasticsearch;
/// <para>Coordinates most of the sending to- and bootstrapping of Elasticsearch</para>
/// </summary>
public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
: TransportChannelBase<TChannelOptions, TEvent, BulkResponse, BulkResponseItem>
where TChannelOptions : ElasticsearchChannelOptionsBase<TEvent>
{
private static ReadOnlySpan<byte> PlainIndexBytesSpan => """
Expand Down
30 changes: 9 additions & 21 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
where TChannelOptions : ElasticsearchChannelOptionsBase<TEvent>
{
/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}"/>
protected ElasticsearchChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners)
: base(options, callbackListeners) { }
protected ElasticsearchChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners, string diagnosticsName)
: base(options, callbackListeners, diagnosticsName) { }

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}"/>
protected ElasticsearchChannelBase(TChannelOptions options)
: base(options) { }
protected ElasticsearchChannelBase(TChannelOptions options, string diagnosticsName)
: base(options, diagnosticsName) { }

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Retry"/>
protected override bool Retry(BulkResponse response)
Expand Down Expand Up @@ -70,34 +70,22 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) =>
protected override Task<BulkResponse> ExportAsync(ITransport transport, ArraySegment<TEvent> page, CancellationToken ctx = default)
{
ctx = ctx == default ? TokenSource.Token : ctx;
#if NETSTANDARD2_1
// Option is obsolete to prevent external users to set it.
#pragma warning disable CS0618
// if (Options.UseReadOnlyMemory)
// #pragma warning restore CS0618
// {
// var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader);
// return transport.RequestAsync<BulkResponse>(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx);
// }
#endif
#pragma warning disable IDE0022 // Use expression body for method
return transport.RequestAsync<BulkResponse>(HttpMethod.POST, BulkUrl,
PostData.StreamHandler(page,
(_, _) =>
{
/* NOT USED */
/* Synchronous code path never called */
},
async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, Options, ctx).ConfigureAwait(false); })
async (b, stream, t) => await WriteBufferToStreamAsync(b, stream, Options, t).ConfigureAwait(false))
, RequestParams, ctx);
#pragma warning restore IDE0022 // Use expression body for method
}

/// <summary> </summary>
protected class HeadIndexTemplateResponse : ElasticsearchResponse { }
protected class HeadIndexTemplateResponse : ElasticsearchResponse;

/// <summary> </summary>
protected class PutIndexTemplateResponse : ElasticsearchResponse { }
protected class PutIndexTemplateResponse : ElasticsearchResponse;

/// <summary> </summary>
protected class PutComponentTemplateResponse : ElasticsearchResponse { }
protected class PutComponentTemplateResponse : ElasticsearchResponse;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// See the LICENSE file in the project root for more information

using System;
using System.Collections;
using System.Collections.Generic;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using Elastic.Transport;
Expand All @@ -22,4 +24,8 @@ protected ElasticsearchChannelOptionsBase(ITransport transport) : base(transport
/// </summary>
public IElasticsearchEventWriter<TEvent>? EventWriter { get; set; }

/// <summary> Optionally set dynamic templates for event</summary>
public Func<TEvent, IDictionary<string, string>>? DynamicTemplates { get; set; }


}
3 changes: 2 additions & 1 deletion src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public class IndexChannel<TEvent> : ElasticsearchChannelBase<TEvent, IndexChanne
public IndexChannel(IndexChannelOptions<TEvent> options) : this(options, null) { }

/// <inheritdoc cref="IndexChannel{TEvent}"/>
public IndexChannel(IndexChannelOptions<TEvent> options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners) : base(options, callbackListeners)
public IndexChannel(IndexChannelOptions<TEvent> options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners, string? diagnosticsName = null)
: base(options, callbackListeners, diagnosticsName ?? nameof(IndexChannel<TEvent>))
{
_url = base.BulkUrl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Elastic.Ingest.Elasticsearch.Serialization;
/// <summary> TODO </summary>
public struct BulkHeader
{

/// <summary> The index to write to, never set when writing using <see cref="DataStreamChannel{TEvent}"/> </summary>
public string? Index { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public TraceChannel(TraceChannelOptions options) : this(options, null) { }

/// <summary> </summary>
public TraceChannel(TraceChannelOptions options, ICollection<IChannelCallbacks<Activity, TraceExportResult>>? callbackListeners)
: base(options, callbackListeners) {
: base(options, callbackListeners, nameof(TraceChannel)) {
var o = new OtlpExporterOptions
{
Endpoint = options.Endpoint,
Expand Down
8 changes: 4 additions & 4 deletions src/Elastic.Ingest.Transport/TransportChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ public abstract class TransportChannelBase<TChannelOptions, TEvent, TResponse, T
where TResponse : TransportResponse, new()
{
/// <inheritdoc cref="TransportChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}"/>
protected TransportChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
: base(options, callbackListeners) { }
protected TransportChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners, string diagnosticsName)
: base(options, callbackListeners, diagnosticsName) { }

/// <inheritdoc cref="TransportChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}"/>
protected TransportChannelBase(TChannelOptions options)
: base(options) { }
protected TransportChannelBase(TChannelOptions options, string diagnosticsName)
: base(options, diagnosticsName) { }

/// <summary> Implement sending the current <paramref name="page"/> of the buffer to the output. </summary>
/// <param name="transport"></param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.IO;
using System.Threading;
using Elastic.Ingest.Elasticsearch.DataStreams;
Expand All @@ -20,13 +21,19 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader()

var wait = new ManualResetEvent(false);

Exception exception = null;
using var channel = new DataStreamChannel<TestDocument>(new DataStreamChannelOptions<TestDocument>(Transport)
{
BufferOptions = new()
{
OutboundBufferMaxSize = 1
},
DataStream = new("type"),
ExportExceptionCallback = e =>
{
exception = e;
wait.Set();
},
ExportResponseCallback = (response, _) =>
{
callDetails = response.ApiCallDetails;
Expand All @@ -35,7 +42,9 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader()
});

channel.TryWrite(new TestDocument());
wait.WaitOne();
var signalled = wait.WaitOne(TimeSpan.FromSeconds(5));
signalled.Should().BeTrue("because ExportResponseCallback should have been called");
exception.Should().BeNull();

callDetails.Uri.AbsolutePath.Should().Be("/type-generic-default/_bulk");

Expand Down
10 changes: 9 additions & 1 deletion tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader

var wait = new ManualResetEvent(false);

Exception exception = null;
var options = new IndexChannelOptions<TestDocument>(Transport)
{
BufferOptions = new()
{
OutboundBufferMaxSize = 1
},
ExportExceptionCallback = e =>
{
exception = e;
wait.Set();
},
ExportResponseCallback = (response, _) =>
{
callDetails = response.ApiCallDetails;
Expand All @@ -50,7 +56,9 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader
using var channel = new IndexChannel<TestDocument>(options);

channel.TryWrite(new TestDocument());
wait.WaitOne();
var signalled = wait.WaitOne(TimeSpan.FromSeconds(5));
signalled.Should().BeTrue("because ExportResponseCallback should have been called");
exception.Should().BeNull();

callDetails.Should().NotBeNull();
callDetails.Uri.AbsolutePath.Should().Be(expectedUrl);
Expand Down
10 changes: 9 additions & 1 deletion tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader
ApiCallDetails callDetails = null;
var wait = new ManualResetEvent(false);

Exception exception = null;
var options = new IndexChannelOptions<TestDocument>(Transport)
{
BufferOptions = new()
{
OutboundBufferMaxSize = 1
},
ExportExceptionCallback = e =>
{
exception = e;
wait.Set();
},
ExportResponseCallback = (response, _) =>
{
callDetails = response.ApiCallDetails;
Expand All @@ -51,7 +57,9 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader
using var channel = new IndexChannel<TestDocument>(options);

channel.TryWrite(new TestDocument());
wait.WaitOne();
var signalled = wait.WaitOne(TimeSpan.FromSeconds(5));
signalled.Should().BeTrue("because ExportResponseCallback should have been called");
exception.Should().BeNull();

callDetails.Should().NotBeNull();
callDetails.Uri.AbsolutePath.Should().Be(expectedUrl);
Expand Down

0 comments on commit 9c94df3

Please sign in to comment.