Skip to content

Commit

Permalink
Refactor OTel instrumentation (#100)
Browse files Browse the repository at this point in the history
* Add static `ActivitySource` class
* Support OTel data and refactor overloads in HttpTransport
* Refactor to avoid complex code duplication
* Add OTel data to RequestData
* Update reference implementation to align with semantic conventions
* Update dependency versions
  • Loading branch information
stevejgordon authored Sep 12, 2023
1 parent a834975 commit 1fa22c9
Show file tree
Hide file tree
Showing 40 changed files with 1,289 additions and 661 deletions.
6 changes: 5 additions & 1 deletion Playground/Playground.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="8.0.4" />
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="8.9.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\src\Elastic.Transport\Elastic.Transport.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="Microsoft.NETFramework.ReferenceAssemblies.net461" Version="1.0.3" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@
<ProjectReference Include="..\..\src\Elastic.Transport\Elastic.Transport.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="Microsoft.NETFramework.ReferenceAssemblies.net461" Version="1.0.3" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@
<ProjectReference Include="..\..\src\Elastic.Transport\Elastic.Transport.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="Microsoft.NETFramework.ReferenceAssemblies.net461" Version="1.0.3" />
</ItemGroup>

</Project>
4 changes: 3 additions & 1 deletion benchmarks/Elastic.Transport.Profiling/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ private static async Task Main()
MemoryProfiler.CollectAllocations(true);
MemoryProfiler.GetSnapshot("start");

var config = new TransportConfiguration(new Uri("http://localhost:9200"), new ElasticsearchProductRegistration());
var config = new TransportConfiguration(new Uri("http://localhost:9200"),
new ElasticsearchProductRegistration(typeof(ElasticsearchProductRegistration)));

var transport = new DefaultHttpTransport(config);

// WARMUP
Expand Down
4 changes: 4 additions & 0 deletions build/scripts/scripts.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@
<Compile Include="Program.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="Microsoft.NETFramework.ReferenceAssemblies.net461" Version="1.0.3" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@
<ItemGroup>
<ProjectReference Include="..\Elastic.Transport\Elastic.Transport.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Update="Microsoft.NETFramework.ReferenceAssemblies.net461" Version="1.0.3" />
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="1.1.1" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ private void Initialize(IEnumerable<Node> nodes, DateTimeProvider dateTimeProvid
UsingSsl = scheme == "https";
}
else if (scheme != node.Uri.Scheme)
// TODO - Diagnostic event here
throw new ArgumentException("Trying to instantiate a connection pool with mixed URI Schemes");
}

Expand Down
214 changes: 43 additions & 171 deletions src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -14,10 +13,6 @@
using Elastic.Transport.Products;
using static Elastic.Transport.Diagnostics.Auditing.AuditEvent;

//#if NETSTANDARD2_0 || NETSTANDARD2_1
//using System.Threading.Tasks.Extensions;
//#endif

namespace Elastic.Transport;

/// <inheritdoc cref="RequestPipeline" />
Expand All @@ -33,10 +28,8 @@ public class DefaultRequestPipeline<TConfiguration> : RequestPipeline
private readonly TConfiguration _settings;
private readonly ResponseBuilder _responseBuilder;

private static readonly ActivitySource _activitySource = new("Elastic.Transport.RequestPipeline");

private RequestConfiguration? _pingAndSniffRequestConfiguration;
private List<Audit> _auditTrail = null;
private List<Audit> _auditTrail = null;

/// <inheritdoc cref="RequestPipeline" />
internal DefaultRequestPipeline(
Expand Down Expand Up @@ -97,7 +90,7 @@ public override bool IsTakingTooLong
var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout);
var now = _dateTimeProvider.Now();

//we apply a soft margin so that if a request timesout at 59 seconds when the maximum is 60 we also abort.
//we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort.
var margin = timeout.TotalMilliseconds / 100.0 * 98;
var marginTimeSpan = TimeSpan.FromMilliseconds(margin);
var timespanCall = now - StartedOn;
Expand Down Expand Up @@ -174,86 +167,27 @@ public override void BadResponse<TResponse>(ref TResponse response, ApiCallDetai
}

