From dbb782e75bee95a0d823740e5b6e5ea4e739d3da Mon Sep 17 00:00:00 2001 From: Jonathan Halliday Date: Thu, 5 Sep 2024 11:23:16 +0100 Subject: [PATCH] Add ByteBuffer field type marshaling support to exporter. --- .../internal/marshal/CodedOutputStream.java | 53 ++++++++++++++++++- .../internal/marshal/JsonSerializer.java | 8 +++ .../internal/marshal/MarshalerUtil.java | 9 ++++ .../internal/marshal/ProtoSerializer.java | 7 +++ .../exporter/internal/marshal/Serializer.java | 15 ++++++ 5 files changed, 91 insertions(+), 1 deletion(-) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java index 8cc17a7834b..68311be1845 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java @@ -44,6 +44,7 @@ import io.opentelemetry.api.internal.ConfigUtil; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; /** * Protobuf wire encoder. @@ -56,7 +57,7 @@ // // Differences // - No support for Message/Lite -// - No support for ByteString or ByteBuffer +// - No support for ByteString // - No support for message set extensions // - No support for Unsafe // - No support for Java String, only UTF-8 bytes @@ -329,6 +330,11 @@ public static int computeByteArraySizeNoTag(final byte[] value) { return computeLengthDelimitedFieldSize(value.length); } + /** Compute the number of bytes that would be needed to encode a {@code bytes} field. */ + public static int computeByteBufferSizeNoTag(final ByteBuffer value) { + return computeLengthDelimitedFieldSize(value.capacity()); + } + static int computeLengthDelimitedFieldSize(int fieldLength) { return computeUInt32SizeNoTag(fieldLength) + fieldLength; } @@ -375,6 +381,8 @@ static long encodeZigZag64(final long n) { abstract void writeByteArrayNoTag(final byte[] value, final int offset, final int length) throws IOException; + abstract void writeByteBufferNoTag(final ByteBuffer value) throws IOException; + // ================================================================= /** Abstract base class for buffered encoders. */ @@ -487,6 +495,49 @@ void writeByteArrayNoTag(final byte[] value, int offset, int length) throws IOEx write(value, offset, length); } + @Override + void writeByteBufferNoTag(final ByteBuffer value) throws IOException { + writeUInt32NoTag(value.capacity()); + if (value.hasArray()) { + write(value.array(), value.arrayOffset(), value.capacity()); + } else { + write((ByteBuffer) value.duplicate().clear()); + } + } + + void write(ByteBuffer value) throws IOException { + int length = value.remaining(); + if (limit - position >= length) { + // We have room in the current buffer. + value.get(buffer, position, length); + position += length; + totalBytesWritten += length; + } else { + // Write extends past current buffer. Fill the rest of this buffer and + // flush. + final int bytesWritten = limit - position; + value.get(buffer, position, bytesWritten); + length -= bytesWritten; + position = limit; + totalBytesWritten += bytesWritten; + doFlush(); + + // Now deal with the rest. + // Since we have an output stream, this is our buffer + // and buffer offset == 0 + while (length > limit) { + // Copy data into the buffer before writing it to OutputStream. + value.get(buffer, 0, limit); + out.write(buffer, 0, limit); + length -= limit; + totalBytesWritten += limit; + } + value.get(buffer, 0, length); + position = length; + totalBytesWritten += length; + } + } + @Override void write(byte value) throws IOException { if (position == limit) { diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java index 7d8ad3aad3f..680220b5e3e 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; @@ -126,6 +127,13 @@ public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException { generator.writeBinaryField(field.getJsonName(), value); } + @Override + public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException { + byte[] data = new byte[value.capacity()]; + ((ByteBuffer) value.duplicate().clear()).get(data); + generator.writeBinaryField(field.getJsonName(), data); + } + @Override protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException { generator.writeObjectFieldStart(field.getJsonName()); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java index 3fff240704f..d7f6d44c871 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java @@ -13,6 +13,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -346,6 +347,14 @@ public static int sizeBytes(ProtoFieldInfo field, byte[] message) { return field.getTagSize() + CodedOutputStream.computeByteArraySizeNoTag(message); } + /** Returns the size of a bytes field based on the buffer's capacity. */ + public static int sizeByteBuffer(ProtoFieldInfo field, ByteBuffer message) { + if (message.capacity() == 0) { + return 0; + } + return field.getTagSize() + CodedOutputStream.computeByteBufferSizeNoTag(message); + } + /** Returns the size of a enum field. */ // Assumes OTLP always defines the first item in an enum with number 0, which it does and will. public static int sizeEnum(ProtoFieldInfo field, ProtoEnumInfo enumValue) { diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java index b71109912b2..62f4a175982 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java @@ -10,6 +10,7 @@ import io.opentelemetry.api.trace.TraceId; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -168,6 +169,12 @@ public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException { output.writeByteArrayNoTag(value); } + @Override + public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException { + output.writeUInt32NoTag(field.getTag()); + output.writeByteBufferNoTag(value); + } + @Override protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException { output.writeUInt32NoTag(field.getTag()); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java index 205ec192e35..e7970d57491 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java @@ -9,6 +9,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -253,8 +254,22 @@ public void serializeBytes(ProtoFieldInfo field, byte[] value) throws IOExceptio writeBytes(field, value); } + /** + * Serializes a protobuf {@code bytes} field. Writes all content of the ByteBuffer regardless of + * the current position and limit. Does not alter the position or limit of the provided + * ByteBuffer. + */ + public void serializeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException { + if (value.capacity() == 0) { + return; + } + writeByteBuffer(field, value); + } + public abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException; + public abstract void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException; + protected abstract void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException;