Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk-buffering refactoring #957

Merged
merged 10 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.utils.StorageClock;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.data.LogRecordData;
Expand Down Expand Up @@ -41,29 +41,29 @@ public final class LogRecordDiskExporter implements LogRecordExporter, StoredBat
public static LogRecordDiskExporter create(
LogRecordExporter wrapped, File rootDir, StorageConfiguration configuration)
throws IOException {
return create(wrapped, rootDir, configuration, StorageClock.getInstance());
return create(wrapped, rootDir, configuration, Clock.getDefault());
}

// This is used for testing purposes.
// This is exposed for testing purposes.
static LogRecordDiskExporter create(
LogRecordExporter wrapped,
File rootDir,
StorageConfiguration configuration,
StorageClock clock)
LogRecordExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock)
throws IOException {
return new LogRecordDiskExporter(wrapped, rootDir, configuration, clock);
DiskExporter<LogRecordData> diskExporter =
DiskExporter.<LogRecordData>builder()
.setSerializer(SignalSerializer.ofLogs())
.setRootDir(rootDir)
.setFolderName("logs")
.setStorageConfiguration(configuration)
.setStorageClock(clock)
.setExportFunction(wrapped::export)
.build();
return new LogRecordDiskExporter(wrapped, diskExporter);
}

private LogRecordDiskExporter(
LogRecordExporter wrapped,
File rootDir,
StorageConfiguration configuration,
StorageClock clock)
throws IOException {
LogRecordExporter wrapped, DiskExporter<LogRecordData> diskExporter) {
this.wrapped = wrapped;
diskExporter =
new DiskExporter<>(
rootDir, configuration, "logs", SignalSerializer.ofLogs(), wrapped::export, clock);
this.diskExporter = diskExporter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.utils.StorageClock;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
Expand Down Expand Up @@ -42,28 +42,28 @@ public final class MetricDiskExporter implements MetricExporter, StoredBatchExpo
*/
public static MetricDiskExporter create(
MetricExporter wrapped, File rootDir, StorageConfiguration configuration) throws IOException {
return create(wrapped, rootDir, configuration, StorageClock.getInstance());
return create(wrapped, rootDir, configuration, Clock.getDefault());
}

// This is used for testing purposes.
// This is exposed for testing purposes.
public static MetricDiskExporter create(
MetricExporter wrapped, File rootDir, StorageConfiguration configuration, StorageClock clock)
MetricExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock)
throws IOException {
return new MetricDiskExporter(wrapped, rootDir, configuration, clock);
DiskExporter<MetricData> diskExporter =
DiskExporter.<MetricData>builder()
.setRootDir(rootDir)
.setFolderName("metrics")
.setStorageConfiguration(configuration)
.setSerializer(SignalSerializer.ofMetrics())
.setExportFunction(wrapped::export)
.setStorageClock(clock)
.build();
return new MetricDiskExporter(wrapped, diskExporter);
}

