Skip to content

Commit

Permalink
Update Elastic packages to avoid transport failures (#464)
Browse files Browse the repository at this point in the history
- Updated Elastic.Clients.Elasticsearch
- Updated Elastic.Ingest.Elasticsearch
- Updated Microsoft.Extensions.Hosting in examples

Closes #463
  • Loading branch information
stevejgordon authored Nov 26, 2024
1 parent b173202 commit 2f97b9d
Show file tree
Hide file tree
Showing 24 changed files with 104 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
<ItemGroup>
<PackageReference Include="Elastic.Apm" Version="1.22.0" />
<PackageReference Include="Serilog" Version="2.10.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="Elastic.Elasticsearch.Ephemeral" Version="0.4.3" />
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="8.12.1" />
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="8.16.2" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="5.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

<ItemGroup>
<PackageReference Include="Elastic.Elasticsearch.Ephemeral" Version="0.4.3" />
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="8.12.1" />
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="8.16.2" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion examples/aspnetcore-with-serilog/AspnetCoreExample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageReference Include="Serilog.AspNetCore" Version="3.2.0" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0"/>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Elastic.Apm.SerilogEnricher\Elastic.Apm.SerilogEnricher.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,18 @@ public ElasticsearchBenchmarkExporter(ElasticsearchBenchmarkExporterOptions opti
{
Options = options;
var config = Options.CreateTransportConfiguration();
Transport = new DistributedTransport<TransportConfiguration>(config);
Transport = new DistributedTransport<ITransportConfiguration>(config);
}

// ReSharper disable once UnusedMember.Global
/// <summary> Exports benchmark results to Elasticsearch </summary>
public ElasticsearchBenchmarkExporter(ElasticsearchBenchmarkExporterOptions options, Func<ElasticsearchBenchmarkExporterOptions, TransportConfiguration> configure)
{
Options = options;
Transport = new DistributedTransport<TransportConfiguration>(configure(Options));
Transport = new DistributedTransport<ITransportConfiguration>(configure(Options));
}


private ITransport<TransportConfiguration> Transport { get; }
private ITransport<ITransportConfiguration> Transport { get; }
private ElasticsearchBenchmarkExporterOptions Options { get; }

// We only log when we cannot write to Elasticsearch
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

Expand Down Expand Up @@ -145,9 +145,9 @@ private NodePool CreateNodePool()
}


