Skip to content

Commit

Permalink
Ensure bulk header options can be set
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Oct 9, 2024
1 parent 9c94df3 commit 33e5867
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 53 deletions.
19 changes: 15 additions & 4 deletions src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,22 @@ public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<I
_url = $"{dataStream}/{base.BulkUrl}";
}

/// <inheritdoc cref="GetIndexOp"/>
protected override HeaderSerialization GetIndexOp(TEvent @event) => HeaderSerialization.CreateNoParams;
/// <inheritdoc cref="EventIndexStrategy"/>
protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event)
{

var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event);
var templates = Options.DynamicTemplateLookup?.Invoke(@event);
if (templates is null && listExecutedPipelines is null)
return (HeaderSerializationStrategy.CreateNoParams, null);

/// <inheritdoc cref="MutateHeader"/>
protected override void MutateHeader(TEvent @event, ref BulkHeader header) { }
var header = new BulkHeader
{
DynamicTemplates = templates,
ListExecutedPipelines = listExecutedPipelines
};
return (HeaderSerializationStrategy.Create, header);
}

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.TemplateName"/>
protected override string TemplateName => Options.DataStream.GetTemplateName();
Expand Down
27 changes: 10 additions & 17 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
where TChannelOptions : ElasticsearchChannelOptionsBase<TEvent>
{
/// <summary> TODO </summary>
protected abstract HeaderSerialization GetIndexOp(TEvent @event);

/// <summary> </summary>
/// <param name="event"></param>
/// <param name="header"></param>
protected abstract void MutateHeader(TEvent @event, ref BulkHeader header);
protected abstract (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event);

/// <summary>
/// Asynchronously write the NDJSON request body for a page of <typeparamref name="TEvent"/> events to <see cref="Stream"/>.
Expand All @@ -57,28 +52,26 @@ public async Task WriteBufferToStreamAsync(
var @event = items[i];
if (@event == null) continue;

var op = GetIndexOp(@event);
var (op, header) = EventIndexStrategy(@event);
switch (op)
{
case HeaderSerialization.IndexNoParams:
case HeaderSerializationStrategy.IndexNoParams:
await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case HeaderSerialization.CreateNoParams:
case HeaderSerializationStrategy.CreateNoParams:
await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case HeaderSerialization.Index:
case HeaderSerialization.Create:
case HeaderSerialization.Delete:
case HeaderSerialization.Update:
var header = new BulkHeader();
MutateHeader(@event, ref header);
case HeaderSerializationStrategy.Index:
case HeaderSerializationStrategy.Create:
case HeaderSerializationStrategy.Delete:
case HeaderSerializationStrategy.Update:
await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false);
break;
}

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);

if (op == HeaderSerialization.Update)
if (op == HeaderSerializationStrategy.Update)
await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false);

if (options.EventWriter?.WriteToStreamAsync != null)
Expand All @@ -87,7 +80,7 @@ public async Task WriteBufferToStreamAsync(
await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx)
.ConfigureAwait(false);

if (op == HeaderSerialization.Update)
if (op == HeaderSerializationStrategy.Update)
await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false);

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray();
#endif

private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader header, JsonSerializerOptions serializerOptions, CancellationToken ctx) =>
private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx) =>
throw new NotImplementedException();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ protected ElasticsearchChannelOptionsBase(ITransport transport) : base(transport
public IElasticsearchEventWriter<TEvent>? EventWriter { get; set; }

/// <summary> Optionally set dynamic templates for event</summary>
public Func<TEvent, IDictionary<string, string>>? DynamicTemplates { get; set; }
public Func<TEvent, IDictionary<string, string>?>? DynamicTemplateLookup { get; set; }

/// <summary> If true, the response will include the ingest pipelines that were executed. Defaults to false. </summary>
public Func<TEvent, bool>? ListExecutedPipelines { get; set; }

}
47 changes: 25 additions & 22 deletions src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using static Elastic.Ingest.Elasticsearch.Serialization.HeaderSerializationStrategy;

namespace Elastic.Ingest.Elasticsearch.Indices;

Expand Down Expand Up @@ -42,36 +43,38 @@ public IndexChannel(IndexChannelOptions<TEvent> options, ICollection<IChannelCal
/// <inheritdoc cref="ElasticsearchChannelBase{TEvent, TChannelOptions}.BulkUrl"/>
protected override string BulkUrl => _url;

/// <inheritdoc cref="GetIndexOp"/>
protected override HeaderSerialization GetIndexOp(TEvent @event)
/// <inheritdoc cref="EventIndexStrategy"/>
protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event)
{
var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now;
if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value);

