Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datadog.communication.serialization;

import static datadog.trace.util.BitUtils.nextPowerOfTwo;

import java.nio.ByteBuffer;

/**
Expand All @@ -12,8 +14,8 @@ public final class GrowableBuffer implements StreamingBuffer {
private ByteBuffer buffer;
private int messageCount;

public GrowableBuffer(int initialCapacity) {
this.initialCapacity = initialCapacity;
public GrowableBuffer(int requiredCapacity) {
this.initialCapacity = nextPowerOfTwo(requiredCapacity);
this.buffer = ByteBuffer.allocate(initialCapacity);
}

Expand Down Expand Up @@ -122,7 +124,7 @@ public void put(ByteBuffer buffer) {

private void checkCapacity(int required) {
if (buffer.remaining() < required) {
// round up to next multiple of required
// round up to next multiple of initialCapacity that can accommodate required
int newSize = (buffer.capacity() + required + initialCapacity - 1) & -initialCapacity;
ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
buffer.flip();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ public void byteBufferTriggersResize() {
@Test
public void testBufferCapacity() {
GrowableBuffer gb = new GrowableBuffer(5);
assertEquals(5, gb.capacity());
assertEquals(8, gb.capacity());
ByteBuffer buffer = ByteBuffer.allocate(20);
for (int i = 0; i < 5; ++i) {
buffer.putInt(i);
}
buffer.flip();
gb.put(buffer);
assertEquals(25, gb.capacity());
assertEquals(32, gb.capacity());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package datadog.trace.common.writer;

import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDSpanContext;
import datadog.trace.core.otlp.common.OtlpPayload;
import datadog.trace.core.otlp.common.OtlpSender;
import datadog.trace.core.otlp.trace.OtlpTraceCollector;
import datadog.trace.core.otlp.trace.OtlpTraceProtoCollector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -26,18 +24,7 @@ final class OtlpPayloadDispatcher implements PayloadDispatcher {

@Override
public void addTrace(List<? extends CoreSpan<?>> trace) {
List<CoreSpan<?>> sampled = null;
for (CoreSpan<?> span : trace) {
if (shouldExport(span)) {
if (sampled == null) {
sampled = new ArrayList<>(trace.size());
}
sampled.add(span);
}
}
if (sampled != null) {
collector.addTrace(sampled);
}
collector.addTrace(trace);
}

@Override
Expand All @@ -57,13 +44,4 @@ public void onDroppedTrace(int spanCount) {
public Collection<RemoteApi> getApis() {
return Collections.emptyList();
}

private static boolean shouldExport(CoreSpan<?> span) {
// trace-level sampling priority
if (span.samplingPriority() > 0) {
return true;
}
// span-level sampling priority
return span.getTag(DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG) != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import datadog.communication.serialization.GenerationalUtf8Cache;
import datadog.communication.serialization.GrowableBuffer;
import datadog.communication.serialization.SimpleUtf8Cache;
import datadog.communication.serialization.StreamingBuffer;
import datadog.trace.api.Config;
Expand Down Expand Up @@ -125,26 +124,6 @@ public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) {
writeVarInt(buf, fieldNum << 3 | wireType);
}

public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) {
return recordMessage(buf, fieldNum, 0);
}

public static byte[] recordMessage(GrowableBuffer buf, int fieldNum, int remainingBytes) {
try {
ByteBuffer data = buf.flip();
int dataSize = data.remaining();
int expectedSize = dataSize + remainingBytes;
ByteBuffer message =
ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(expectedSize) + dataSize);
writeTag(message, fieldNum, LEN_WIRE_TYPE);
writeVarInt(message, expectedSize);
message.put(data);
return message.array();
} finally {
buf.reset();
}
}

public static void writeInstrumentationScope(
StreamingBuffer buf, OtelInstrumentationScope scope) {
byte[] nameUtf8 = scope.getName().getUtf8Bytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void writeTo(@Nonnull BufferedSink sink) throws IOException {
if (gzip) {
try (Buffer gzipBody = new Buffer()) {
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipBody))) {
payload.drain(gzipSink::write);
gzipSink.write(payload.getContent());
}
sink.writeByte(COMPRESSED_FLAG);
long gzipLength = gzipBody.size();
Expand All @@ -52,7 +52,7 @@ public void writeTo(@Nonnull BufferedSink sink) throws IOException {
} else {
sink.writeByte(UNCOMPRESSED_FLAG);
sink.writeInt(payload.getContentLength());
payload.drain(sink::write);
sink.write(payload.getContent());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ public MediaType contentType() {
public void writeTo(@Nonnull BufferedSink sink) throws IOException {
if (gzip) {
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(sink))) {
payload.drain(gzipSink::write);
gzipSink.write(payload.getContent());
}
} else {
payload.drain(sink::write);
sink.write(payload.getContent());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
package datadog.trace.core.otlp.common;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.nio.ByteBuffer;

/** OTLP payload consisting of a sequence of chunked byte-arrays. */
public final class OtlpPayload {
public static final OtlpPayload EMPTY = new OtlpPayload(new ArrayDeque<>(), 0, "");
public static final OtlpPayload EMPTY = new OtlpPayload(ByteBuffer.allocate(0), "");

private final Deque<byte[]> chunks;
private final ByteBuffer content;
private final int contentLength;
private final String contentType;

public OtlpPayload(Deque<byte[]> chunks, int contentLength, String contentType) {
this.chunks = chunks;
this.contentLength = contentLength;
public OtlpPayload(ByteBuffer content, String contentType) {
this.content = content;
this.contentLength = content.remaining();
this.contentType = contentType;
}

/** Drains the chunked payload to the given sink. */
public void drain(OtlpSink sink) throws IOException {
byte[] chunk;
while ((chunk = chunks.pollFirst()) != null) {
sink.write(chunk);
}
public ByteBuffer getContent() {
return content.asReadOnlyBuffer();
}

public int getContentLength() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package datadog.trace.core.otlp.common;

import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.sizeVarInt;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeVarInt;
import static datadog.trace.util.BitUtils.nextPowerOfTwo;

import datadog.communication.serialization.GrowableBuffer;
import java.nio.ByteBuffer;

/**
* Growable buffer optimized for prepending protobuf messages. This buffer doesn't have a bounded
* length, and grows linearly. It should only be used to serialize bounded data structures.
*
* <p>Messages appear in the final payload in reverse insertion order.
*
* @see GrowableBuffer
*/
public final class OtlpProtoBuffer {
private static final String PROTOBUF_CONTENT_TYPE = "application/x-protobuf";

private final int initialCapacity;
private ByteBuffer buffer;
private int remaining;

public OtlpProtoBuffer(int requiredCapacity) {
this.initialCapacity = nextPowerOfTwo(requiredCapacity);
this.buffer = ByteBuffer.allocate(initialCapacity);
this.remaining = initialCapacity;
}

/**
* Records a self-contained protobuf message.
*
* @param buf buffer containing the message body
* @param fieldNum field number of the message
* @return overall size of the message in bytes
*/
public int recordMessage(GrowableBuffer buf, int fieldNum) {
return recordMessage(buf, fieldNum, 0);
}

/**
* Records a protobuf message that has zero or more nested elements already recorded.
*
* @param buf buffer containing the message header
* @param fieldNum field number of the message
* @param bytesSoFar nested element bytes recorded so far
* @return overall size of the message in bytes
*/
public int recordMessage(GrowableBuffer buf, int fieldNum, int bytesSoFar) {
try {
ByteBuffer message = buf.flip();
// calculate space needed to encode message, its total length, and the tag
int messageSize = message.remaining();
int length = messageSize + bytesSoFar;
int tag = fieldNum << 3 | LEN_WIRE_TYPE;
int numBytes = sizeVarInt(tag) + sizeVarInt(length) + messageSize;
// grow the buffer to fit the incoming content
checkCapacity(numBytes);
remaining -= numBytes;
// reposition so we can write the encoded message
buffer.position(remaining);
// write the usual prelude
writeVarInt(buffer, tag);
writeVarInt(buffer, length);
// write the primary message
buffer.put(message);
// no need to reset position here; it's always reset before any write/read
return numBytes + bytesSoFar;
} finally {
buf.reset();
}
}

/**
* Records a previously cached protobuf message.
*
* @param bytes cached bytes containing the message header
* @return overall size of the message in bytes
*/
public int recordMessage(byte[] bytes) {
// grow the buffer to fit the incoming content
int numBytes = bytes.length;
checkCapacity(numBytes);
remaining -= numBytes;
// reposition so we can write the cached message
buffer.position(remaining);
buffer.put(bytes);
// no need to reset position here; it's always reset before any write/read
return numBytes;
}

/** Flips the buffer, returning the protobuf encoded content for reading. */
public ByteBuffer flip() {
buffer.position(remaining);
return buffer;
}

/**
* Returns an {@link OtlpPayload} containing the protobuf encoded content.
*
* <p>This payload is only valid for the calling thread until the next collection.
*/
public OtlpPayload toPayload() {
return new OtlpPayload(flip(), PROTOBUF_CONTENT_TYPE);
}

/**
* Resets the buffer in anticipation of the next collection cycle.
*
* <p>This does not affect the active payload, which remains valid until the next collection.
*/
public void reset() {
if (buffer.capacity() > initialCapacity) {
buffer = ByteBuffer.allocate(initialCapacity);
}
remaining = buffer.capacity();
}

/** Grows the buffer to ensure the required number of bytes can be prepended. */
private void checkCapacity(int required) {
if (remaining < required) {
ByteBuffer oldBuffer = flip();
int oldSize = oldBuffer.remaining();
// round up to next multiple of initialCapacity that can accommodate required
int newSize = (oldSize + required + initialCapacity - 1) & -initialCapacity;
ByteBuffer newBuffer = ByteBuffer.allocate(newSize);
// copy over old content so it stays at the far end
remaining = newSize - oldSize;
newBuffer.position(remaining);
newBuffer.put(oldBuffer);
buffer = newBuffer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static datadog.communication.ddagent.TracerVersion.TRACER_VERSION;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ATTRIBUTE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE;
import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute;
import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag;

Expand Down Expand Up @@ -62,7 +61,12 @@ static byte[] buildResourceMessage(Config config) {
}
});

return recordMessage(buf, 1);
OtlpProtoBuffer protobuf = new OtlpProtoBuffer(buf.capacity());
int numBytes = protobuf.recordMessage(buf, 1);
byte[] resourceMessage = new byte[numBytes];
protobuf.flip().get(resourceMessage);

return resourceMessage;
}

private static void writeResourceAttribute(StreamingBuffer buf, String key, String value) {
Expand Down

This file was deleted.

Loading