Skip to content

Commit

Permalink
add tests to ensure the right strategies can be provided for indexing…
Browse files Browse the repository at this point in the history
… events
  • Loading branch information
Mpdreamz committed Oct 9, 2024
1 parent 33e5867 commit 39de9dc
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<I
/// <inheritdoc cref="EventIndexStrategy"/>
protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event)
{

var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event);
var templates = Options.DynamicTemplateLookup?.Invoke(@event);
if (templates is null && listExecutedPipelines is null)
if (templates is null && listExecutedPipelines is null or false)
return (HeaderSerializationStrategy.CreateNoParams, null);

var header = new BulkHeader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray();
#endif

private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx) =>
throw new NotImplementedException();
private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx
) =>
Task.CompletedTask;


#if NET8_0_OR_GREATER
Expand Down
15 changes: 10 additions & 5 deletions src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy
var templates = Options.DynamicTemplateLookup?.Invoke(@event);
var requireAlias = Options.RequireAlias?.Invoke(@event);
var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event);
var isUpsert = Options.BulkUpsertLookup?.Invoke(@event, index) is true;
if (string.IsNullOrWhiteSpace(index)
&& string.IsNullOrWhiteSpace(id)
&& templates is null
&& requireAlias is null
&& listExecutedPipelines is null)
&& isUpsert is false
&& requireAlias is null or false
&& listExecutedPipelines is null or false)
return Options.OperationMode == OperationMode.Index
? (IndexNoParams, null)
: (CreateNoParams, null);
Expand All @@ -71,10 +73,13 @@ protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy
RequireAlias = requireAlias,
ListExecutedPipelines = listExecutedPipelines
};

return (Options.OperationMode == OperationMode.Index
var op = Options.OperationMode == OperationMode.Index
? HeaderSerializationStrategy.Index
: Create, header);
: Create;
if (isUpsert)
op = Update;

return (op, header);
}

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.TemplateName"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Transport;
using FluentAssertions;
using Xunit;

namespace Elastic.Ingest.Elasticsearch.Tests.Strategies;

public class TestDocument
{
private static int Counter = 0;
private readonly int _id = ++Counter;

public DateTimeOffset Timestamp { get; set; }
public int Id => _id;
}

internal class TestIndexChannel(IndexChannelOptions<TestDocument> options) : IndexChannel<TestDocument>(options)
{
public List<(HeaderSerializationStrategy, BulkHeader?)> Strategies { get; } = new();

protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TestDocument @event)
{
var strategy = base.EventIndexStrategy(@event);
Strategies.Add(strategy);
return strategy;
}
}

public class IndexChannelEventOptionsTests : ChannelTestWithSingleDocResponseBase
{
[Fact]
public void DataStreamChannel_UsesCorrectUrlAndOperationHeader()
{
ApiCallDetails callDetails = null;

var wait = new ManualResetEvent(false);

Exception exception = null;
using var channel = new TestIndexChannel(new IndexChannelOptions<TestDocument>(Transport)
{
IndexFormat = "test-index",
BufferOptions = new() { OutboundBufferMaxSize = 1 },
DynamicTemplateLookup = document => document.Id == 2 ? new Dictionary<string, string> { { "id", "1" } } : null,
BulkOperationIdLookup = document => document.Id == 3 ? "33" : null,
BulkUpsertLookup = (document, _) => document.Id == 4,
RequireAlias = document => document.Id == 5,
ListExecutedPipelines = document => document.Id == 6,
ExportExceptionCallback = e =>
{
exception = e;
wait.Set();
},
ExportResponseCallback = (response, _) =>
{
callDetails = response.ApiCallDetails;
wait.Set();
}
});

channel.TryWrite(new TestDocument()); //0
channel.TryWrite(new TestDocument()); //1
channel.TryWrite(new TestDocument()); //2
channel.TryWrite(new TestDocument()); //3
channel.TryWrite(new TestDocument()); //4
channel.TryWrite(new TestDocument()); //5
var signalled = wait.WaitOne(TimeSpan.FromSeconds(5));
signalled.Should().BeTrue("because ExportResponseCallback should have been called");
exception.Should().BeNull();

callDetails.Uri.AbsolutePath.Should().Be("/test-index/_bulk");


channel.Strategies.Should().HaveCount(6);

channel.Strategies[0].Item1.Should().Be(HeaderSerializationStrategy.CreateNoParams);
channel.Strategies[0].Item2.Should().BeNull();

channel.Strategies[1].Item1.Should().Be(HeaderSerializationStrategy.Create);
channel.Strategies[1].Item2.Should().NotBeNull();
channel.Strategies[1].Item2!.Value.DynamicTemplates.Should().NotBeNull();

channel.Strategies[2].Item1.Should().Be(HeaderSerializationStrategy.Create);
channel.Strategies[2].Item2.Should().NotBeNull();
channel.Strategies[2].Item2!.Value.Id.Should().Be("33");

channel.Strategies[3].Item1.Should().Be(HeaderSerializationStrategy.Update);

channel.Strategies[4].Item2!.Value.RequireAlias.Should().BeTrue();

channel.Strategies[5].Item2!.Value.ListExecutedPipelines.Should().BeTrue();

}
}

0 comments on commit 39de9dc

Please sign in to comment.