internal TransportConfiguration CreateTransportConfiguration()
internal TransportConfigurationDescriptor CreateTransportConfiguration()
{
var settings = new TransportConfiguration(CreateNodePool(), productRegistration: ElasticsearchProductRegistration.Default);
var settings = new TransportConfigurationDescriptor(CreateNodePool(), productRegistration: ElasticsearchProductRegistration.Default);
if (EnableDebugMode)
settings.EnableDebugMode();
return settings;
Expand Down
39 changes: 35 additions & 4 deletions src/Elastic.Extensions.Logging/ElasticsearchLoggerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Extensions.Logging.Options;
Expand All @@ -18,6 +20,10 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

#if NETSTANDARD2_1_OR_GREATER
using System.Buffers;
#endif

namespace Elastic.Extensions.Logging
{
/// <summary>
Expand All @@ -33,6 +39,20 @@ public class ElasticsearchLoggerProvider : ILoggerProvider, ISupportExternalScop
private IExternalScopeProvider? _scopeProvider;
private IBufferedChannel<LogEvent> _shipper;

private static readonly LogEventWriter LogEventWriterInstance = new()
{
WriteToStreamAsync = static async (stream, logEvent, ctx) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
#if NETSTANDARD2_1_OR_GREATER
WriteToArrayBuffer = static (arrayBufferWriter, logEvent) =>
{
var serialized = logEvent.SerializeToUtf8Bytes(); // TODO - Performance optimisation to avoid array allocation
var span = arrayBufferWriter.GetSpan(serialized.Length);
serialized.AsSpan().CopyTo(span);
arrayBufferWriter.Advance(serialized.Length);
}
#endif
};

/// <inheritdoc cref="IChannelDiagnosticsListener"/>
public IChannelDiagnosticsListener? DiagnosticsListener { get; }

Expand Down Expand Up @@ -132,16 +152,16 @@ private static ITransport CreateTransport(ElasticsearchLoggerOptions loggerOptio
if (loggerOptions.Transport != null) return loggerOptions.Transport;

var connectionPool = CreateNodePool(loggerOptions);
var config = new TransportConfiguration(connectionPool, productRegistration: ElasticsearchProductRegistration.Default);
var config = new TransportConfigurationDescriptor(connectionPool, productRegistration: ElasticsearchProductRegistration.Default);
// Cloud sets authentication as required parameter in the constructor
if (loggerOptions.ShipTo.NodePoolType != NodePoolType.Cloud)
config = SetAuthenticationOnTransport(loggerOptions, config);

var transport = new DistributedTransport<TransportConfiguration>(config);
var transport = new DistributedTransport<ITransportConfiguration>(config);
return transport;
}

private static TransportConfiguration SetAuthenticationOnTransport(ElasticsearchLoggerOptions loggerOptions, TransportConfiguration config)
private static TransportConfigurationDescriptor SetAuthenticationOnTransport(ElasticsearchLoggerOptions loggerOptions, TransportConfigurationDescriptor config)
{
var apiKey = loggerOptions.ShipTo.ApiKey;
var username = loggerOptions.ShipTo.Username;
Expand Down Expand Up @@ -177,11 +197,13 @@ private IBufferedChannel<LogEvent> CreatIngestChannel(ElasticsearchLoggerOptions
else
{
var dataStreamNameOptions = loggerOptions.DataStream ?? new DataStreamNameOptions();

var indexChannelOptions = new DataStreamChannelOptions<LogEvent>(transport)
{
DataStream = new DataStreamName(dataStreamNameOptions.Type, dataStreamNameOptions.DataSet, dataStreamNameOptions.Namespace),
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
EventWriter = LogEventWriterInstance
};

SetupChannelOptions(_channelConfigurations, indexChannelOptions);
var channel = new EcsDataStreamChannel<LogEvent>(indexChannelOptions);
channel.BootstrapElasticsearch(loggerOptions.BootstrapMethod, loggerOptions.IlmPolicy);
Expand All @@ -191,5 +213,14 @@ private IBufferedChannel<LogEvent> CreatIngestChannel(ElasticsearchLoggerOptions

/// <inheritdoc cref="IChannelProvider.GetChannel"/>
public IBufferedChannel<LogEvent> GetChannel() => _shipper;

private sealed class LogEventWriter : IElasticsearchEventWriter<LogEvent>
{
#if NETSTANDARD2_1_OR_GREATER
public Action<ArrayBufferWriter<byte>, LogEvent>? WriteToArrayBuffer { get; set; }
#endif

public Func<Stream, LogEvent, CancellationToken, Task>? WriteToStreamAsync { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Elastic.Ingest.Elasticsearch" Version="0.7.2" />
<PackageReference Include="Elastic.Ingest.Elasticsearch" Version="0.7.5" />
</ItemGroup>

</Project>
10 changes: 5 additions & 5 deletions src/Elastic.NLog.Targets/ElasticsearchTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ protected override void InitializeTarget()
var indexOffset = string.IsNullOrEmpty(indexOffsetHours) ? default(TimeSpan?) : TimeSpan.FromHours(int.Parse(indexOffsetHours));

var connectionPool = CreateNodePool();
var config = new TransportConfiguration(connectionPool, productRegistration: ElasticsearchProductRegistration.Default);
var config = new TransportConfigurationDescriptor(connectionPool, productRegistration: ElasticsearchProductRegistration.Default);
// Cloud sets authentication as required parameter in the constructor
if (NodePoolType != ElasticPoolType.Cloud)
config = SetAuthenticationOnTransport(config);

var transport = new DistributedTransport<TransportConfiguration>(config);
var transport = new DistributedTransport<ITransportConfiguration>(config);
if (!string.IsNullOrEmpty(indexFormat))
{
_channel = CreateIndexChannel(transport, indexFormat, indexOffset, IndexOperation);
Expand All @@ -205,7 +205,7 @@ private void SetupChannelOptions(ElasticsearchChannelOptionsBase<NLogEcsDocument
ConfigureChannel?.Invoke(channelOptions);
}

private EcsDataStreamChannel<NLogEcsDocument> CreateDataStreamChannel(DistributedTransport<TransportConfiguration> transport)
private EcsDataStreamChannel<NLogEcsDocument> CreateDataStreamChannel(DistributedTransport<ITransportConfiguration> transport)
{
var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent());
var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
Expand All @@ -221,7 +221,7 @@ private EcsDataStreamChannel<NLogEcsDocument> CreateDataStreamChannel(Distribute
return channel;
}

private EcsIndexChannel<NLogEcsDocument> CreateIndexChannel(DistributedTransport<TransportConfiguration> transport, string indexFormat, TimeSpan? indexOffset, OperationMode indexOperation)
private EcsIndexChannel<NLogEcsDocument> CreateIndexChannel(DistributedTransport<ITransportConfiguration> transport, string indexFormat, TimeSpan? indexOffset, OperationMode indexOperation)
{
var indexChannelOptions = new IndexChannelOptions<NLogEcsDocument>(transport)
{
Expand Down Expand Up @@ -300,7 +300,7 @@ private NodePool CreateNodePool()
}
}

private TransportConfiguration SetAuthenticationOnTransport(TransportConfiguration config)
private TransportConfigurationDescriptor SetAuthenticationOnTransport(TransportConfigurationDescriptor config)
{
var apiKey = ApiKey?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var username = Username?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
Expand Down
7 changes: 3 additions & 4 deletions src/Elastic.Serilog.Sinks/ConfigSinkExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private static void SetBufferOptions(ElasticsearchSinkOptions sinkOptions, int?
};

private static ElasticsearchSinkOptions CreateSinkOptions(
TransportConfiguration transportConfig,
TransportConfigurationDescriptor transportConfig,
BootstrapMethod bootstrapMethod, string? dataStream, string? ilmPolicy, bool? includeHost,
bool? includeActivity, bool? includeProcess, bool? includeUser, ICollection<string>? filterProperties
)
Expand Down Expand Up @@ -187,7 +187,7 @@ private static ElasticsearchSinkOptions CreateSinkOptions(
return sinkOptions;
}

private static void SetTransportConfig(TransportConfiguration transportConfig,
private static void SetTransportConfig(TransportConfigurationDescriptor transportConfig,
string? apiKey, string? username, string? password,
Uri? proxy, string? proxyUsername, string? proxyPassword, string? fingerprint, bool debugMode
)
Expand All @@ -209,7 +209,6 @@ private static void SetTransportConfig(TransportConfiguration transportConfig,
transportConfig.Authentication(new ApiKey(apiKey));
}


/// <summary>
/// Write logs directly to Elastic Cloud ( https://cloud.elastic.co/ ).
/// <para><paramref name="cloudId"/> describes your deployments endpoints (can be found in the Admin Console)</para>
Expand All @@ -222,7 +221,7 @@ public static LoggerConfiguration ElasticCloud(
string username,
string password,
Action<ElasticsearchSinkOptions>? configureOptions = null,
Action<TransportConfiguration>? configureTransport = null,
Action<TransportConfigurationDescriptor>? configureTransport = null,
LoggingLevelSwitch? levelSwitch = null,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum
)
Expand Down
12 changes: 6 additions & 6 deletions src/Elastic.Serilog.Sinks/ElasticsearchSinkExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static LoggerConfiguration Elasticsearch(
this LoggerSinkConfiguration loggerConfiguration,
ICollection<Uri> nodes,
Action<ElasticsearchSinkOptions>? configureOptions = null,
Action<TransportConfiguration>? configureTransport = null,
Action<TransportConfigurationDescriptor>? configureTransport = null,
bool useSniffing = false,
LoggingLevelSwitch? levelSwitch = null,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum
Expand All @@ -72,7 +72,7 @@ public static LoggerConfiguration Elasticsearch<TEcsDocument>(
this LoggerSinkConfiguration loggerConfiguration,
ICollection<Uri> nodes,
Action<ElasticsearchSinkOptions<TEcsDocument>>? configureOptions = null,
Action<TransportConfiguration>? configureTransport = null,
Action<TransportConfigurationDescriptor>? configureTransport = null,
bool useSniffing = false,
LoggingLevelSwitch? levelSwitch = null,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum
Expand All @@ -97,7 +97,7 @@ public static LoggerConfiguration ElasticCloud(
string cloudId,
string apiKey,
Action<ElasticsearchSinkOptions>? configureOptions = null,
Action<TransportConfiguration>? configureTransport = null,
Action<TransportConfigurationDescriptor>? configureTransport = null,
LoggingLevelSwitch? levelSwitch = null,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum
)
Expand All @@ -122,7 +122,7 @@ public static LoggerConfiguration ElasticCloud<TEcsDocument>(
string cloudId,
string apiKey,
Action<ElasticsearchSinkOptions<TEcsDocument>>? configureOptions = null,
Action<TransportConfiguration>? configureTransport = null,
Action<TransportConfigurationDescriptor>? configureTransport = null,
LoggingLevelSwitch? levelSwitch = null,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum
) where TEcsDocument : EcsDocument, new()
Expand All @@ -147,7 +147,7 @@ public static LoggerConfiguration ElasticCloud(
string username,
string password,
Action<ElasticsearchSinkOptions>? configureOptions = null,
Action<TransportConfiguration>? configureTransport = null,
Action<TransportConfigurationDescriptor>? configureTransport = null,
LoggingLevelSwitch? levelSwitch = null,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum
)
Expand All @@ -173,7 +173,7 @@ public static LoggerConfiguration ElasticCloud<TEcsDocument>(
string username,
string password,
Action<ElasticsearchSinkOptions<TEcsDocument>>? configureOptions = null,
Action<TransportConfiguration>? configureTransport = null,
Action<TransportConfigurationDescriptor>? configureTransport = null,
LoggingLevelSwitch? levelSwitch = null,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum
) where TEcsDocument : EcsDocument, new()
Expand Down
20 changes: 10 additions & 10 deletions src/Elastic.Serilog.Sinks/TransportHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,34 @@ internal static class TransportHelper
public static TransportConfiguration Default() =>
new TransportConfiguration(new Uri("http://localhost:9200"), DefaultProduct);

public static TransportConfiguration Static(IEnumerable<string> nodes) => Static(nodes.Select(n => new Uri(n)));
public static TransportConfigurationDescriptor Static(IEnumerable<string> nodes) => Static(nodes.Select(n => new Uri(n)));

public static TransportConfiguration Static(IEnumerable<Uri> nodes)
public static TransportConfigurationDescriptor Static(IEnumerable<Uri> nodes)
{
var pool = new StaticNodePool(nodes.Select(e => new Node(e)));
return new TransportConfiguration(pool, productRegistration: DefaultProduct);
return new TransportConfigurationDescriptor(pool, productRegistration: DefaultProduct);
}

public static TransportConfiguration Sniffing(IEnumerable<string> nodes) => Sniffing(nodes.Select(n => new Uri(n)));
public static TransportConfigurationDescriptor Sniffing(IEnumerable<string> nodes) => Sniffing(nodes.Select(n => new Uri(n)));

public static TransportConfiguration Sniffing(IEnumerable<Uri> nodes)
public static TransportConfigurationDescriptor Sniffing(IEnumerable<Uri> nodes)
{
var pool = new SniffingNodePool(nodes.Select(e => new Node(e)));
return new TransportConfiguration(pool, productRegistration: DefaultProduct);
return new TransportConfigurationDescriptor(pool, productRegistration: DefaultProduct);
}

public static TransportConfiguration Cloud(string cloudId, string apiKey)
public static TransportConfigurationDescriptor Cloud(string cloudId, string apiKey)
{
var header = new ApiKey(apiKey);
var pool = new CloudNodePool(cloudId, header);
return new TransportConfiguration(pool, productRegistration: DefaultProduct);
return new TransportConfigurationDescriptor(pool, productRegistration: DefaultProduct);
}

public static TransportConfiguration Cloud(string cloudId, string username, string password)
public static TransportConfigurationDescriptor Cloud(string cloudId, string username, string password)
{
var header = new BasicAuthentication(username, password);
var pool = new CloudNodePool(cloudId, header);
return new TransportConfiguration(pool, productRegistration: DefaultProduct);
return new TransportConfigurationDescriptor(pool, productRegistration: DefaultProduct);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Environments;
Expand Down Expand Up @@ -49,7 +50,7 @@ private static IConfig CreateDefaultConfig()
}

[Fact]
public void BenchmarkingPersistsResults()
public async Task BenchmarkingPersistsResults()
{
var url = Client.ElasticsearchClientSettings.NodePool.Nodes.First().Uri;
IChannelDiagnosticsListener listener = null;
Expand Down Expand Up @@ -81,13 +82,13 @@ public void BenchmarkingPersistsResults()
// throw new Exception(template.DebugInformation);

var indexName = $"benchmarks-dotnet-{options.DataStreamNamespace}";
var indexExists = Client.Indices.Exists(indexName);
var indexExists = await Client.Indices.ExistsAsync(indexName);
if (!indexExists.IsValidResponse)
throw new Exception(indexExists.DebugInformation);

Client.Indices.Refresh(indexName);
await Client.Indices.RefreshAsync(indexName);

var searchResponse = Client.Search<BenchmarkDocument>(s => s.Index(indexName).TrackTotalHits(new TrackHits(true)));
var searchResponse = await Client.SearchAsync<BenchmarkDocument>(s => s.Index(indexName).TrackTotalHits(new TrackHits(true)));
if (!searchResponse.IsValidResponse || searchResponse.Total == 0)
throw new Exception(searchResponse.DebugInformation);

Expand Down
Loading

0 comments on commit 2f97b9d

Please sign in to comment.