var index = _skipIndexName ? string.Empty : string.Format(Options.IndexFormat, indexTime);

var id = Options.BulkOperationIdLookup?.Invoke(@event);
if (string.IsNullOrWhiteSpace(index) && string.IsNullOrWhiteSpace(id))
var templates = Options.DynamicTemplateLookup?.Invoke(@event);
var requireAlias = Options.RequireAlias?.Invoke(@event);
var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event);
if (string.IsNullOrWhiteSpace(index)
&& string.IsNullOrWhiteSpace(id)
&& templates is null
&& requireAlias is null
&& listExecutedPipelines is null)
return Options.OperationMode == OperationMode.Index
? HeaderSerialization.IndexNoParams
: HeaderSerialization.CreateNoParams;

return Options.OperationMode == OperationMode.Index
? HeaderSerialization.Index
: HeaderSerialization.Create;
}
? (IndexNoParams, null)
: (CreateNoParams, null);

/// <inheritdoc cref="MutateHeader"/>
protected override void MutateHeader(TEvent @event, ref BulkHeader header)
{
var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now;
if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value);

var index = _skipIndexName ? string.Empty : string.Format(Options.IndexFormat, indexTime);
header.Index = index;

var id = Options.BulkOperationIdLookup?.Invoke(@event);
header.Id = id;
var header = new BulkHeader
{
Id = id,
Index = index,
DynamicTemplates = templates,
RequireAlias = requireAlias,
ListExecutedPipelines = listExecutedPipelines
};

return (Options.OperationMode == OperationMode.Index
? HeaderSerializationStrategy.Index
: Create, header);
}

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.TemplateName"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public IndexChannelOptions(ITransport transport) : base(transport) { }
/// </summary>
public Func<TEvent, string>? BulkOperationIdLookup { get; set; }

/// <summary> If true, the action must target an index alias. Defaults to false. </summary>
public Func<TEvent, bool>? RequireAlias { get; set; }

/// <summary>
/// Uses the callback provided to <see cref="BulkOperationIdLookup"/> to determine if this is in fact an update operation
/// <para>If this returns true the document will be sent as an upsert operation</para>
Expand Down
19 changes: 13 additions & 6 deletions src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@
namespace Elastic.Ingest.Elasticsearch.Serialization;

/// <summary> TODO </summary>
public struct BulkHeader
public readonly struct BulkHeader
{
/// <summary> The index to write to, never set when writing using <see cref="DataStreamChannel{TEvent}"/> </summary>
public string? Index { get; set; }
public string? Index { get; init; }

/// <summary> The id of the object being written, never set when writing using <see cref="DataStreamChannel{TEvent}"/> </summary>
public string? Id { get; set; }
public string? Id { get; init; }

/// <summary> Require <see cref="Index"/> to point to an alias, never set when writing using <see cref="DataStreamChannel{TEvent}"/> </summary>
public bool? RequireAlias { get; set; }
public bool? RequireAlias { get; init; }

/// <summary> TODO </summary>
public Dictionary<string, string>? DynamicTemplates { get; init; }
/// <summary>
/// A map from the full name of fields to the name of dynamic templates. Defaults to an empty map. If a name matches a dynamic template,
/// then that template will be applied regardless of other match predicates defined in the template. And if a field is already defined
/// in the mapping, then this parameter won’t be used.
/// </summary>
public IDictionary<string, string>? DynamicTemplates { get; init; }

/// <summary> If true, the response will include the ingest pipelines that were executed. Defaults to false. </summary>
public bool? ListExecutedPipelines { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Elastic.Ingest.Elasticsearch.Serialization;

/// <summary> TODO </summary>
public enum HeaderSerialization
public enum HeaderSerializationStrategy
{
/// <summary> </summary>
Index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public async Task EnsureDocumentsEndUpInDataStream()
var options = new DataStreamChannelOptions<TimeSeriesDocument>(Client.Transport)
{
DataStream = targetDataStream,
BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 }
BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 },
};
var channel = new DataStreamChannel<TimeSeriesDocument>(options);

Expand Down

0 comments on commit 33e5867

Please sign in to comment.