public override TResponse CallProductEndpoint<TResponse>(RequestData requestData)
{
using var audit = Audit(HealthyResponse, requestData.Node);

if (audit is not null)
audit.PathAndQuery = requestData.PathAndQuery;

var activity = Activity.Current;
var isElasticClient = activity is not null && activity.GetCustomProperty("elastic.transport.client") is not null;

if (!isElasticClient)
{
activity = _activitySource.StartActivity($"Elastic.Transport: HTTP {requestData.Method}", ActivityKind.Client);
}

activity?.AddTag("http.method", requestData.Method);
activity?.AddTag("http.url", requestData.Uri.AbsoluteUri);
activity?.AddTag("net.peer.name", requestData.Uri.Host);
activity?.AddTag("net.peer.port", requestData.Uri.Port);

try
{
var response = _transportClient.Request<TResponse>(requestData);
=> CallProductEndpointCoreAsync<TResponse>(false, requestData).EnsureCompleted();

#if NET6_0_OR_GREATER
activity?.SetStatus(response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
#endif

activity?.AddTag("http.status_code", response.ApiCallDetails.HttpStatusCode);

response.ApiCallDetails.AuditTrail = AuditTrail;

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails, response);

if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType && audit is not null)
audit.Event = requestData.OnFailureAuditEvent;

return response;
}
catch (Exception e) when (audit is not null)
{
audit.Event = requestData.OnFailureAuditEvent;
audit.Exception = e;
throw;
}
finally
{
if (!isElasticClient)
activity?.Dispose();
}
}
public override Task<TResponse> CallProductEndpointAsync<TResponse>(RequestData requestData, CancellationToken cancellationToken = default)
=> CallProductEndpointCoreAsync<TResponse>(true, requestData, cancellationToken).AsTask();

public override async Task<TResponse> CallProductEndpointAsync<TResponse>(RequestData requestData, CancellationToken cancellationToken)
private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(bool isAsync, RequestData requestData, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new()
{
using var audit = Audit(HealthyResponse, requestData.Node);

if (audit is not null)
audit.PathAndQuery = requestData.PathAndQuery;

var activity = Activity.Current;
var isElasticClient = activity is not null && activity.GetCustomProperty("elastic.transport.client") is not null;

if (!isElasticClient)
{
activity = _activitySource.StartActivity($"Elastic.Transport: HTTP {requestData.Method}", ActivityKind.Client);
}

activity?.AddTag("http.method", requestData.Method);
activity?.AddTag("http.url", requestData.Uri.AbsoluteUri);
activity?.AddTag("net.peer.name", requestData.Uri.Host);
activity?.AddTag("net.peer.port", requestData.Uri.Port);

try
{
var response = await _transportClient.RequestAsync<TResponse>(requestData, cancellationToken).ConfigureAwait(false);
TResponse response;

#if NET6_0_OR_GREATER
activity?.SetStatus(response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
#endif

activity?.AddTag("http.status_code", response.ApiCallDetails.HttpStatusCode);
if (isAsync)
response = await _transportClient.RequestAsync<TResponse>(requestData, cancellationToken).ConfigureAwait(false);
else
response = _transportClient.Request<TResponse>(requestData);

response.ApiCallDetails.AuditTrail = AuditTrail;

Expand All @@ -270,11 +204,6 @@ public override async Task<TResponse> CallProductEndpointAsync<TResponse>(Reques
audit.Exception = e;
throw;
}
finally
{
if (!isElasticClient)
activity?.Dispose();
}
}

public override TransportException? CreateClientException<TResponse>(TResponse response, ApiCallDetails? callDetails,
Expand Down Expand Up @@ -459,42 +388,12 @@ public override IEnumerable<Node> NextNode()
}
}

public override void Ping(Node node)
{
if (!_productRegistration.SupportsPing) return;
if (PingDisabled(node)) return;

var pingData = _productRegistration.CreatePingRequestData(node, PingAndSniffRequestConfiguration, _settings, _memoryStreamFactory);

using var audit = Audit(PingSuccess, node);
public override void Ping(Node node) => PingCoreAsync(false, node).EnsureCompleted();

if (audit is not null)
audit.PathAndQuery = pingData.PathAndQuery;
public override Task PingAsync(Node node, CancellationToken cancellationToken = default)
=> PingCoreAsync(true, node, cancellationToken).AsTask();

try
{
var response = _productRegistration.Ping(_transportClient, pingData);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);

//ping should not silently accept bad but valid http responses
if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType)
throw new PipelineException(pingData.OnFailurePipelineFailure, response.ApiCallDetails.OriginalException) { Response = response };
}
catch (Exception e)
{
var response = (e as PipelineException)?.Response;

if (audit is not null)
{
audit.Event = PingFailure;
audit.Exception = e;
}
throw new PipelineException(PipelineFailure.PingFailure, e) { Response = response };
}
}

