Skip to content

Commit

Permalink
NLog Elastic Target supports EcsIndexChannel with IndexFormat (#429)
Browse files Browse the repository at this point in the history
  • Loading branch information
snakefoot authored Sep 16, 2024
1 parent b7da2d9 commit 4b0847d
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 12 deletions.
98 changes: 87 additions & 11 deletions src/Elastic.NLog.Targets/ElasticsearchTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
using Elastic.Channels;
using Elastic.Channels.Buffers;
using Elastic.Channels.Diagnostics;
using Elastic.CommonSchema.NLog;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.CommonSchema;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
Expand All @@ -24,7 +26,7 @@ public class ElasticsearchTarget : TargetWithLayout
/// <inheritdoc />
public override Layout Layout { get => _layout; set => _layout = value as Elastic.CommonSchema.NLog.EcsLayout ?? _layout; }
private Elastic.CommonSchema.NLog.EcsLayout _layout = new Elastic.CommonSchema.NLog.EcsLayout();
private EcsDataStreamChannel<NLogEcsDocument>? _channel;
private IBufferedChannel<NLogEcsDocument>? _channel;

/// <summary>
/// Gets or sets the connection pool type. Default for multiple nodes is <c>Sniffing</c>; other supported values are
Expand All @@ -51,6 +53,41 @@ public class ElasticsearchTarget : TargetWithLayout
/// <summary> User-configurable arbitrary grouping</summary>
public Layout? DataStreamNamespace { get; set; } = "default";

/// <summary>
/// Gets or sets the format string for the Elastic search index. The current <c>DateTimeOffset</c> is passed as parameter 0.
///
/// <para> Example: "dotnet-{0:yyyy.MM.dd}"</para>
/// <para> If no {0} parameter is defined the index name is effectively fixed</para>
/// </summary>
public Layout? IndexFormat { get; set; }

/// <summary>
/// Gets or sets the offset to use for the index <c>DateTimeOffset</c>. Default value is null, which uses the system local offset.
/// Use "0" for UTC.
/// </summary>
public Layout? IndexOffsetHours { get; set; }

/// <summary>
/// Control the operation header for each bulk operation. Default value is Auto.
///
/// <para> Can explicit specify Auto, Index or Create</para>
/// </summary>
public OperationMode IndexOperation { get; set; }

/// <summary>
/// Gets or sets the optional override of the per document `_id`.
/// </summary>
public Layout? IndexEventId
{
get => _layout.EventId;
set
{
_layout.EventId = value;
_hasIndexEventId = value is not null;
}
}
private bool _hasIndexEventId;

/// <summary>
/// The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped
/// <para>Defaults to <c>100_000</c></para>
Expand Down Expand Up @@ -124,18 +161,17 @@ public Layout? CloudId
/// <summary>
/// Provide callbacks to further configure <see cref="DataStreamChannelOptions{TEvent}"/>
/// </summary>
public Action<DataStreamChannelOptions<NLogEcsDocument>>? ConfigureChannel { get; set; }
public Action<ElasticsearchChannelOptionsBase<NLogEcsDocument>>? ConfigureChannel { get; set; }

/// <inheritdoc cref="IChannelDiagnosticsListener"/>
public IChannelDiagnosticsListener? DiagnosticsListener => _channel?.DiagnosticsListener;

/// <inheritdoc />
protected override void InitializeTarget()
{
var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent());
var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamSet = DataStreamSet?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamNamespace = DataStreamNamespace?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexFormat = IndexFormat?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexOffsetHours = IndexOffsetHours?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexOffset = string.IsNullOrEmpty(indexOffsetHours) ? default(TimeSpan?) : TimeSpan.FromHours(int.Parse(indexOffsetHours));

var connectionPool = CreateNodePool();
var config = new TransportConfiguration(connectionPool, productRegistration: ElasticsearchProductRegistration.Default);
Expand All @@ -144,11 +180,18 @@ protected override void InitializeTarget()
config = SetAuthenticationOnTransport(config);

var transport = new DistributedTransport<TransportConfiguration>(config);
var channelOptions = new DataStreamChannelOptions<NLogEcsDocument>(transport)
if (!string.IsNullOrEmpty(indexFormat))
{
DataStream = new DataStreamName(dataStreamType, dataStreamSet, dataStreamNamespace),
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
};
_channel = CreateIndexChannel(transport, indexFormat, indexOffset, IndexOperation);
}
else
{
_channel = CreateDataStreamChannel(transport);
}
}

private void SetupChannelOptions(ElasticsearchChannelOptionsBase<NLogEcsDocument> channelOptions)
{
if (InboundBufferMaxSize > 0)
channelOptions.BufferOptions.InboundBufferMaxSize = InboundBufferMaxSize;
if (OutboundBufferMaxSize > 0)
Expand All @@ -160,10 +203,43 @@ protected override void InitializeTarget()
if (ExportMaxRetries >= 0)
channelOptions.BufferOptions.ExportMaxRetries = ExportMaxRetries;
ConfigureChannel?.Invoke(channelOptions);
}

