From 39de9dc1219e7ef886b3537b003da3eb49e1b56f Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 9 Oct 2024 13:39:56 +0200 Subject: [PATCH] add tests to ensure the right strategies can be provided for indexing events --- .../DataStreams/DataStreamChannel.cs | 3 +- .../ElasticsearchChannelBase.Serialization.cs | 5 +- .../Indices/IndexChannel.cs | 15 ++- .../IndexChannelEventOptionsTests.cs | 104 ++++++++++++++++++ 4 files changed, 118 insertions(+), 9 deletions(-) create mode 100644 tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index eeafe7f..1bebe12 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -30,10 +30,9 @@ public DataStreamChannel(DataStreamChannelOptions options, ICollection protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event) { - var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event); var templates = Options.DynamicTemplateLookup?.Invoke(@event); - if (templates is null && listExecutedPipelines is null) + if (templates is null && listExecutedPipelines is null or false) return (HeaderSerializationStrategy.CreateNoParams, null); var header = new BulkHeader diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs index 5d7cbc9..d1a3ee5 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -37,8 +37,9 @@ public abstract partial class ElasticsearchChannelBase private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray(); #endif - private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx) => - throw new NotImplementedException(); + private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx + ) => + Task.CompletedTask; #if NET8_0_OR_GREATER diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index 49b6ca6..1f4464d 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -54,11 +54,13 @@ protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy var templates = Options.DynamicTemplateLookup?.Invoke(@event); var requireAlias = Options.RequireAlias?.Invoke(@event); var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event); + var isUpsert = Options.BulkUpsertLookup?.Invoke(@event, index) is true; if (string.IsNullOrWhiteSpace(index) && string.IsNullOrWhiteSpace(id) && templates is null - && requireAlias is null - && listExecutedPipelines is null) + && isUpsert is false + && requireAlias is null or false + && listExecutedPipelines is null or false) return Options.OperationMode == OperationMode.Index ? (IndexNoParams, null) : (CreateNoParams, null); @@ -71,10 +73,13 @@ protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy RequireAlias = requireAlias, ListExecutedPipelines = listExecutedPipelines }; - - return (Options.OperationMode == OperationMode.Index + var op = Options.OperationMode == OperationMode.Index ? HeaderSerializationStrategy.Index - : Create, header); + : Create; + if (isUpsert) + op = Update; + + return (op, header); } /// diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs new file mode 100644 index 0000000..296cf38 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs @@ -0,0 +1,104 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using Elastic.Channels.Diagnostics; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Transport; +using FluentAssertions; +using Xunit; + +namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; + +public class TestDocument +{ + private static int Counter = 0; + private readonly int _id = ++Counter; + + public DateTimeOffset Timestamp { get; set; } + public int Id => _id; +} + +internal class TestIndexChannel(IndexChannelOptions options) : IndexChannel(options) +{ + public List<(HeaderSerializationStrategy, BulkHeader?)> Strategies { get; } = new(); + + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TestDocument @event) + { + var strategy = base.EventIndexStrategy(@event); + Strategies.Add(strategy); + return strategy; + } +} + +public class IndexChannelEventOptionsTests : ChannelTestWithSingleDocResponseBase +{ + [Fact] + public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() + { + ApiCallDetails callDetails = null; + + var wait = new ManualResetEvent(false); + + Exception exception = null; + using var channel = new TestIndexChannel(new IndexChannelOptions(Transport) + { + IndexFormat = "test-index", + BufferOptions = new() { OutboundBufferMaxSize = 1 }, + DynamicTemplateLookup = document => document.Id == 2 ? new Dictionary { { "id", "1" } } : null, + BulkOperationIdLookup = document => document.Id == 3 ? "33" : null, + BulkUpsertLookup = (document, _) => document.Id == 4, + RequireAlias = document => document.Id == 5, + ListExecutedPipelines = document => document.Id == 6, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, + ExportResponseCallback = (response, _) => + { + callDetails = response.ApiCallDetails; + wait.Set(); + } + }); + + channel.TryWrite(new TestDocument()); //0 + channel.TryWrite(new TestDocument()); //1 + channel.TryWrite(new TestDocument()); //2 + channel.TryWrite(new TestDocument()); //3 + channel.TryWrite(new TestDocument()); //4 + channel.TryWrite(new TestDocument()); //5 + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); + + callDetails.Uri.AbsolutePath.Should().Be("/test-index/_bulk"); + + + channel.Strategies.Should().HaveCount(6); + + channel.Strategies[0].Item1.Should().Be(HeaderSerializationStrategy.CreateNoParams); + channel.Strategies[0].Item2.Should().BeNull(); + + channel.Strategies[1].Item1.Should().Be(HeaderSerializationStrategy.Create); + channel.Strategies[1].Item2.Should().NotBeNull(); + channel.Strategies[1].Item2!.Value.DynamicTemplates.Should().NotBeNull(); + + channel.Strategies[2].Item1.Should().Be(HeaderSerializationStrategy.Create); + channel.Strategies[2].Item2.Should().NotBeNull(); + channel.Strategies[2].Item2!.Value.Id.Should().Be("33"); + + channel.Strategies[3].Item1.Should().Be(HeaderSerializationStrategy.Update); + + channel.Strategies[4].Item2!.Value.RequireAlias.Should().BeTrue(); + + channel.Strategies[5].Item2!.Value.ListExecutedPipelines.Should().BeTrue(); + + } +}