private MetricDiskExporter(
MetricExporter wrapped, File rootDir, StorageConfiguration configuration, StorageClock clock)
throws IOException {
private MetricDiskExporter(MetricExporter wrapped, DiskExporter<MetricData> diskExporter) {
this.wrapped = wrapped;
diskExporter =
new DiskExporter<>(
rootDir,
configuration,
"metrics",
SignalSerializer.ofMetrics(),
wrapped::export,
clock);
this.diskExporter = diskExporter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.utils.StorageClock;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
Expand Down Expand Up @@ -40,23 +40,29 @@ public final class SpanDiskExporter implements SpanExporter, StoredBatchExporter
*/
public static SpanDiskExporter create(
SpanExporter wrapped, File rootDir, StorageConfiguration configuration) throws IOException {
return create(wrapped, rootDir, configuration, StorageClock.getInstance());
return create(wrapped, rootDir, configuration, Clock.getDefault());
}

// This is used for testing purposes.
// This is exposed for testing purposes.
public static SpanDiskExporter create(
SpanExporter wrapped, File rootDir, StorageConfiguration configuration, StorageClock clock)
SpanExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock)
throws IOException {
return new SpanDiskExporter(wrapped, rootDir, configuration, clock);
DiskExporter<SpanData> diskExporter =
DiskExporter.<SpanData>builder()
.setRootDir(rootDir)
.setFolderName("spans")
.setStorageConfiguration(configuration)
.setSerializer(SignalSerializer.ofSpans())
.setExportFunction(wrapped::export)
.setStorageClock(clock)
.build();
return new SpanDiskExporter(wrapped, diskExporter);
}

private SpanDiskExporter(
SpanExporter wrapped, File rootDir, StorageConfiguration configuration, StorageClock clock)
private SpanDiskExporter(SpanExporter wrapped, DiskExporter<SpanData> diskExporter)
throws IOException {
this.wrapped = wrapped;
diskExporter =
new DiskExporter<>(
rootDir, configuration, "spans", SignalSerializer.ofSpans(), wrapped::export, clock);
this.diskExporter = diskExporter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@
package io.opentelemetry.contrib.disk.buffering.internal.exporters;

import io.opentelemetry.contrib.disk.buffering.StoredBatchExporter;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.utils.StorageClock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,19 +23,17 @@ public final class DiskExporter<EXPORT_DATA> implements StoredBatchExporter {
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
private static final Logger logger = Logger.getLogger(DiskExporter.class.getName());

public DiskExporter(
File rootDir,
StorageConfiguration configuration,
String folderName,
DiskExporter(
SignalSerializer<EXPORT_DATA> serializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
StorageClock clock)
throws IOException {
validateConfiguration(configuration);
this.storage =
new Storage(new FolderManager(getSignalFolder(rootDir, folderName), configuration, clock));
Storage storage) {
this.serializer = serializer;
this.exportFunction = exportFunction;
this.storage = storage;
}

public static <T> DiskExporterBuilder<T> builder() {
return new DiskExporterBuilder<T>();
}

@Override
Expand Down Expand Up @@ -77,22 +71,4 @@ public CompletableResultCode onExport(Collection<EXPORT_DATA> data) {
return exportFunction.apply(data);
}
}

private static File getSignalFolder(File rootDir, String folderName) throws IOException {
File folder = new File(rootDir, folderName);
if (!folder.exists()) {
if (!folder.mkdirs()) {
throw new IOException(
"Could not create the signal folder: '" + folderName + "' inside: " + rootDir);
}
}
return folder;
}

private static void validateConfiguration(StorageConfiguration configuration) {
if (configuration.getMinFileAgeForReadMillis() <= configuration.getMaxFileAgeForWriteMillis()) {
throw new IllegalArgumentException(
"The configured max file age for writing must be lower than the configured min file age for reading");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.exporters;

import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Function;

public final class DiskExporterBuilder<T> {

private SignalSerializer<T> serializer;
private File rootDir;
private String folderName;
private StorageConfiguration configuration;
private Clock clock = Clock.getDefault();
private Function<Collection<T>, CompletableResultCode> exportFunction;

DiskExporterBuilder() {}

public DiskExporterBuilder<T> setRootDir(File rootDir) {
this.rootDir = rootDir;
return this;
}

public DiskExporterBuilder<T> setFolderName(String folderName) {
this.folderName = folderName;
return this;
}

public DiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
this.configuration = configuration;
return this;
}

public DiskExporterBuilder<T> setStorageClock(Clock clock) {
this.clock = clock;
return this;
}

public DiskExporterBuilder<T> setSerializer(SignalSerializer<T> serializer) {
this.serializer = serializer;
return this;
}

public DiskExporterBuilder<T> setExportFunction(
Function<Collection<T>, CompletableResultCode> exportFunction) {
this.exportFunction = exportFunction;
return this;
}

private static File getSignalFolder(File rootDir, String folderName) throws IOException {
File folder = new File(rootDir, folderName);
if (!folder.exists()) {
if (!folder.mkdirs()) {
throw new IOException(
"Could not create the signal folder: '" + folderName + "' inside: " + rootDir);
}
}
return folder;
}

public DiskExporter<T> build() throws IOException {
validateConfiguration(configuration);

File folder = getSignalFolder(rootDir, folderName);
Storage storage = new Storage(new FolderManager(folder, configuration, clock));

return new DiskExporter<>(serializer, exportFunction, storage);
}

private static void validateConfiguration(StorageConfiguration configuration) {
if (configuration.getMinFileAgeForReadMillis() <= configuration.getMaxFileAgeForWriteMillis()) {
throw new IllegalArgumentException(
"The configured max file age for writing must be lower than the configured min file age for reading");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.List;
import java.util.Map;

public class ProtoLogsDataMapper
public final class ProtoLogsDataMapper
extends BaseProtoSignalsDataMapper<
LogRecordData, LogRecord, LogsData, ResourceLogs, ScopeLogs> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.List;
import java.util.Map;

public class ProtoMetricsDataMapper
public final class ProtoMetricsDataMapper
extends BaseProtoSignalsDataMapper<
MetricData, Metric, MetricsData, ResourceMetrics, ScopeMetrics> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.List;
import java.util.Map;

public class ProtoSpansDataMapper
public final class ProtoSpansDataMapper
extends BaseProtoSignalsDataMapper<SpanData, Span, TracesData, ResourceSpans, ScopeSpans> {

private static final ProtoSpansDataMapper INSTANCE = new ProtoSpansDataMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@

package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers;

import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.List;

public interface SignalSerializer<SDK_ITEM> {

static SpanDataSerializer ofSpans() {
static SignalSerializer<SpanData> ofSpans() {
return SpanDataSerializer.getInstance();
}

static MetricDataSerializer ofMetrics() {
static SignalSerializer<MetricData> ofMetrics() {
return MetricDataSerializer.getInstance();
}

static LogRecordDataSerializer ofLogs() {
static SignalSerializer<LogRecordData> ofLogs() {
return LogRecordDataSerializer.getInstance();
}

Expand Down
Loading
Loading