private EcsDataStreamChannel<NLogEcsDocument> CreateDataStreamChannel(DistributedTransport<TransportConfiguration> transport)
{
var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent());
var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamSet = DataStreamSet?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamNamespace = DataStreamNamespace?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var channelOptions = new DataStreamChannelOptions<NLogEcsDocument>(transport)
{
DataStream = new DataStreamName(dataStreamType, dataStreamSet, dataStreamNamespace),
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
};
SetupChannelOptions(channelOptions);
var channel = new EcsDataStreamChannel<NLogEcsDocument>(channelOptions, new[] { new InternalLoggerCallbackListener<NLogEcsDocument>() });
channel.BootstrapElasticsearch(BootstrapMethod, ilmPolicy);
_channel = channel;
return channel;
}

private EcsIndexChannel<NLogEcsDocument> CreateIndexChannel(DistributedTransport<TransportConfiguration> transport, string indexFormat, TimeSpan? indexOffset, OperationMode indexOperation)
{
var indexChannelOptions = new IndexChannelOptions<NLogEcsDocument>(transport)
{
IndexFormat = indexFormat,
IndexOffset = indexOffset,
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
TimestampLookup = l => l.Timestamp,
OperationMode = indexOperation,
};

if (_hasIndexEventId)
{
indexChannelOptions.BulkOperationIdLookup = (logEvent) => (logEvent.Event?.Id)!;
}

SetupChannelOptions(indexChannelOptions);
return new EcsIndexChannel<NLogEcsDocument>(indexChannelOptions);
}

/// <inheritdoc />
Expand Down
9 changes: 8 additions & 1 deletion src/Elastic.NLog.Targets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var logger = LogManager.GetCurrentClassLogger();
- Cloud - Pool seeded with CloudId
- _NodeUris_ - URIs of the Elasticsearch nodes in the connection pool (comma delimited)
- _CloudId_ - When using NodePoolType = Cloud
- _BootstrapMethod_ - Whether to configure / bootstrap the destination, which requires user has management capabilities (None, Silent, Failure). Default = None

* **Export Authentication**
- _ApiKey_ - When using NodePoolType = Cloud and authentication via API key.
Expand All @@ -67,10 +68,16 @@ var logger = LogManager.GetCurrentClassLogger();
- _ExportMaxRetries_ - Max number of times to retry an export. Default = 3

* **Export DataStream**
- _DataStreamType_ - Generic type describing the data. Defaults = 'logs'
- _DataStreamType_ - Generic type describing the data. Default = 'logs'
- _DataStreamSet_ - Describes the data ingested and its structure. Default = 'dotnet'
- _DataStreamNamespace_ - User-configurable arbitrary grouping. Default = 'default'

* **Export Index**
- _IndexFormat_ - Format string for the Elastic search index (Ex. `dotnet-{0:yyyy.MM.dd}` or blank means disabled). Default = ''
- _IndexOffsetHours_ - Time offset to use for the index (Ex. `0` for UTC or blank means system local). Default = ''
- _IndexOperation_ - Operation header for each bulk operation (Auto, Index, Create). Default = Auto
- _IndexEventId_ - Optional override of the per document `_id`

Notice that export depends on in-memory queue, that is lost on application-crash / -exit.
If higher gurantee of delivery is required, then consider using [Elastic.CommonSchema.NLog](https://www.nuget.org/packages/Elastic.CommonSchema.NLog)
together with NLog FileTarget and use [filebeat](https://www.elastic.co/beats/filebeat) to ship these logs.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels.Diagnostics;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.CommonSchema;
using Elastic.Ingest.Elasticsearch;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace NLog.Targets.Elastic.IntegrationTests
{
public class LoggingToIndexIngestionTests : TestBase
{
public LoggingToIndexIngestionTests(LoggingCluster cluster, ITestOutputHelper output) : base(cluster, output) { }

[Fact]
public async Task EnsureDocumentsEndUpInIndex()
{
var indexPrefix = "catalog-data-";
var indexFormat = indexPrefix + "{0:yyyy.MM.dd}";

using var _ = CreateLogger(out var logger, out var provider, out var @namespace, out var waitHandle, out var listener, (cfg) =>
{
cfg.IndexFormat = indexFormat;
cfg.DataStreamType = "x";
cfg.DataStreamSet = "dotnet";
var nodesUris = string.Join(",", Client.ElasticsearchClientSettings.NodePool.Nodes.Select(n => n.Uri.ToString()).ToArray());
cfg.NodeUris = nodesUris;
cfg.NodePoolType = ElasticPoolType.Static;
});

var date = DateTimeOffset.Now;
var indexName = string.Format(indexFormat, date);

var index = await Client.Indices.GetAsync(new GetIndexRequest(indexName));
index.Indices.Should().BeNullOrEmpty();

logger.Error("an error occurred!");

if (!waitHandle.WaitOne(TimeSpan.FromSeconds(10)))
throw new Exception($"No flush occurred in 10 seconds: {listener}", listener.ObservedException);

listener.PublishSuccess.Should().BeTrue("{0}", listener);
listener.ObservedException.Should().BeNull();

var refreshResult = await Client.Indices.RefreshAsync(indexName);
refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation);
var searchResult = await Client.SearchAsync<EcsDocument>(s => s.Indices(indexName));
searchResult.Total.Should().Be(1);

var storedDocument = searchResult.Documents.First();
storedDocument.Message.Should().Be("an error occurred!");

var hit = searchResult.Hits.First();
hit.Index.Should().Be(indexName);
}
}
}

0 comments on commit 4b0847d

Please sign in to comment.