Skip to content

Commit

Permalink
Custom block code and msg (#398)
Browse files Browse the repository at this point in the history
* TODOs

* add FilterResult and custom status code and msg for blocking

* add response msg and tests

* fix tests

* revert grpc changes
  • Loading branch information
shashank11p authored Jan 3, 2024
1 parent 2a24d17 commit 4570eef
Show file tree
Hide file tree
Showing 21 changed files with 179 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.opentelemetry.api.trace.Span;
import java.util.List;
import java.util.Map;
import org.hypertrace.agent.core.filter.FilterResult;
import org.hypertrace.agent.filter.api.Filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,12 +35,12 @@ public MultiFilter(List<Filter> filters) {
}

@Override
public boolean evaluateRequestHeaders(Span span, Map<String, String> headers) {
boolean shouldBlock = false;
public FilterResult evaluateRequestHeaders(Span span, Map<String, String> headers) {
for (Filter filter : filters) {
try {
if (filter.evaluateRequestHeaders(span, headers)) {
shouldBlock = true;
FilterResult filterResult = filter.evaluateRequestHeaders(span, headers);
if (filterResult.shouldBlock()) {
return filterResult;
}
} catch (Throwable t) {
logger.warn(
Expand All @@ -48,16 +49,16 @@ public boolean evaluateRequestHeaders(Span span, Map<String, String> headers) {
t);
}
}
return shouldBlock;
return new FilterResult(false, 0, "");
}

@Override
public boolean evaluateRequestBody(Span span, String body, Map<String, String> headers) {
boolean shouldBlock = false;
public FilterResult evaluateRequestBody(Span span, String body, Map<String, String> headers) {
for (Filter filter : filters) {
try {
if (filter.evaluateRequestBody(span, body, headers)) {
shouldBlock = true;
FilterResult filterResult = filter.evaluateRequestBody(span, body, headers);
if (filterResult.shouldBlock()) {
return filterResult;
}
} catch (Throwable t) {
logger.warn(
Expand All @@ -66,6 +67,6 @@ public boolean evaluateRequestBody(Span span, String body, Map<String, String> h
t);
}
}
return shouldBlock;
return new FilterResult(false, 0, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.opentelemetry.api.trace.Span;
import java.util.Map;
import org.hypertrace.agent.core.filter.FilterResult;
import org.hypertrace.agent.filter.FilterRegistry;

/**
Expand All @@ -32,7 +33,7 @@ public interface Filter {
* @param headers are used for blocking evaluation.
* @return filter result
*/
boolean evaluateRequestHeaders(Span span, Map<String, String> headers);
FilterResult evaluateRequestHeaders(Span span, Map<String, String> headers);

/**
* Evaluate the execution.
Expand All @@ -42,5 +43,5 @@ public interface Filter {
* @param headers of the request associated with this body
* @return filter result
*/
boolean evaluateRequestBody(Span span, String body, Map<String, String> headers);
FilterResult evaluateRequestBody(Span span, String body, Map<String, String> headers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.opentelemetry.javaagent.instrumentation.hypertrace.grpc.v1_6.GrpcSpanDecorator;
import java.util.Map;
import org.hypertrace.agent.core.config.InstrumentationConfig;
import org.hypertrace.agent.core.filter.FilterResult;
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
import org.hypertrace.agent.filter.FilterRegistry;
import org.slf4j.Logger;
Expand Down Expand Up @@ -57,8 +58,11 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
GrpcSpanDecorator.addMetadataAttributes(mapHeaders, currentSpan);
}

boolean block = FilterRegistry.getFilter().evaluateRequestHeaders(currentSpan, mapHeaders);
if (block) {
FilterResult filterResult =
FilterRegistry.getFilter().evaluateRequestHeaders(currentSpan, mapHeaders);
if (filterResult.shouldBlock()) {
// We cannot send custom message in grpc calls
// TODO: map http codes with grpc codes. filterResult.getBlockingStatusCode()
call.close(Status.PERMISSION_DENIED, new Metadata());
@SuppressWarnings("unchecked")
ServerCall.Listener<ReqT> noop = NoopServerCallListener.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void serverRequestBlocking() throws TimeoutException, InterruptedExceptio
try {
Response response = blockingStub.sayHello(REQUEST);
} catch (StatusRuntimeException ex) {
Assertions.assertEquals(Status.PERMISSION_DENIED, ex.getStatus());
Assertions.assertEquals(Status.PERMISSION_DENIED.getCode(), ex.getStatus().getCode());
}

TEST_WRITER.waitForSpans(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio

try (Response response = httpClient.newCall(request).execute()) {
Assertions.assertEquals(403, response.code());
Assertions.assertTrue(response.body().string().isEmpty());
Assertions.assertEquals("Hypertrace Blocked Request", response.body().string());
}

List<List<SpanData>> traces = TEST_WRITER.getTraces();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio

try (Response response = httpClient.newCall(request).execute()) {
Assertions.assertEquals(403, response.code());
Assertions.assertTrue(response.body().string().isEmpty());
Assertions.assertEquals("Hypertrace Blocked Request", response.body().string());
}

List<List<SpanData>> traces = TEST_WRITER.getTraces();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -30,7 +31,9 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.AttributeKeys;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.hypertrace.agent.core.filter.FilterResult;
import org.hypertrace.agent.filter.FilterRegistry;

public class HttpServerBlockingRequestHandler extends ChannelInboundHandlerAdapter {
Expand All @@ -52,26 +55,37 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
Attribute<Map<String, String>> headersAttr = channel.attr(AttributeKeys.REQUEST_HEADERS);
Map<String, String> headers = headersAttr.getAndRemove();
if (headers != null && FilterRegistry.getFilter().evaluateRequestHeaders(span, headers)) {
forbidden(ctx, (HttpRequest) msg);
return;
if (headers != null) {
FilterResult filterResult =
FilterRegistry.getFilter().evaluateRequestHeaders(span, headers);
if (filterResult.shouldBlock()) {
forbidden(ctx, (HttpRequest) msg, filterResult);
return;
}
}
}
if (msg instanceof HttpContent) {
if (FilterRegistry.getFilter().evaluateRequestBody(span, null, null)) {
FilterResult filterResult = FilterRegistry.getFilter().evaluateRequestBody(span, null, null);
if (filterResult.shouldBlock()) {
Attribute<?> requestAttr = channel.attr(AttributeKeys.REQUEST);
HttpRequest req = ((HttpRequestAndChannel) (requestAttr.get())).request();
forbidden(ctx, req);
forbidden(ctx, req, filterResult);
return;
}
}
ctx.fireChannelRead(msg);
}

static void forbidden(ChannelHandlerContext ctx, HttpRequest request) {
static void forbidden(ChannelHandlerContext ctx, HttpRequest request, FilterResult filterResult) {
DefaultFullHttpResponse blockResponse =
new DefaultFullHttpResponse(request.getProtocolVersion(), HttpResponseStatus.FORBIDDEN);
blockResponse.headers().add("Content-Length", "0");
new DefaultFullHttpResponse(
request.getProtocolVersion(),
new HttpResponseStatus(
filterResult.getBlockingStatusCode(), HttpResponseStatus.FORBIDDEN.reasonPhrase()),
Unpooled.copiedBuffer(filterResult.getBlockingMsg().getBytes(StandardCharsets.UTF_8)));
blockResponse
.headers()
.add("Content-Length", String.valueOf(filterResult.getBlockingMsg().length()));
ReferenceCountUtil.release(request);
ctx.writeAndFlush(blockResponse).addListener(ChannelFutureListener.CLOSE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio

try (Response response = httpClient.newCall(request).execute()) {
Assertions.assertEquals(403, response.code());
Assertions.assertTrue(response.body().string().isEmpty());
Assertions.assertEquals("Hypertrace Blocked Request", response.body().string());
}

List<List<SpanData>> traces = TEST_WRITER.getTraces();
Expand All @@ -168,9 +168,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio
.getAttributes()
.get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME)));
Assertions.assertNull(
spanData
.getAttributes()
.get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_BODY)));
spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY));

RequestBody requestBody = blockedRequestBody(true, 3000, 75);
Request request2 =
Expand All @@ -182,7 +180,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio

try (Response response = httpClient.newCall(request2).execute()) {
Assertions.assertEquals(403, response.code());
Assertions.assertTrue(response.body().string().isEmpty());
Assertions.assertEquals("Hypertrace Blocked Request", response.body().string());
}

List<List<SpanData>> traces2 = TEST_WRITER.getTraces();
Expand All @@ -202,9 +200,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio
.getAttributes()
.get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME)));
Assertions.assertNull(
spanData2
.getAttributes()
.get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_BODY)));
spanData2.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY));
}

@Test
Expand Down Expand Up @@ -264,7 +260,7 @@ public void connectionKeepAlive() throws IOException, TimeoutException, Interrup

try (Response response = httpClient.newCall(request2).execute()) {
Assertions.assertEquals(403, response.code());
Assertions.assertTrue(response.body().string().isEmpty());
Assertions.assertEquals("Hypertrace Blocked Request", response.body().string());
}

List<List<SpanData>> traces2 = TEST_WRITER.getTraces();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -30,7 +31,9 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.AttributeKeys;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.hypertrace.agent.core.filter.FilterResult;
import org.hypertrace.agent.filter.FilterRegistry;

public class HttpServerBlockingRequestHandler extends ChannelInboundHandlerAdapter {
Expand All @@ -51,26 +54,37 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
Attribute<Map<String, String>> headersAttr = channel.attr(AttributeKeys.REQUEST_HEADERS);
Map<String, String> headers = headersAttr.getAndRemove();
if (headers != null && FilterRegistry.getFilter().evaluateRequestHeaders(span, headers)) {
forbidden(ctx, (HttpRequest) msg);
return;
if (headers != null) {
FilterResult filterResult =
FilterRegistry.getFilter().evaluateRequestHeaders(span, headers);
if (filterResult.shouldBlock()) {
forbidden(ctx, (HttpRequest) msg, filterResult);
return;
}
}
}
if (msg instanceof HttpContent) {
if (FilterRegistry.getFilter().evaluateRequestBody(span, null, null)) {
FilterResult filterResult = FilterRegistry.getFilter().evaluateRequestBody(span, null, null);
if (filterResult.shouldBlock()) {
Attribute<?> requestAttr = channel.attr(AttributeKeys.REQUEST);
HttpRequest req = ((HttpRequestAndChannel) (requestAttr.get())).request();
forbidden(ctx, req);
forbidden(ctx, req, filterResult);
return;
}
}
ctx.fireChannelRead(msg);
}

static void forbidden(ChannelHandlerContext ctx, HttpRequest request) {
static void forbidden(ChannelHandlerContext ctx, HttpRequest request, FilterResult filterResult) {
DefaultFullHttpResponse blockResponse =
new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN);
blockResponse.headers().add("Content-Length", "0");
new DefaultFullHttpResponse(
request.protocolVersion(),
new HttpResponseStatus(
filterResult.getBlockingStatusCode(), HttpResponseStatus.FORBIDDEN.reasonPhrase()),
Unpooled.copiedBuffer(filterResult.getBlockingMsg().getBytes(StandardCharsets.UTF_8)));
blockResponse
.headers()
.add("Content-Length", String.valueOf(filterResult.getBlockingMsg().length()));
ReferenceCountUtil.release(request);
ctx.writeAndFlush(blockResponse).addListener(ChannelFutureListener.CLOSE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio

try (Response response = httpClient.newCall(request).execute()) {
Assertions.assertEquals(403, response.code());
Assertions.assertTrue(response.body().string().isEmpty());
Assertions.assertEquals("Hypertrace Blocked Request", response.body().string());
}

List<List<SpanData>> traces = TEST_WRITER.getTraces();
Expand All @@ -168,9 +168,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio
.getAttributes()
.get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME)));
Assertions.assertNull(
spanData
.getAttributes()
.get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_BODY)));
spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY));

