Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signals processing #1014

Merged
merged 20 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
7f9a765
Creating processors subproject
LikeTheSalad Aug 23, 2023
27a8f2c
Creating the interceptor interface and interceptable base class
LikeTheSalad Aug 23, 2023
f86d71f
Created and interceptable exporter for spans
LikeTheSalad Aug 23, 2023
238080c
Created and interceptable exporter for metrics
LikeTheSalad Aug 23, 2023
e683579
Created and interceptable exporter for logs
LikeTheSalad Aug 23, 2023
91a3c25
Creating README file
LikeTheSalad Aug 23, 2023
8a48848
Updating component_owners.yml
LikeTheSalad Aug 23, 2023
1af3867
Running spotlessApply
LikeTheSalad Aug 23, 2023
68f84f8
Removing inheritance for interceptable exporters
LikeTheSalad Aug 24, 2023
5d7aaff
Adding breedx-splk as component owner
LikeTheSalad Aug 24, 2023
4f1014e
Adding "interceptAll" as Interceptor default method
LikeTheSalad Aug 24, 2023
9f5c6d9
Update processors/src/main/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
266472f
Update processors/src/main/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
33443fe
Update processors/src/main/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
0d6edb3
Update processors/src/main/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
8f61209
Update processors/src/test/java/io/opentelemetry/contrib/interceptor/…
LikeTheSalad Aug 25, 2023
1faffe9
Using CopyOnWriteArrayList in ComposableInterceptor
LikeTheSalad Aug 25, 2023
90fbc9d
Making exporters final
LikeTheSalad Aug 25, 2023
8c93d6c
Adding nullable imports
LikeTheSalad Aug 25, 2023
e044d7c
Making test functions package private
LikeTheSalad Aug 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/component_owners.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ components:
- HaloFour
noop-api:
- jack-berg
processors:
- LikeTheSalad
- ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add me here. I think the literal ? breaks things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! You're added now.

prometheus-collector:
- jkwatson
resource-providers:
Expand Down
10 changes: 10 additions & 0 deletions processors/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Processors

This module provides tools to intercept and process signals globally.

## Component owners

- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
- ?

Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
17 changes: 17 additions & 0 deletions processors/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
}

description = "Tools to intercept and process signals globally."
otelJava.moduleName.set("io.opentelemetry.contrib.processors")

java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

