Skip to content

Commit

Permalink
Add ByteBuffer field type marshaling support to exporter.
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalliday committed Oct 10, 2024
1 parent b927d9d commit dbb782e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.opentelemetry.api.internal.ConfigUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* Protobuf wire encoder.
Expand All @@ -56,7 +57,7 @@
//
// Differences
// - No support for Message/Lite
// - No support for ByteString or ByteBuffer
// - No support for ByteString
// - No support for message set extensions
// - No support for Unsafe
// - No support for Java String, only UTF-8 bytes
Expand Down Expand Up @@ -329,6 +330,11 @@ public static int computeByteArraySizeNoTag(final byte[] value) {
return computeLengthDelimitedFieldSize(value.length);
}

/** Compute the number of bytes that would be needed to encode a {@code bytes} field. */
public static int computeByteBufferSizeNoTag(final ByteBuffer value) {
return computeLengthDelimitedFieldSize(value.capacity());
}

static int computeLengthDelimitedFieldSize(int fieldLength) {
return computeUInt32SizeNoTag(fieldLength) + fieldLength;
}
Expand Down Expand Up @@ -375,6 +381,8 @@ static long encodeZigZag64(final long n) {
abstract void writeByteArrayNoTag(final byte[] value, final int offset, final int length)
throws IOException;

abstract void writeByteBufferNoTag(final ByteBuffer value) throws IOException;

// =================================================================

/** Abstract base class for buffered encoders. */
Expand Down Expand Up @@ -487,6 +495,49 @@ void writeByteArrayNoTag(final byte[] value, int offset, int length) throws IOEx
write(value, offset, length);
}

@Override
void writeByteBufferNoTag(final ByteBuffer value) throws IOException {
writeUInt32NoTag(value.capacity());
if (value.hasArray()) {
write(value.array(), value.arrayOffset(), value.capacity());
} else {
write((ByteBuffer) value.duplicate().clear());
}
}

void write(ByteBuffer value) throws IOException {
int length = value.remaining();
if (limit - position >= length) {
// We have room in the current buffer.
value.get(buffer, position, length);
position += length;
totalBytesWritten += length;
} else {
// Write extends past current buffer. Fill the rest of this buffer and
// flush.
final int bytesWritten = limit - position;
value.get(buffer, position, bytesWritten);
length -= bytesWritten;
position = limit;
totalBytesWritten += bytesWritten;
doFlush();

// Now deal with the rest.
// Since we have an output stream, this is our buffer
// and buffer offset == 0
while (length > limit) {
// Copy data into the buffer before writing it to OutputStream.
value.get(buffer, 0, limit);
out.write(buffer, 0, limit);
length -= limit;
totalBytesWritten += limit;
}
value.get(buffer, 0, length);
position = length;
totalBytesWritten += length;
}
}

@Override
void write(byte value) throws IOException {
if (position == limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

Expand Down Expand Up @@ -126,6 +127,13 @@ public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
}

@Override
public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
byte[] data = new byte[value.capacity()];
((ByteBuffer) value.duplicate().clear()).get(data);
generator.writeBinaryField(field.getJsonName(), data);
}

@Override
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
generator.writeObjectFieldStart(field.getJsonName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -346,6 +347,14 @@ public static int sizeBytes(ProtoFieldInfo field, byte[] message) {
return field.getTagSize() + CodedOutputStream.computeByteArraySizeNoTag(message);
}

/** Returns the size of a bytes field based on the buffer's capacity. */
public static int sizeByteBuffer(ProtoFieldInfo field, ByteBuffer message) {
if (message.capacity() == 0) {
return 0;
}
return field.getTagSize() + CodedOutputStream.computeByteBufferSizeNoTag(message);
}

/** Returns the size of a enum field. */
// Assumes OTLP always defines the first item in an enum with number 0, which it does and will.
public static int sizeEnum(ProtoFieldInfo field, ProtoEnumInfo enumValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.api.trace.TraceId;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -168,6 +169,12 @@ public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
output.writeByteArrayNoTag(value);
}

@Override
public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
output.writeUInt32NoTag(field.getTag());
output.writeByteBufferNoTag(value);
}

@Override
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
output.writeUInt32NoTag(field.getTag());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -253,8 +254,22 @@ public void serializeBytes(ProtoFieldInfo field, byte[] value) throws IOExceptio
writeBytes(field, value);
}

/**
* Serializes a protobuf {@code bytes} field. Writes all content of the ByteBuffer regardless of
* the current position and limit. Does not alter the position or limit of the provided
* ByteBuffer.
*/
public void serializeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
if (value.capacity() == 0) {
return;
}
writeByteBuffer(field, value);
}

public abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException;

public abstract void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException;

protected abstract void writeStartMessage(ProtoFieldInfo field, int protoMessageSize)
throws IOException;

Expand Down

0 comments on commit dbb782e

Please sign in to comment.