public override async Task PingAsync(Node node, CancellationToken cancellationToken)
public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken cancellationToken = default)
{
if (!_productRegistration.SupportsPing) return;
if (PingDisabled(node)) return;
Expand All @@ -506,9 +405,14 @@ public override async Task PingAsync(Node node, CancellationToken cancellationTo
if (audit is not null)
audit.PathAndQuery = pingData.PathAndQuery;

TransportResponse response;

try
{
var response = await _productRegistration.PingAsync(_transportClient, pingData, cancellationToken).ConfigureAwait(false);
if (isAsync)
response = await _productRegistration.PingAsync(_transportClient, pingData, cancellationToken).ConfigureAwait(false);
else
response = _productRegistration.Ping(_transportClient, pingData);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);

Expand All @@ -518,7 +422,7 @@ public override async Task PingAsync(Node node, CancellationToken cancellationTo
}
catch (Exception e)
{
var response = (e as PipelineException)?.Response;
response = (e as PipelineException)?.Response;
if (audit is not null)
{
audit.Event = PingFailure;
Expand All @@ -528,51 +432,12 @@ public override async Task PingAsync(Node node, CancellationToken cancellationTo
}
}

public override void Sniff()
{
if (!_productRegistration.SupportsSniff) return;

var exceptions = new List<Exception>();

foreach (var node in SniffNodes)
{
var requestData =
_productRegistration.CreateSniffRequestData(node, PingAndSniffRequestConfiguration, _settings, _memoryStreamFactory);
public override void Sniff() => SniffCoreAsync(false).EnsureCompleted();

using var audit = Audit(SniffSuccess, node);
public override Task SniffAsync(CancellationToken cancellationToken = default)
=> SniffCoreAsync(true, cancellationToken).AsTask();

if (audit is not null)
audit.PathAndQuery = requestData.PathAndQuery;

try
{
var (response, nodes) = _productRegistration.Sniff(_transportClient, _nodePool.UsingSsl, requestData);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);

//sniff should not silently accept bad but valid http responses
if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType)
throw new PipelineException(requestData.OnFailurePipelineFailure, response.ApiCallDetails.OriginalException) { Response = response };

_nodePool.Reseed(nodes);
Refresh = true;
return;
}
catch (Exception e)
{
if (audit is not null)
{
audit.Event = SniffFailure;
audit.Exception = e;
}
exceptions.Add(e);
}
}

throw new PipelineException(PipelineFailure.SniffFailure, exceptions.AsAggregateOrFirst());
}

public override async Task SniffAsync(CancellationToken cancellationToken)
public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellationToken = default)
{
if (!_productRegistration.SupportsSniff) return;

Expand All @@ -588,20 +453,27 @@ public override async Task SniffAsync(CancellationToken cancellationToken)
if (audit is not null)
audit.PathAndQuery = requestData.PathAndQuery;

Tuple<TransportResponse, IReadOnlyCollection<Node>> result;

try
{
var (response, nodes) = await _productRegistration
.SniffAsync(_transportClient, _nodePool.UsingSsl, requestData, cancellationToken)
.ConfigureAwait(false);
if (isAsync)
result = await _productRegistration
.SniffAsync(_transportClient, _nodePool.UsingSsl, requestData, cancellationToken)
.ConfigureAwait(false);
else
result = _productRegistration
.Sniff(_transportClient, _nodePool.UsingSsl, requestData);

ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);
ThrowBadAuthPipelineExceptionWhenNeeded(result.Item1.ApiCallDetails);

//sniff should not silently accept bad but valid http responses
if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType)
throw new PipelineException(requestData.OnFailurePipelineFailure, response.ApiCallDetails.OriginalException) { Response = response };
if (!result.Item1.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType)
throw new PipelineException(requestData.OnFailurePipelineFailure, result.Item1.ApiCallDetails.OriginalException) { Response = result.Item1 };

_nodePool.Reseed(nodes);
_nodePool.Reseed(result.Item2);
Refresh = true;

return;
}
catch (Exception e)
Expand All @@ -613,9 +485,9 @@ public override async Task SniffAsync(CancellationToken cancellationToken)
}
exceptions.Add(e);
}
}

throw new PipelineException(PipelineFailure.SniffFailure, exceptions.AsAggregateOrFirst());
throw new PipelineException(PipelineFailure.SniffFailure, exceptions.AsAggregateOrFirst());
}
}

public override void SniffOnConnectionFailure()
Expand Down
Loading

0 comments on commit 1fa22c9

Please sign in to comment.