Skip to content

Commit

Permalink
Add ByteBuffer field type marshaling support to exporter.
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalliday committed Sep 5, 2024
1 parent ee7fd27 commit 65cf5a1
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.opentelemetry.api.internal.ConfigUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* Protobuf wire encoder.
Expand All @@ -56,7 +57,7 @@
//
// Differences
// - No support for Message/Lite
// - No support for ByteString or ByteBuffer
// - No support for ByteString
// - No support for message set extensions
// - No support for Unsafe
// - No support for Java String, only UTF-8 bytes
Expand Down Expand Up @@ -329,6 +330,11 @@ public static int computeByteArraySizeNoTag(final byte[] value) {
return computeLengthDelimitedFieldSize(value.length);
}

/** Compute the number of bytes that would be needed to encode a {@code bytes} field. */
public static int computeByteBufferSizeNoTag(final ByteBuffer value) {
return computeLengthDelimitedFieldSize(value.capacity());

Check warning on line 335 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L335

Added line #L335 was not covered by tests
}

static int computeLengthDelimitedFieldSize(int fieldLength) {
return computeUInt32SizeNoTag(fieldLength) + fieldLength;
}
Expand Down Expand Up @@ -375,6 +381,8 @@ static long encodeZigZag64(final long n) {
abstract void writeByteArrayNoTag(final byte[] value, final int offset, final int length)
throws IOException;

abstract void writeByteBufferNoTag(final ByteBuffer value) throws IOException;

// =================================================================

/** Abstract base class for buffered encoders. */
Expand Down Expand Up @@ -487,6 +495,49 @@ void writeByteArrayNoTag(final byte[] value, int offset, int length) throws IOEx
write(value, offset, length);
}

@Override
void writeByteBufferNoTag(final ByteBuffer value) throws IOException {
writeUInt32NoTag(value.capacity());

Check warning on line 500 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L500

Added line #L500 was not covered by tests
if (value.hasArray()) {
write(value.array(), value.arrayOffset(), value.capacity());

Check warning on line 502 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L502

Added line #L502 was not covered by tests
} else {
write((ByteBuffer) value.duplicate().clear());

Check warning on line 504 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L504

Added line #L504 was not covered by tests
}
}

Check warning on line 506 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L506

Added line #L506 was not covered by tests

void write(ByteBuffer value) throws IOException {
int length = value.remaining();

Check warning on line 509 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L509

Added line #L509 was not covered by tests
if (limit - position >= length) {
// We have room in the current buffer.
value.get(buffer, position, length);
position += length;
totalBytesWritten += length;

Check warning on line 514 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L512-L514

Added lines #L512 - L514 were not covered by tests
} else {
// Write extends past current buffer. Fill the rest of this buffer and
// flush.
final int bytesWritten = limit - position;
value.get(buffer, position, bytesWritten);
length -= bytesWritten;
position = limit;
totalBytesWritten += bytesWritten;
doFlush();

Check warning on line 523 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L518-L523

Added lines #L518 - L523 were not covered by tests

// Now deal with the rest.
// Since we have an output stream, this is our buffer
// and buffer offset == 0
while (length > limit) {
// Copy data into the buffer before writing it to OutputStream.
value.get(buffer, 0, limit);
out.write(buffer, 0, limit);
length -= limit;
totalBytesWritten += limit;

Check warning on line 533 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L530-L533

Added lines #L530 - L533 were not covered by tests
}
value.get(buffer, 0, length);
position = length;
totalBytesWritten += length;

Check warning on line 537 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L535-L537

Added lines #L535 - L537 were not covered by tests
}
}

Check warning on line 539 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java#L539

Added line #L539 was not covered by tests

@Override
void write(byte value) throws IOException {
if (position == limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

Expand Down Expand Up @@ -126,6 +127,13 @@ public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
}

@Override
public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
byte[] data = new byte[value.capacity()];
((ByteBuffer) value.duplicate().clear()).get(data);
generator.writeBinaryField(field.getJsonName(), data);
}

Check warning on line 135 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java#L132-L135

Added lines #L132 - L135 were not covered by tests

@Override
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
generator.writeObjectFieldStart(field.getJsonName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -346,6 +347,14 @@ public static int sizeBytes(ProtoFieldInfo field, byte[] message) {
return field.getTagSize() + CodedOutputStream.computeByteArraySizeNoTag(message);
}

/** Returns the size of a bytes field based on the buffer's capacity. */
public static int sizeByteBuffer(ProtoFieldInfo field, ByteBuffer message) {
if (message.capacity() == 0) {
return 0;

Check warning on line 353 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java#L353

Added line #L353 was not covered by tests
}
return field.getTagSize() + CodedOutputStream.computeByteBufferSizeNoTag(message);

Check warning on line 355 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java#L355

Added line #L355 was not covered by tests
}

/** Returns the size of a enum field. */
// Assumes OTLP always defines the first item in an enum with number 0, which it does and will.
public static int sizeEnum(ProtoFieldInfo field, ProtoEnumInfo enumValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.api.trace.TraceId;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -168,6 +169,12 @@ public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
output.writeByteArrayNoTag(value);
}

@Override
public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
output.writeUInt32NoTag(field.getTag());
output.writeByteBufferNoTag(value);
}

Check warning on line 176 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java#L174-L176

Added lines #L174 - L176 were not covered by tests

@Override
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
output.writeUInt32NoTag(field.getTag());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -253,8 +254,22 @@ public void serializeBytes(ProtoFieldInfo field, byte[] value) throws IOExceptio
writeBytes(field, value);
}

/**
* Serializes a protobuf {@code bytes} field. Writes all content of the ByteBuffer regardless of
* the current position and limit. Does not alter the position or limit of the provided
* ByteBuffer.
*/
public void serializeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
if (value.capacity() == 0) {
return;

Check warning on line 264 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java#L264

Added line #L264 was not covered by tests
}
writeByteBuffer(field, value);
}

Check warning on line 267 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java#L266-L267

Added lines #L266 - L267 were not covered by tests

public abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException;

public abstract void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException;

protected abstract void writeStartMessage(ProtoFieldInfo field, int protoMessageSize)
throws IOException;

Expand Down

0 comments on commit 65cf5a1

Please sign in to comment.