RequestBody requestBody = blockedRequestBody(true, 3000, 75);
Request request2 =
Expand All @@ -182,7 +180,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio

try (Response response = httpClient.newCall(request2).execute()) {
Assertions.assertEquals(403, response.code());
Assertions.assertTrue(response.body().string().isEmpty());
Assertions.assertEquals("Hypertrace Blocked Request", response.body().string());
}

List<List<SpanData>> traces2 = TEST_WRITER.getTraces();
Expand All @@ -202,9 +200,7 @@ public void blocking() throws IOException, TimeoutException, InterruptedExceptio
.getAttributes()
.get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME)));
Assertions.assertNull(
spanData2
.getAttributes()
.get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_BODY)));
spanData2.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY));
}

@Test
Expand Down Expand Up @@ -264,7 +260,7 @@ public void connectionKeepAlive() throws IOException, TimeoutException, Interrup

try (Response response = httpClient.newCall(request2).execute()) {
Assertions.assertEquals(403, response.code());
Assertions.assertTrue(response.body().string().isEmpty());
Assertions.assertEquals("Hypertrace Blocked Request", response.body().string());
}

List<List<SpanData>> traces2 = TEST_WRITER.getTraces();
Expand Down
Loading

0 comments on commit 4570eef

Please sign in to comment.