diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index e5feeec..44cd75b 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -24,8 +24,8 @@ public DataStreamChannel(DataStreamChannelOptions options, ICollection /// The URL for the bulk request. /// - protected virtual string BulkUrl => "/_bulk"; + protected virtual string BulkUrl => "_bulk"; /// protected override bool RetryAllItems(BulkResponse response) => response.ApiCallDetails.HttpStatusCode == 429; diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index 4f78fb6..0e471ed 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -30,7 +30,7 @@ public IndexChannel(IndexChannelOptions options, ICollection( - new TransportConfiguration(new SingleNodePool(new Uri("http://localhost:9200")), - new InMemoryRequestInvoker(Encoding.UTF8.GetBytes("{\"items\":[{\"create\":{\"status\":201}}]}"))) + protected ChannelTestWithSingleDocResponseBase(string url = "https://localhost:9200") => + Transport = new DistributedTransport( + new TransportConfiguration(new SingleNodePool(new Uri(url)), + new InMemoryRequestInvoker(Encoding.UTF8.GetBytes("{\"items\":[{\"create\":{\"status\":201}}]}"))) .DisablePing() .EnableDebugMode()); + + protected ITransport Transport { get; } } diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs index 5ae95b7..f1dc0e5 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs @@ -20,7 +20,7 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() var wait = new ManualResetEvent(false); - using var channel = new DataStreamChannel(new DataStreamChannelOptions(_transport) + using var channel = new DataStreamChannel(new DataStreamChannelOptions(Transport) { BufferOptions = new() { diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs index 82756ea..c44a8c7 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs @@ -22,13 +22,13 @@ public void IndexChannel_WithFixedIndexName_UsesCorrectUrlAndOperationHeader() = public void IndexChannel_WithDynamicIndexName_UsesCorrectUrlAndOperationHeader() => ExecuteAndAssert("/_bulk", "{\"create\":{\"_index\":\"dotnet-2023.07.29\"}}"); - private static void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, string indexName = null) + private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, string indexName = null) { ApiCallDetails callDetails = null; var wait = new ManualResetEvent(false); - var options = new IndexChannelOptions(_transport) + var options = new IndexChannelOptions(Transport) { BufferOptions = new() { diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs new file mode 100644 index 0000000..cef3ae6 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs @@ -0,0 +1,64 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.IO; +using System.Threading; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Transport; +using FluentAssertions; +using Xunit; + +namespace Elastic.Ingest.Elasticsearch.Tests; + +public class SubPathTests : ChannelTestWithSingleDocResponseBase +{ + public SubPathTests() : base("https://localhost:9200/subpath") { } + + [Fact] + public void IndexChannel_WithFixedIndexName_UsesCorrectUrlAndOperationHeader() => + ExecuteAndAssert("/subpath/fixed-index/_bulk", "{\"create\":{}}", "fixed-index"); + + [Fact] + public void IndexChannel_WithDynamicIndexName_UsesCorrectUrlAndOperationHeader() => + ExecuteAndAssert("/subpath/_bulk", "{\"create\":{\"_index\":\"dotnet-2023.07.29\"}}"); + + private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, string indexName = null) + { + ApiCallDetails callDetails = null; + var wait = new ManualResetEvent(false); + + var options = new IndexChannelOptions(Transport) + { + BufferOptions = new() + { + OutboundBufferMaxSize = 1 + }, + ExportResponseCallback = (response, _) => + { + callDetails = response.ApiCallDetails; + wait.Set(); + }, + TimestampLookup = _ => new DateTimeOffset(2023, 07, 29, 20, 00, 00, TimeSpan.Zero), + }; + + if (indexName is not null) + { + options.IndexFormat = indexName; + } + + using var channel = new IndexChannel(options); + + channel.TryWrite(new TestDocument()); + wait.WaitOne(); + + callDetails.Should().NotBeNull(); + callDetails.Uri.AbsolutePath.Should().Be(expectedUrl); + + var stream = new MemoryStream(callDetails.RequestBodyInBytes); + var sr = new StreamReader(stream); + var operation = sr.ReadLine(); + operation.Should().Be(expectedOperationHeader); + } +}