Skip to content

Commit

Permalink
Ensure we support endpoints with subpaths to write to as well (Elasti…
Browse files Browse the repository at this point in the history
…csearch as a service) (#73)

* Ensure we support endpoints with subpaths to write to as well (Elasticsearch as a service)

* fix datastream url builder
  • Loading branch information
Mpdreamz authored Sep 18, 2024
1 parent 3b7729d commit 60713d1
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<I
{
var dataStream = Options.DataStream.ToString();

_url = $"/{dataStream}{base.BulkUrl}";
_url = $"{dataStream}/{base.BulkUrl}";

_fixedHeader = new CreateOperation();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected override bool Retry(BulkResponse response)
/// <summary>
/// The URL for the bulk request.
/// </summary>
protected virtual string BulkUrl => "/_bulk";
protected virtual string BulkUrl => "_bulk";

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryAllItems"/>
protected override bool RetryAllItems(BulkResponse response) => response.ApiCallDetails.HttpStatusCode == 429;
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public IndexChannel(IndexChannelOptions<TEvent> options, ICollection<IChannelCal
// We can later avoid the overhead of calculating and adding the index name to the operation headers.
if (string.Format(Options.IndexFormat, DateTimeOffset.Now).Equals(Options.IndexFormat, StringComparison.Ordinal))
{
_url = $"/{Options.IndexFormat}{base.BulkUrl}";
_url = $"{Options.IndexFormat}/{base.BulkUrl}";
_skipIndexNameOnOperations = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ namespace Elastic.Ingest.Elasticsearch.Tests;

public abstract class ChannelTestWithSingleDocResponseBase
{
protected static readonly ITransport _transport = new DistributedTransport<TransportConfiguration>(
new TransportConfiguration(new SingleNodePool(new Uri("http://localhost:9200")),
new InMemoryRequestInvoker(Encoding.UTF8.GetBytes("{\"items\":[{\"create\":{\"status\":201}}]}")))
protected ChannelTestWithSingleDocResponseBase(string url = "https://localhost:9200") =>
Transport = new DistributedTransport<TransportConfiguration>(
new TransportConfiguration(new SingleNodePool(new Uri(url)),
new InMemoryRequestInvoker(Encoding.UTF8.GetBytes("{\"items\":[{\"create\":{\"status\":201}}]}")))
.DisablePing()
.EnableDebugMode());

protected ITransport Transport { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader()

var wait = new ManualResetEvent(false);

using var channel = new DataStreamChannel<TestDocument>(new DataStreamChannelOptions<TestDocument>(_transport)
using var channel = new DataStreamChannel<TestDocument>(new DataStreamChannelOptions<TestDocument>(Transport)
{
BufferOptions = new()
{
Expand Down
4 changes: 2 additions & 2 deletions tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ public void IndexChannel_WithFixedIndexName_UsesCorrectUrlAndOperationHeader() =
public void IndexChannel_WithDynamicIndexName_UsesCorrectUrlAndOperationHeader() =>
ExecuteAndAssert("/_bulk", "{\"create\":{\"_index\":\"dotnet-2023.07.29\"}}");

private static void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, string indexName = null)
private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, string indexName = null)
{
ApiCallDetails callDetails = null;

var wait = new ManualResetEvent(false);

var options = new IndexChannelOptions<TestDocument>(_transport)
var options = new IndexChannelOptions<TestDocument>(Transport)
{
BufferOptions = new()
{
Expand Down
64 changes: 64 additions & 0 deletions tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.IO;
using System.Threading;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Transport;
using FluentAssertions;
using Xunit;

namespace Elastic.Ingest.Elasticsearch.Tests;

public class SubPathTests : ChannelTestWithSingleDocResponseBase
{
public SubPathTests() : base("https://localhost:9200/subpath") { }

[Fact]
public void IndexChannel_WithFixedIndexName_UsesCorrectUrlAndOperationHeader() =>
ExecuteAndAssert("/subpath/fixed-index/_bulk", "{\"create\":{}}", "fixed-index");

[Fact]
public void IndexChannel_WithDynamicIndexName_UsesCorrectUrlAndOperationHeader() =>
ExecuteAndAssert("/subpath/_bulk", "{\"create\":{\"_index\":\"dotnet-2023.07.29\"}}");

private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, string indexName = null)
{
ApiCallDetails callDetails = null;
var wait = new ManualResetEvent(false);

var options = new IndexChannelOptions<TestDocument>(Transport)
{
BufferOptions = new()
{
OutboundBufferMaxSize = 1
},
ExportResponseCallback = (response, _) =>
{
callDetails = response.ApiCallDetails;
wait.Set();
},
TimestampLookup = _ => new DateTimeOffset(2023, 07, 29, 20, 00, 00, TimeSpan.Zero),
};

if (indexName is not null)
{
options.IndexFormat = indexName;
}

using var channel = new IndexChannel<TestDocument>(options);

channel.TryWrite(new TestDocument());
wait.WaitOne();

callDetails.Should().NotBeNull();
callDetails.Uri.AbsolutePath.Should().Be(expectedUrl);

var stream = new MemoryStream(callDetails.RequestBodyInBytes);
var sr = new StreamReader(stream);
var operation = sr.ReadLine();
operation.Should().Be(expectedOperationHeader);
}
}

0 comments on commit 60713d1

Please sign in to comment.