dependencies {
api("io.opentelemetry:opentelemetry-sdk")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.common.Interceptable;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.Collection;

/** Intercepts logs before delegating them to the real exporter. */
public class InterceptableLogRecordExporter extends Interceptable<LogRecordData>
implements LogRecordExporter {
private final LogRecordExporter delegate;

public InterceptableLogRecordExporter(LogRecordExporter delegate) {
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return delegate.export(interceptAll(logs));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.common.Interceptable;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;

/** Intercepts metrics before delegating them to the real exporter. */
public class InterceptableMetricExporter extends Interceptable<MetricData>
implements MetricExporter {
private final MetricExporter delegate;

public InterceptableMetricExporter(MetricExporter delegate) {
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
return delegate.export(interceptAll(metrics));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}

@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return delegate.getAggregationTemporality(instrumentType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import io.opentelemetry.contrib.interceptor.common.Interceptable;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;

/** Intercepts spans before delegating them to the real exporter. */
public class InterceptableSpanExporter extends Interceptable<SpanData> implements SpanExporter {
private final SpanExporter delegate;

public InterceptableSpanExporter(SpanExporter delegate) {
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return delegate.export(interceptAll(spans));
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor.api;

/**
* Intercepts a signal before it gets exported. The signal can get updated and/or filtered out based
* on each interceptor implementation.
*/
public interface Interceptor<T> {

/**
* Intercepts a signal.
*
* @param item The signal object.
* @return The received signal modified (or null for excluding this signal from getting exported).
* If there's no operation needed to be done for a specific signal, it should be returned as
* is.
*/
T intercept(T item);
LikeTheSalad marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor.common;

import io.opentelemetry.contrib.interceptor.api.Interceptor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Base class to reuse the code related to intercepting signals. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly believe that we should not use object inheritance as a mechanism for reusing code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally speaking, I understand the benefits of composition over inheritance, although I still believe that there are cases where inheritance makes more sense. I think the problem comes when people start adding more than one responsibility to a base class. That being said though, after my recent changes to this PR I realized that it was actually better not to use inheritance for this use case 😅 so thanks for your input! Though I guess my point is that the best approach might vary depending on the use case.

public class Interceptable<T> {
private final Set<Interceptor<T>> interceptors = new HashSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could think of a few contrived scenarios where order might matter. I'd use a list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I've added the changes into ComposableInterceptor.


public void addInterceptor(Interceptor<T> interceptor) {
interceptors.add(interceptor);
}

protected Collection<T> interceptAll(Collection<T> items) {
List<T> result = new ArrayList<>();

for (T item : items) {
T intercepted = intercept(item);
if (intercepted != null) {
result.add(intercepted);
}
}

return result;
}

private T intercept(T item) {
T intercepted = item;
for (Interceptor<T> interceptor : interceptors) {
intercepted = interceptor.intercept(intercepted);
if (intercepted == null) {
break;
}
}
return intercepted;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.interceptor;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.data.Body;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import java.util.List;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class InterceptableLogRecordExporterTest {
private InMemoryLogRecordExporter memoryLogRecordExporter;
private Logger logger;
private InterceptableLogRecordExporter interceptable;

@BeforeEach
public void setUp() {
LikeTheSalad marked this conversation as resolved.
Show resolved Hide resolved
memoryLogRecordExporter = InMemoryLogRecordExporter.create();
interceptable = new InterceptableLogRecordExporter(memoryLogRecordExporter);
logger =
SdkLoggerProvider.builder()
.addLogRecordProcessor(SimpleLogRecordProcessor.create(interceptable))
.build()
.get("TestScope");
}

@Test
public void verifyLogModification() {
interceptable.addInterceptor(
item -> {
ModifiableLogRecordData modified = new ModifiableLogRecordData(item);
modified.attributes.put("global.attr", "from interceptor");
return modified;
});

logger
.logRecordBuilder()
.setBody("One log")
.setAttribute(AttributeKey.stringKey("local.attr"), "local")
.emit();

List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(1, finishedLogRecordItems.size());
LogRecordData logRecordData = finishedLogRecordItems.get(0);
assertEquals(2, logRecordData.getAttributes().size());
assertEquals(
"from interceptor",
logRecordData.getAttributes().get(AttributeKey.stringKey("global.attr")));
assertEquals("local", logRecordData.getAttributes().get(AttributeKey.stringKey("local.attr")));
}

@Test
public void verifyLogFiltering() {
interceptable.addInterceptor(
item -> {
if (item.getBody().asString().contains("deleted")) {
return null;
}
return item;
});

logger.logRecordBuilder().setBody("One log").emit();
logger.logRecordBuilder().setBody("This log will be deleted").emit();
logger.logRecordBuilder().setBody("Another log").emit();

List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(2, finishedLogRecordItems.size());
assertEquals("One log", finishedLogRecordItems.get(0).getBody().asString());
assertEquals("Another log", finishedLogRecordItems.get(1).getBody().asString());
}

private static class ModifiableLogRecordData implements LogRecordData {
private final LogRecordData delegate;
private final AttributesBuilder attributes = Attributes.builder();

private ModifiableLogRecordData(LogRecordData delegate) {
this.delegate = delegate;
}

@Override
public Resource getResource() {
return delegate.getResource();
}

@Override
public InstrumentationScopeInfo getInstrumentationScopeInfo() {
return delegate.getInstrumentationScopeInfo();
}

@Override
public long getTimestampEpochNanos() {
return delegate.getTimestampEpochNanos();
}

@Override
public long getObservedTimestampEpochNanos() {
return delegate.getObservedTimestampEpochNanos();
}

@Override
public SpanContext getSpanContext() {
return delegate.getSpanContext();
}

@Override
public Severity getSeverity() {
return delegate.getSeverity();
}

@Nullable
@Override
public String getSeverityText() {
return delegate.getSeverityText();
}

@Override
public Body getBody() {
return delegate.getBody();
}

@Override
public Attributes getAttributes() {
return attributes.putAll(delegate.getAttributes()).build();
}

@Override
public int getTotalAttributeCount() {
return delegate.getTotalAttributeCount();
}
}
}
Loading
Loading