diff --git a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs index 485614150..5b339288c 100644 --- a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs +++ b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs @@ -11,6 +11,8 @@ // limitations under the License. // ------------------------------------------------------------------------ +using System.Linq; + namespace ControllerSample.Controllers { using System; @@ -43,6 +45,7 @@ public SampleController(ILogger logger) /// State store name. /// public const string StoreName = "statestore"; + private readonly ILogger logger; /// @@ -72,6 +75,11 @@ public ActionResult Get([FromState(StoreName)] StateEntry acco [HttpPost("deposit")] public async Task> Deposit(Transaction transaction, [FromServices] DaprClient daprClient) { + // Example reading cloudevent properties from the headers + var headerEntries = Request.Headers.Aggregate("", (current, header) => current + ($"------- Header: {header.Key} : {header.Value}" + Environment.NewLine)); + + logger.LogInformation(headerEntries); + logger.LogInformation("Enter deposit"); var state = await daprClient.GetStateEntryAsync(StoreName, transaction.Id); state.Value ??= new Account() { Id = transaction.Id, }; @@ -83,7 +91,7 @@ public async Task> Deposit(Transaction transaction, [FromS } state.Value.Balance += transaction.Amount; - logger.LogInformation("Balance for Id {0} is {1}",state.Value.Id, state.Value.Balance); + logger.LogInformation("Balance for Id {0} is {1}", state.Value.Id, state.Value.Balance); await state.SaveAsync(); return state.Value; } @@ -98,22 +106,23 @@ public async Task> Deposit(Transaction transaction, [FromS [Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)] [BulkSubscribe("multideposit", 500, 2000)] [HttpPost("multideposit")] - public async Task> MultiDeposit([FromBody] BulkSubscribeMessage> - bulkMessage, [FromServices] DaprClient daprClient) + public async Task> MultiDeposit([FromBody] + BulkSubscribeMessage> + bulkMessage, [FromServices] DaprClient daprClient) { logger.LogInformation("Enter bulk deposit"); - + List entries = new List(); foreach (var entry in bulkMessage.Entries) - { + { try { var transaction = entry.Event.Data; var state = await daprClient.GetStateEntryAsync(StoreName, transaction.Id); state.Value ??= new Account() { Id = transaction.Id, }; - logger.LogInformation("Id is {0}, the amount to be deposited is {1}", + logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount); if (transaction.Amount < 0m) @@ -124,12 +133,16 @@ public async Task> MultiDeposit([FromBody state.Value.Balance += transaction.Amount; logger.LogInformation("Balance is {0}", state.Value.Balance); await state.SaveAsync(); - entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS)); - } catch (Exception e) { + entries.Add( + new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS)); + } + catch (Exception e) + { logger.LogError(e.Message); entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY)); } } + return new BulkSubscribeAppResponse(entries); } @@ -165,6 +178,7 @@ public async Task> Withdraw(Transaction transaction, [From { return this.NotFound(); } + if (transaction.Amount < 0m) { return BadRequest(new { statusCode = 400, message = "bad request" }); @@ -185,7 +199,8 @@ public async Task> Withdraw(Transaction transaction, [From /// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI. [Topic("pubsub", "withdraw", "event.type ==\"withdraw.v2\"", 1)] [HttpPost("withdraw.v2")] - public async Task> WithdrawV2(TransactionV2 transaction, [FromServices] DaprClient daprClient) + public async Task> WithdrawV2(TransactionV2 transaction, + [FromServices] DaprClient daprClient) { logger.LogInformation("Enter withdraw.v2"); if (transaction.Channel == "mobile" && transaction.Amount > 10000) @@ -214,12 +229,15 @@ public async Task> WithdrawV2(TransactionV2 transaction, [ /// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI. [Topic("pubsub", "rawDeposit", true)] [HttpPost("rawDeposit")] - public async Task> RawDeposit([FromBody] JsonDocument rawTransaction, [FromServices] DaprClient daprClient) + public async Task> RawDeposit([FromBody] JsonDocument rawTransaction, + [FromServices] DaprClient daprClient) { var transactionString = rawTransaction.RootElement.GetProperty("data_base64").GetString(); - logger.LogInformation($"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}"); + logger.LogInformation( + $"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}"); var transactionJson = JsonSerializer.Deserialize(Convert.FromBase64String(transactionString)); - var transaction = JsonSerializer.Deserialize(transactionJson.RootElement.GetProperty("data").GetRawText()); + var transaction = + JsonSerializer.Deserialize(transactionJson.RootElement.GetProperty("data").GetRawText()); var state = await daprClient.GetStateEntryAsync(StoreName, transaction.Id); state.Value ??= new Account() { Id = transaction.Id, }; logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount); @@ -239,7 +257,8 @@ public async Task> RawDeposit([FromBody] JsonDocument rawT /// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException /// [HttpPost("throwException")] - public async Task> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient) + public async Task> ThrowException(Transaction transaction, + [FromServices] DaprClient daprClient) { logger.LogInformation("Enter ThrowException"); var task = Task.Delay(10); diff --git a/examples/AspNetCore/ControllerSample/Startup.cs b/examples/AspNetCore/ControllerSample/Startup.cs index 64cfba512..ddc6d1c5f 100644 --- a/examples/AspNetCore/ControllerSample/Startup.cs +++ b/examples/AspNetCore/ControllerSample/Startup.cs @@ -11,8 +11,11 @@ // limitations under the License. // ------------------------------------------------------------------------ + +using Dapr; using Dapr.AspNetCore; + namespace ControllerSample { using Microsoft.AspNetCore.Builder; @@ -63,7 +66,10 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env) app.UseRouting(); - app.UseCloudEvents(); + app.UseCloudEvents(new CloudEventsMiddlewareOptions + { + ForwardCloudEventPropertiesAsHeaders = true + }); app.UseAuthorization(); diff --git a/src/Dapr.AspNetCore/CloudEventPropertyNames.cs b/src/Dapr.AspNetCore/CloudEventPropertyNames.cs new file mode 100644 index 000000000..87e496004 --- /dev/null +++ b/src/Dapr.AspNetCore/CloudEventPropertyNames.cs @@ -0,0 +1,9 @@ +namespace Dapr +{ + internal static class CloudEventPropertyNames + { + public const string Data = "data"; + public const string DataContentType = "datacontenttype"; + public const string DataBase64 = "data_base64"; + } +} diff --git a/src/Dapr.AspNetCore/CloudEventsMiddleware.cs b/src/Dapr.AspNetCore/CloudEventsMiddleware.cs index 24c89cfed..eac526c26 100644 --- a/src/Dapr.AspNetCore/CloudEventsMiddleware.cs +++ b/src/Dapr.AspNetCore/CloudEventsMiddleware.cs @@ -11,6 +11,9 @@ // limitations under the License. // ------------------------------------------------------------------------ +using System.Collections.Generic; +using System.Linq; + namespace Dapr { using System; @@ -27,6 +30,15 @@ namespace Dapr internal class CloudEventsMiddleware { private const string ContentType = "application/cloudevents+json"; + + // These cloudevent properties are either containing the body of the message or + // are included in the headers by other components of Dapr earlier in the pipeline + private static readonly string[] ExcludedPropertiesFromHeaders = + { + CloudEventPropertyNames.DataContentType, CloudEventPropertyNames.Data, + CloudEventPropertyNames.DataBase64, "pubsubname", "traceparent" + }; + private readonly RequestDelegate next; private readonly CloudEventsMiddlewareOptions options; @@ -52,7 +64,7 @@ public Task InvokeAsync(HttpContext httpContext) // The philosophy here is that we don't report an error for things we don't support, because // that would block someone from implementing their own support for it. We only report an error // when something we do support isn't correct. - if (!this.MatchesContentType(httpContext, out var charSet)) + if (!MatchesContentType(httpContext, out var charSet)) { return this.next(httpContext); } @@ -69,7 +81,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) } else { - using (var reader = new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet))) + using (var reader = + new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet))) { var text = await reader.ReadToEndAsync(); json = JsonSerializer.Deserialize(text); @@ -83,17 +96,43 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) string contentType; // Check whether to use data or data_base64 as per https://github.com/cloudevents/spec/blob/v1.0.1/json-format.md#31-handling-of-data - var isDataSet = json.TryGetProperty("data", out var data); - var isBinaryDataSet = json.TryGetProperty("data_base64", out var binaryData); + // Get the property names by OrdinalIgnoreCase comparison to support case insensitive JSON as the Json Serializer for AspCore already supports it by default. + var jsonPropNames = json.EnumerateObject().ToArray(); + + var dataPropName = jsonPropNames + .Select(d => d.Name) + .FirstOrDefault(d => d.Equals(CloudEventPropertyNames.Data, StringComparison.OrdinalIgnoreCase)); + + var dataBase64PropName = jsonPropNames + .Select(d => d.Name) + .FirstOrDefault(d => + d.Equals(CloudEventPropertyNames.DataBase64, StringComparison.OrdinalIgnoreCase)); + + var isDataSet = false; + var isBinaryDataSet = false; + JsonElement data = default; + + if (dataPropName != null) + { + isDataSet = true; + data = json.TryGetProperty(dataPropName, out var dataJsonElement) ? dataJsonElement : data; + } + + if (dataBase64PropName != null) + { + isBinaryDataSet = true; + data = json.TryGetProperty(dataBase64PropName, out var dataJsonElement) ? dataJsonElement : data; + } if (isDataSet && isBinaryDataSet) { httpContext.Response.StatusCode = (int)HttpStatusCode.BadRequest; return; } - else if (isDataSet) + + if (isDataSet) { - contentType = this.GetDataContentType(json, out var isJson); + contentType = GetDataContentType(json, out var isJson); // If the value is anything other than a JSON string, treat it as JSON. Cloud Events requires // non-JSON text to be enclosed in a JSON string. @@ -109,8 +148,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) { // Rehydrate body from contents of the string var text = data.GetString(); - using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8); - writer.Write(text); + await using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8); + await writer.WriteAsync(text); } body.Seek(0L, SeekOrigin.Begin); @@ -120,10 +159,10 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) // As per the spec, if the implementation determines that the type of data is Binary, // the value MUST be represented as a JSON string expression containing the Base64 encoded // binary value, and use the member name data_base64 to store it inside the JSON object. - var decodedBody = binaryData.GetBytesFromBase64(); + var decodedBody = data.GetBytesFromBase64(); body = new MemoryStream(decodedBody); body.Seek(0L, SeekOrigin.Begin); - contentType = this.GetDataContentType(json, out _); + contentType = GetDataContentType(json, out _); } else { @@ -131,6 +170,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) contentType = null; } + ForwardCloudEventPropertiesAsHeaders(httpContext, jsonPropNames); + originalBody = httpContext.Request.Body; originalContentType = httpContext.Request.ContentType; @@ -148,16 +189,57 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) } } - private string GetDataContentType(JsonElement json, out bool isJson) + private void ForwardCloudEventPropertiesAsHeaders( + HttpContext httpContext, + IEnumerable jsonPropNames) + { + if (!options.ForwardCloudEventPropertiesAsHeaders) + { + return; + } + + var filteredPropertyNames = jsonPropNames + .Where(d => !ExcludedPropertiesFromHeaders.Contains(d.Name, StringComparer.OrdinalIgnoreCase)); + + if (options.IncludedCloudEventPropertiesAsHeaders != null) + { + filteredPropertyNames = filteredPropertyNames + .Where(d => options.IncludedCloudEventPropertiesAsHeaders + .Contains(d.Name, StringComparer.OrdinalIgnoreCase)); + } + else if (options.ExcludedCloudEventPropertiesFromHeaders != null) + { + filteredPropertyNames = filteredPropertyNames + .Where(d => !options.ExcludedCloudEventPropertiesFromHeaders + .Contains(d.Name, StringComparer.OrdinalIgnoreCase)); + } + + foreach (var jsonProperty in filteredPropertyNames) + { + httpContext.Request.Headers.TryAdd($"Cloudevent.{jsonProperty.Name.ToLowerInvariant()}", + jsonProperty.Value.GetRawText().Trim('\"')); + } + } + + private static string GetDataContentType(JsonElement json, out bool isJson) { + var dataContentTypePropName = json + .EnumerateObject() + .Select(d => d.Name) + .FirstOrDefault(d => + d.Equals(CloudEventPropertyNames.DataContentType, + StringComparison.OrdinalIgnoreCase)); + string contentType; - if (json.TryGetProperty("datacontenttype", out var dataContentType) && - dataContentType.ValueKind == JsonValueKind.String && - MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed)) + + if (dataContentTypePropName != null + && json.TryGetProperty(dataContentTypePropName, out var dataContentType) + && dataContentType.ValueKind == JsonValueKind.String + && MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed)) { contentType = dataContentType.GetString(); - isJson = - parsed.MediaType.Equals( "application/json", StringComparison.Ordinal) || + isJson = + parsed.MediaType.Equals("application/json", StringComparison.Ordinal) || parsed.Suffix.EndsWith("+json", StringComparison.Ordinal); // Since S.T.Json always outputs utf-8, we may need to normalize the data content type @@ -179,7 +261,7 @@ private string GetDataContentType(JsonElement json, out bool isJson) return contentType; } - private bool MatchesContentType(HttpContext httpContext, out string charSet) + private static bool MatchesContentType(HttpContext httpContext, out string charSet) { if (httpContext.Request.ContentType == null) { diff --git a/src/Dapr.AspNetCore/CloudEventsMiddlewareOptions.cs b/src/Dapr.AspNetCore/CloudEventsMiddlewareOptions.cs index 251a939a7..84e68adb5 100644 --- a/src/Dapr.AspNetCore/CloudEventsMiddlewareOptions.cs +++ b/src/Dapr.AspNetCore/CloudEventsMiddlewareOptions.cs @@ -29,9 +29,49 @@ public class CloudEventsMiddlewareOptions /// instead of the expected JSON-decoded value of Hello, "world!". /// /// - /// Setting this property to true restores the previous invalid behavior for compatiblity. + /// Setting this property to true restores the previous invalid behavior for compatibility. /// /// public bool SuppressJsonDecodingOfTextPayloads { get; set; } + + /// + /// Gets or sets a value that will determine whether the CloudEvent properties will be forwarded as Request Headers. + /// + /// + /// + /// Setting this property to true will forward all the CloudEvent properties as Request Headers. + /// For more fine grained control of which properties are forwarded you can use either or . + /// + /// + /// Property names will always be prefixed with 'Cloudevent.' and be lower case in the following format:"Cloudevent.type" + /// + /// + /// ie. A CloudEvent property "type": "Example.Type" will be added as "Cloudevent.type": "Example.Type" request header. + /// + /// + public bool ForwardCloudEventPropertiesAsHeaders { get; set; } + + /// + /// Gets or sets an array of CloudEvent property names that will be forwarded as Request Headers if is set to true. + /// + /// + /// + /// Note: Setting this will only forwarded the listed property names. + /// + /// + /// ie: ["type", "subject"] + /// + /// + public string[] IncludedCloudEventPropertiesAsHeaders { get; set; } + + /// + /// Gets or sets an array of CloudEvent property names that will not be forwarded as Request Headers if is set to true. + /// + /// + /// + /// ie: ["type", "subject"] + /// + /// + public string[] ExcludedCloudEventPropertiesFromHeaders { get; set; } } } diff --git a/test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs b/test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs index 904f6648f..c8a5ff402 100644 --- a/test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs +++ b/test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs @@ -84,7 +84,151 @@ public async Task InvokeAsync_ReplacesBodyJson(string dataContentType, string ch await pipeline.Invoke(context); } + + [Theory] + [InlineData(null, null)] // assumes application/json + utf8 + [InlineData("application/json", null)] // assumes utf8 + [InlineData("application/json", "utf-8")] + [InlineData("application/json", "UTF-8")] + [InlineData("application/person+json", "UTF-16")] // arbitrary content type and charset + public async Task InvokeAsync_ReplacesPascalCasedBodyJson(string dataContentType, string charSet) + { + var encoding = charSet == null ? null : Encoding.GetEncoding(charSet); + var app = new ApplicationBuilder(null); + app.UseCloudEvents(); + // Do verification in the scope of the middleware + app.Run(httpContext => + { + httpContext.Request.ContentType.Should().Be(dataContentType ?? "application/json"); + ReadBody(httpContext.Request.Body).Should().Be("{\"name\":\"jimmy\"}"); + return Task.CompletedTask; + }); + + var pipeline = app.Build(); + + var context = new DefaultHttpContext(); + context.Request.ContentType = charSet == null ? "application/cloudevents+json" : $"application/cloudevents+json;charset={charSet}"; + context.Request.Body = dataContentType == null ? + MakeBody("{ \"Data\": { \"name\":\"jimmy\" } }", encoding) : + MakeBody($"{{ \"DataContentType\": \"{dataContentType}\", \"Data\": {{ \"name\":\"jimmy\" }} }}", encoding); + + await pipeline.Invoke(context); + } + + [Theory] + [InlineData(null, null)] // assumes application/json + utf8 + [InlineData("application/json", null)] // assumes utf8 + [InlineData("application/json", "utf-8")] + [InlineData("application/json", "UTF-8")] + [InlineData("application/person+json", "UTF-16")] // arbitrary content type and charset + public async Task InvokeAsync_ForwardsJsonPropertiesAsHeaders(string dataContentType, string charSet) + { + var encoding = charSet == null ? null : Encoding.GetEncoding(charSet); + var app = new ApplicationBuilder(null); + app.UseCloudEvents(new CloudEventsMiddlewareOptions + { + ForwardCloudEventPropertiesAsHeaders = true + }); + + // Do verification in the scope of the middleware + app.Run(httpContext => + { + httpContext.Request.ContentType.Should().Be(dataContentType ?? "application/json"); + ReadBody(httpContext.Request.Body).Should().Be("{\"name\":\"jimmy\"}"); + + httpContext.Request.Headers.Should().ContainKey("Cloudevent.type").WhichValue.Should().BeEquivalentTo("Test.Type"); + httpContext.Request.Headers.Should().ContainKey("Cloudevent.subject").WhichValue.Should().BeEquivalentTo("Test.Subject"); + return Task.CompletedTask; + }); + + var pipeline = app.Build(); + + var context = new DefaultHttpContext(); + context.Request.ContentType = charSet == null ? "application/cloudevents+json" : $"application/cloudevents+json;charset={charSet}"; + context.Request.Body = dataContentType == null ? + MakeBody("{ \"type\": \"Test.Type\", \"subject\": \"Test.Subject\", \"data\": { \"name\":\"jimmy\" } }", encoding) : + MakeBody($"{{ \"datacontenttype\": \"{dataContentType}\", \"type\":\"Test.Type\", \"subject\": \"Test.Subject\", \"data\": {{ \"name\":\"jimmy\" }} }}", encoding); + + await pipeline.Invoke(context); + } + + [Theory] + [InlineData(null, null)] // assumes application/json + utf8 + [InlineData("application/json", null)] // assumes utf8 + [InlineData("application/json", "utf-8")] + [InlineData("application/json", "UTF-8")] + [InlineData("application/person+json", "UTF-16")] // arbitrary content type and charset + public async Task InvokeAsync_ForwardsIncludedJsonPropertiesAsHeaders(string dataContentType, string charSet) + { + var encoding = charSet == null ? null : Encoding.GetEncoding(charSet); + var app = new ApplicationBuilder(null); + app.UseCloudEvents(new CloudEventsMiddlewareOptions + { + ForwardCloudEventPropertiesAsHeaders = true, + IncludedCloudEventPropertiesAsHeaders = new []{"type"} + }); + + // Do verification in the scope of the middleware + app.Run(httpContext => + { + httpContext.Request.ContentType.Should().Be(dataContentType ?? "application/json"); + ReadBody(httpContext.Request.Body).Should().Be("{\"name\":\"jimmy\"}"); + + httpContext.Request.Headers.Should().ContainKey("Cloudevent.type").WhichValue.Should().BeEquivalentTo("Test.Type"); + httpContext.Request.Headers.Should().NotContainKey("Cloudevent.subject"); + return Task.CompletedTask; + }); + + var pipeline = app.Build(); + + var context = new DefaultHttpContext(); + context.Request.ContentType = charSet == null ? "application/cloudevents+json" : $"application/cloudevents+json;charset={charSet}"; + context.Request.Body = dataContentType == null ? + MakeBody("{ \"type\": \"Test.Type\", \"subject\": \"Test.Subject\", \"data\": { \"name\":\"jimmy\" } }", encoding) : + MakeBody($"{{ \"datacontenttype\": \"{dataContentType}\", \"type\":\"Test.Type\", \"subject\": \"Test.Subject\", \"data\": {{ \"name\":\"jimmy\" }} }}", encoding); + + await pipeline.Invoke(context); + } + + [Theory] + [InlineData(null, null)] // assumes application/json + utf8 + [InlineData("application/json", null)] // assumes utf8 + [InlineData("application/json", "utf-8")] + [InlineData("application/json", "UTF-8")] + [InlineData("application/person+json", "UTF-16")] // arbitrary content type and charset + public async Task InvokeAsync_DoesNotForwardExcludedJsonPropertiesAsHeaders(string dataContentType, string charSet) + { + var encoding = charSet == null ? null : Encoding.GetEncoding(charSet); + var app = new ApplicationBuilder(null); + app.UseCloudEvents(new CloudEventsMiddlewareOptions + { + ForwardCloudEventPropertiesAsHeaders = true, + ExcludedCloudEventPropertiesFromHeaders = new []{"type"} + }); + + // Do verification in the scope of the middleware + app.Run(httpContext => + { + httpContext.Request.ContentType.Should().Be(dataContentType ?? "application/json"); + ReadBody(httpContext.Request.Body).Should().Be("{\"name\":\"jimmy\"}"); + + httpContext.Request.Headers.Should().NotContainKey("Cloudevent.type"); + httpContext.Request.Headers.Should().ContainKey("Cloudevent.subject").WhichValue.Should().BeEquivalentTo("Test.Subject"); + return Task.CompletedTask; + }); + + var pipeline = app.Build(); + + var context = new DefaultHttpContext(); + context.Request.ContentType = charSet == null ? "application/cloudevents+json" : $"application/cloudevents+json;charset={charSet}"; + context.Request.Body = dataContentType == null ? + MakeBody("{ \"type\": \"Test.Type\", \"subject\": \"Test.Subject\", \"data\": { \"name\":\"jimmy\" } }", encoding) : + MakeBody($"{{ \"datacontenttype\": \"{dataContentType}\", \"type\":\"Test.Type\", \"subject\": \"Test.Subject\", \"data\": {{ \"name\":\"jimmy\" }} }}", encoding); + + await pipeline.Invoke(context); + } + [Fact] public async Task InvokeAsync_ReplacesBodyNonJsonData() {