From 6f5caa38ede4a033f2b805245b0bf14247f7ae0a Mon Sep 17 00:00:00 2001 From: Stuart McCulloch Date: Sun, 3 May 2026 17:44:36 +0100 Subject: [PATCH] Marshals proto messages into a single prepending buffer, instead of many small byte-arrays and move decision whether to export a span into OtlpTraceCollector (avoids re-allocations) --- .../serialization/GrowableBuffer.java | 8 +- .../serialization/GrowableBufferTest.java | 4 +- .../common/writer/OtlpPayloadDispatcher.java | 24 +- .../core/otlp/common/OtlpCommonProto.java | 21 - .../core/otlp/common/OtlpGrpcRequestBody.java | 4 +- .../core/otlp/common/OtlpHttpRequestBody.java | 4 +- .../trace/core/otlp/common/OtlpPayload.java | 23 +- .../core/otlp/common/OtlpProtoBuffer.java | 136 +++++ .../core/otlp/common/OtlpResourceProto.java | 8 +- .../trace/core/otlp/common/OtlpSink.java | 9 - .../trace/core/otlp/logs/OtlpLogsProto.java | 23 +- .../otlp/logs/OtlpLogsProtoCollector.java | 38 +- .../core/otlp/metrics/OtlpMetricsProto.java | 47 +- .../metrics/OtlpMetricsProtoCollector.java | 48 +- .../core/otlp/trace/OtlpTraceCollector.java | 7 + .../trace/core/otlp/trace/OtlpTraceProto.java | 38 +- .../otlp/trace/OtlpTraceProtoCollector.java | 75 +-- .../writer/OtlpPayloadDispatcherTest.java | 95 ++-- .../otlp/common/OtlpHttpRequestBodyTest.java | 45 +- .../core/otlp/common/OtlpProtoBufferTest.java | 440 ++++++++++++++++ .../core/otlp/logs/OtlpLogsProtoTest.java | 449 ++++++++-------- .../otlp/metrics/OtlpMetricsProtoTest.java | 428 +++++++-------- .../core/otlp/trace/OtlpTraceProtoTest.java | 494 ++++++++---------- 23 files changed, 1431 insertions(+), 1037 deletions(-) create mode 100644 dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpProtoBuffer.java delete mode 100644 dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpSink.java create mode 100644 dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpProtoBufferTest.java diff --git 
a/communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java b/communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java index 81b55aba7d2..54894efec52 100644 --- a/communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java +++ b/communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java @@ -1,5 +1,7 @@ package datadog.communication.serialization; +import static datadog.trace.util.BitUtils.nextPowerOfTwo; + import java.nio.ByteBuffer; /** @@ -12,8 +14,8 @@ public final class GrowableBuffer implements StreamingBuffer { private ByteBuffer buffer; private int messageCount; - public GrowableBuffer(int initialCapacity) { - this.initialCapacity = initialCapacity; + public GrowableBuffer(int requiredCapacity) { + this.initialCapacity = nextPowerOfTwo(requiredCapacity); this.buffer = ByteBuffer.allocate(initialCapacity); } @@ -122,7 +124,7 @@ public void put(ByteBuffer buffer) { private void checkCapacity(int required) { if (buffer.remaining() < required) { - // round up to next multiple of required + // round up to next multiple of initialCapacity that can accommodate required int newSize = (buffer.capacity() + required + initialCapacity - 1) & -initialCapacity; ByteBuffer newBuffer = ByteBuffer.allocate(newSize); buffer.flip(); diff --git a/communication/src/test/java/datadog/communication/serialization/GrowableBufferTest.java b/communication/src/test/java/datadog/communication/serialization/GrowableBufferTest.java index 9b0c06daecd..c018cce7aa3 100644 --- a/communication/src/test/java/datadog/communication/serialization/GrowableBufferTest.java +++ b/communication/src/test/java/datadog/communication/serialization/GrowableBufferTest.java @@ -27,14 +27,14 @@ public void byteBufferTriggersResize() { @Test public void testBufferCapacity() { GrowableBuffer gb = new GrowableBuffer(5); - assertEquals(5, gb.capacity()); + assertEquals(8, gb.capacity()); ByteBuffer buffer = 
ByteBuffer.allocate(20); for (int i = 0; i < 5; ++i) { buffer.putInt(i); } buffer.flip(); gb.put(buffer); - assertEquals(25, gb.capacity()); + assertEquals(32, gb.capacity()); } @Test diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/OtlpPayloadDispatcher.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/OtlpPayloadDispatcher.java index 16bcd71de4f..4f198f1e508 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/OtlpPayloadDispatcher.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/OtlpPayloadDispatcher.java @@ -1,12 +1,10 @@ package datadog.trace.common.writer; import datadog.trace.core.CoreSpan; -import datadog.trace.core.DDSpanContext; import datadog.trace.core.otlp.common.OtlpPayload; import datadog.trace.core.otlp.common.OtlpSender; import datadog.trace.core.otlp.trace.OtlpTraceCollector; import datadog.trace.core.otlp.trace.OtlpTraceProtoCollector; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -26,18 +24,7 @@ final class OtlpPayloadDispatcher implements PayloadDispatcher { @Override public void addTrace(List> trace) { - List> sampled = null; - for (CoreSpan span : trace) { - if (shouldExport(span)) { - if (sampled == null) { - sampled = new ArrayList<>(trace.size()); - } - sampled.add(span); - } - } - if (sampled != null) { - collector.addTrace(sampled); - } + collector.addTrace(trace); } @Override @@ -57,13 +44,4 @@ public void onDroppedTrace(int spanCount) { public Collection getApis() { return Collections.emptyList(); } - - private static boolean shouldExport(CoreSpan span) { - // trace-level sampling priority - if (span.samplingPriority() > 0) { - return true; - } - // span-level sampling priority - return span.getTag(DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG) != null; - } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java 
b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java index f312499cf97..88fc102a335 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java @@ -11,7 +11,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import datadog.communication.serialization.GenerationalUtf8Cache; -import datadog.communication.serialization.GrowableBuffer; import datadog.communication.serialization.SimpleUtf8Cache; import datadog.communication.serialization.StreamingBuffer; import datadog.trace.api.Config; @@ -125,26 +124,6 @@ public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) { writeVarInt(buf, fieldNum << 3 | wireType); } - public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) { - return recordMessage(buf, fieldNum, 0); - } - - public static byte[] recordMessage(GrowableBuffer buf, int fieldNum, int remainingBytes) { - try { - ByteBuffer data = buf.flip(); - int dataSize = data.remaining(); - int expectedSize = dataSize + remainingBytes; - ByteBuffer message = - ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(expectedSize) + dataSize); - writeTag(message, fieldNum, LEN_WIRE_TYPE); - writeVarInt(message, expectedSize); - message.put(data); - return message.array(); - } finally { - buf.reset(); - } - } - public static void writeInstrumentationScope( StreamingBuffer buf, OtelInstrumentationScope scope) { byte[] nameUtf8 = scope.getName().getUtf8Bytes(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpGrpcRequestBody.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpGrpcRequestBody.java index b6f54cd4e0a..24457780b1d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpGrpcRequestBody.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpGrpcRequestBody.java @@ -42,7 +42,7 @@ public void writeTo(@Nonnull 
BufferedSink sink) throws IOException { if (gzip) { try (Buffer gzipBody = new Buffer()) { try (BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipBody))) { - payload.drain(gzipSink::write); + gzipSink.write(payload.getContent()); } sink.writeByte(COMPRESSED_FLAG); long gzipLength = gzipBody.size(); @@ -52,7 +52,7 @@ public void writeTo(@Nonnull BufferedSink sink) throws IOException { } else { sink.writeByte(UNCOMPRESSED_FLAG); sink.writeInt(payload.getContentLength()); - payload.drain(sink::write); + sink.write(payload.getContent()); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpHttpRequestBody.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpHttpRequestBody.java index 9f87fdf102c..a94d5c59f08 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpHttpRequestBody.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpHttpRequestBody.java @@ -35,10 +35,10 @@ public MediaType contentType() { public void writeTo(@Nonnull BufferedSink sink) throws IOException { if (gzip) { try (BufferedSink gzipSink = Okio.buffer(new GzipSink(sink))) { - payload.drain(gzipSink::write); + gzipSink.write(payload.getContent()); } } else { - payload.drain(sink::write); + sink.write(payload.getContent()); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpPayload.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpPayload.java index 38075244d5b..a846fdb4ecf 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpPayload.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpPayload.java @@ -1,29 +1,22 @@ package datadog.trace.core.otlp.common; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; +import java.nio.ByteBuffer; -/** OTLP payload consisting of a sequence of chunked byte-arrays. 
*/ public final class OtlpPayload { - public static final OtlpPayload EMPTY = new OtlpPayload(new ArrayDeque<>(), 0, ""); + public static final OtlpPayload EMPTY = new OtlpPayload(ByteBuffer.allocate(0), ""); - private final Deque chunks; + private final ByteBuffer content; private final int contentLength; private final String contentType; - public OtlpPayload(Deque chunks, int contentLength, String contentType) { - this.chunks = chunks; - this.contentLength = contentLength; + public OtlpPayload(ByteBuffer content, String contentType) { + this.content = content; + this.contentLength = content.remaining(); this.contentType = contentType; } - /** Drains the chunked payload to the given sink. */ - public void drain(OtlpSink sink) throws IOException { - byte[] chunk; - while ((chunk = chunks.pollFirst()) != null) { - sink.write(chunk); - } + public ByteBuffer getContent() { + return content.asReadOnlyBuffer(); } public int getContentLength() { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpProtoBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpProtoBuffer.java new file mode 100644 index 00000000000..f60a3cda6be --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpProtoBuffer.java @@ -0,0 +1,136 @@ +package datadog.trace.core.otlp.common; + +import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; +import static datadog.trace.core.otlp.common.OtlpCommonProto.sizeVarInt; +import static datadog.trace.core.otlp.common.OtlpCommonProto.writeVarInt; +import static datadog.trace.util.BitUtils.nextPowerOfTwo; + +import datadog.communication.serialization.GrowableBuffer; +import java.nio.ByteBuffer; + +/** + * Growable buffer optimized for prepending protobuf messages. This buffer doesn't have a bounded + * length, and grows linearly. It should only be used to serialize bounded data structures. + * + *

Messages appear in the final payload in reverse insertion order. + * + * @see GrowableBuffer + */ +public final class OtlpProtoBuffer { + private static final String PROTOBUF_CONTENT_TYPE = "application/x-protobuf"; + + private final int initialCapacity; + private ByteBuffer buffer; + private int remaining; + + public OtlpProtoBuffer(int requiredCapacity) { + this.initialCapacity = nextPowerOfTwo(requiredCapacity); + this.buffer = ByteBuffer.allocate(initialCapacity); + this.remaining = initialCapacity; + } + + /** + * Records a self-contained protobuf message. + * + * @param buf buffer containing the message body + * @param fieldNum field number of the message + * @return overall size of the message in bytes + */ + public int recordMessage(GrowableBuffer buf, int fieldNum) { + return recordMessage(buf, fieldNum, 0); + } + + /** + * Records a protobuf message that has zero or more nested elements already recorded. + * + * @param buf buffer containing the message header + * @param fieldNum field number of the message + * @param bytesSoFar nested element bytes recorded so far + * @return overall size of the message in bytes + */ + public int recordMessage(GrowableBuffer buf, int fieldNum, int bytesSoFar) { + try { + ByteBuffer message = buf.flip(); + // calculate space needed to encode message, its total length, and the tag + int messageSize = message.remaining(); + int length = messageSize + bytesSoFar; + int tag = fieldNum << 3 | LEN_WIRE_TYPE; + int numBytes = sizeVarInt(tag) + sizeVarInt(length) + messageSize; + // grow the buffer to fit the incoming content + checkCapacity(numBytes); + remaining -= numBytes; + // reposition so we can write the encoded message + buffer.position(remaining); + // write the usual prelude + writeVarInt(buffer, tag); + writeVarInt(buffer, length); + // write the primary message + buffer.put(message); + // no need to reset position here; it's always reset before any write/read + return numBytes + bytesSoFar; + } finally { + 
buf.reset(); + } + } + + /** + * Records a previously cached protobuf message. + * + * @param bytes cached bytes containing the message header + * @return overall size of the message in bytes + */ + public int recordMessage(byte[] bytes) { + // grow the buffer to fit the incoming content + int numBytes = bytes.length; + checkCapacity(numBytes); + remaining -= numBytes; + // reposition so we can write the cached message + buffer.position(remaining); + buffer.put(bytes); + // no need to reset position here; it's always reset before any write/read + return numBytes; + } + + /** Flips the buffer, returning the protobuf encoded content for reading. */ + public ByteBuffer flip() { + buffer.position(remaining); + return buffer; + } + + /** + * Returns an {@link OtlpPayload} containing the protobuf encoded content. + * + *

This payload is only valid for the calling thread until the next collection. + */ + public OtlpPayload toPayload() { + return new OtlpPayload(flip(), PROTOBUF_CONTENT_TYPE); + } + + /** + * Resets the buffer in anticipation of the next collection cycle. + * + *

This does not affect the active payload, which remains valid until the next collection. + */ + public void reset() { + if (buffer.capacity() > initialCapacity) { + buffer = ByteBuffer.allocate(initialCapacity); + } + remaining = buffer.capacity(); + } + + /** Grows the buffer to ensure the required number of bytes can be prepended. */ + private void checkCapacity(int required) { + if (remaining < required) { + ByteBuffer oldBuffer = flip(); + int oldSize = oldBuffer.remaining(); + // round up to next multiple of initialCapacity that can accommodate required + int newSize = (oldSize + required + initialCapacity - 1) & -initialCapacity; + ByteBuffer newBuffer = ByteBuffer.allocate(newSize); + // copy over old content so it stays at the far end + remaining = newSize - oldSize; + newBuffer.position(remaining); + newBuffer.put(oldBuffer); + buffer = newBuffer; + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java index e45676d0721..0e45aa22ab7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java @@ -3,7 +3,6 @@ import static datadog.communication.ddagent.TracerVersion.TRACER_VERSION; import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ATTRIBUTE; import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; -import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag; @@ -62,7 +61,12 @@ static byte[] buildResourceMessage(Config config) { } }); - return recordMessage(buf, 1); + OtlpProtoBuffer protobuf = new OtlpProtoBuffer(buf.capacity()); + int numBytes = protobuf.recordMessage(buf, 1); + byte[] 
resourceMessage = new byte[numBytes]; + protobuf.flip().get(resourceMessage); + + return resourceMessage; } private static void writeResourceAttribute(StreamingBuffer buf, String key, String value) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpSink.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpSink.java deleted file mode 100644 index 35c9d297fc7..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpSink.java +++ /dev/null @@ -1,9 +0,0 @@ -package datadog.trace.core.otlp.common; - -import java.io.IOException; - -/** Receives chunks of OTLP data. */ -@FunctionalInterface -public interface OtlpSink { - void write(byte[] chunk) throws IOException; -} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProto.java index a060084f6bc..62e4964c854 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProto.java @@ -4,7 +4,6 @@ import static datadog.trace.core.otlp.common.OtlpCommonProto.I64_WIRE_TYPE; import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; import static datadog.trace.core.otlp.common.OtlpCommonProto.VARINT_WIRE_TYPE; -import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; import static datadog.trace.core.otlp.common.OtlpCommonProto.sizeVarInt; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI32; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI64; @@ -21,17 +20,18 @@ import datadog.communication.serialization.GrowableBuffer; import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; import datadog.trace.bootstrap.otlp.logs.OtlpLogRecord; +import datadog.trace.core.otlp.common.OtlpProtoBuffer; /** Provides optimized writers for OpenTelemetry's "logs.proto" wire protocol. 
*/ public final class OtlpLogsProto { private OtlpLogsProto() {} - /** - * Records the first part of a scoped logs message where we know its nested log messages will - * follow in one or more byte-arrays that add up to the given number of remaining bytes. - */ - public static byte[] recordScopedLogsMessage( - GrowableBuffer buf, OtelInstrumentationScope scope, int remainingBytes) { + /** Records a scoped logs message after its nested log messages have been recorded. */ + public static int recordScopedLogsMessage( + GrowableBuffer buf, + OtelInstrumentationScope scope, + int nestedLogBytes, + OtlpProtoBuffer protobuf) { writeTag(buf, 1, LEN_WIRE_TYPE); writeInstrumentationScope(buf, scope); @@ -40,11 +40,12 @@ public static byte[] recordScopedLogsMessage( writeString(buf, scope.getSchemaUrl().getUtf8Bytes()); } - return recordMessage(buf, 2, remainingBytes); + return protobuf.recordMessage(buf, 2, nestedLogBytes); } - /** Completes recording of a log record message and packs it into its own byte-array. */ - public static byte[] recordLogRecordMessage(GrowableBuffer buf, OtlpLogRecord logRecord) { + /** Records a log message. 
*/ + public static int recordLogRecordMessage( + GrowableBuffer buf, OtlpLogRecord logRecord, OtlpProtoBuffer protobuf) { writeTag(buf, 1, I64_WIRE_TYPE); writeI64(buf, logRecord.timestampNanos); @@ -88,6 +89,6 @@ public static byte[] recordLogRecordMessage(GrowableBuffer buf, OtlpLogRecord lo writeStringCached(buf, logRecord.eventName); } - return recordMessage(buf, 2); + return protobuf.recordMessage(buf, 2); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java index 176354465a8..30cb17193e2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java @@ -1,7 +1,6 @@ package datadog.trace.core.otlp.logs; import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; -import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag; import static datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE; @@ -16,10 +15,7 @@ import datadog.trace.bootstrap.otlp.logs.OtlpScopedLogsVisitor; import datadog.trace.core.otlp.common.OtlpCommonProto; import datadog.trace.core.otlp.common.OtlpPayload; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.List; +import datadog.trace.core.otlp.common.OtlpProtoBuffer; import java.util.function.Consumer; /** @@ -42,10 +38,7 @@ public final class OtlpLogsProtoCollector extends OtlpLogsCollector private static final String PROTOBUF_CONTENT_TYPE = "application/x-protobuf"; private final GrowableBuffer buf = new GrowableBuffer(512); - - // temporary collections of chunks at different nesting levels - private final Deque payloadChunks = new 
ArrayDeque<>(); - private final List scopedChunks = new ArrayList<>(); + private final OtlpProtoBuffer protobuf = new OtlpProtoBuffer(8192); // total number of chunked bytes at different nesting levels private int payloadBytes; @@ -78,9 +71,6 @@ OtlpPayload collectLogs(Consumer processor) { /** Prepare temporary elements to collect logs data. */ private void start() { - // clear payloadChunks in case it wasn't fully consumed via OtlpPayload - payloadChunks.clear(); - // remove stale entries from caches OtlpCommonProto.recalibrateCaches(); } @@ -88,9 +78,7 @@ private void start() { /** Cleanup elements used to collect logs data. */ private void stop() { buf.reset(); - - // leave payloadChunks in place so it can be consumed via OtlpPayload - scopedChunks.clear(); + protobuf.reset(); payloadBytes = 0; scopedBytes = 0; @@ -109,9 +97,7 @@ public OtlpScopedLogsVisitor visitScopedLogs(OtelInstrumentationScope scope) { @Override public void visitLogRecord(OtlpLogRecord logRecord) { - byte[] logChunk = recordLogRecordMessage(buf, logRecord); - scopedChunks.add(logChunk); - scopedBytes += logChunk.length; + scopedBytes += recordLogRecordMessage(buf, logRecord, protobuf); } @Override @@ -132,15 +118,11 @@ private OtlpPayload completePayload() { } // prepend the canned resource chunk - payloadChunks.addFirst(RESOURCE_MESSAGE); - payloadBytes += RESOURCE_MESSAGE.length; + payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE); // finally prepend the total length of all collected chunks - byte[] prefix = recordMessage(buf, 1, payloadBytes); - payloadChunks.addFirst(prefix); - payloadBytes += prefix.length; - - return new OtlpPayload(payloadChunks, payloadBytes, PROTOBUF_CONTENT_TYPE); + protobuf.recordMessage(buf, 1, payloadBytes); + return protobuf.toPayload(); } // called once we've processed all logs in a specific scope @@ -148,15 +130,11 @@ private void completeScope() { // add scoped logs message prefix to its nested chunks and promote to payload if (scopedBytes > 0) { - 
byte[] scopedPrefix = recordScopedLogsMessage(buf, currentScope, scopedBytes); - payloadChunks.add(scopedPrefix); - payloadChunks.addAll(scopedChunks); - payloadBytes += scopedPrefix.length + scopedBytes; + payloadBytes += recordScopedLogsMessage(buf, currentScope, scopedBytes, protobuf); } // reset temporary elements for next scope currentScope = null; - scopedChunks.clear(); scopedBytes = 0; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProto.java index 8a73beb1681..751649940ba 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProto.java @@ -8,7 +8,6 @@ import static datadog.trace.core.otlp.common.OtlpCommonProto.I64_WIRE_TYPE; import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; import static datadog.trace.core.otlp.common.OtlpCommonProto.VARINT_WIRE_TYPE; -import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI64; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeInstrumentationScope; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeString; @@ -25,6 +24,7 @@ import datadog.trace.bootstrap.otlp.metrics.OtlpDoublePoint; import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint; import datadog.trace.bootstrap.otlp.metrics.OtlpLongPoint; +import datadog.trace.core.otlp.common.OtlpProtoBuffer; /** Provides optimized writers for OpenTelemetry's "metrics.proto" wire protocol. 
*/ public final class OtlpMetricsProto { @@ -37,12 +37,12 @@ private OtlpMetricsProto() {} private static final int OBSERVABLE_COUNTER_TEMPORALITY = temporality(OBSERVABLE_COUNTER); private static final int HISTOGRAM_TEMPORALITY = temporality(HISTOGRAM); - /** - * Records the first part of a scoped metrics message where we know its nested metric messages - * will follow in one or more byte-arrays that add up to the given number of remaining bytes. - */ - public static byte[] recordScopedMetricsMessage( - GrowableBuffer buf, OtelInstrumentationScope scope, int remainingBytes) { + /** Records a scoped metrics message after its nested metric messages have been recorded. */ + public static int recordScopedMetricsMessage( + GrowableBuffer buf, + OtelInstrumentationScope scope, + int nestedMetricBytes, + OtlpProtoBuffer protobuf) { writeTag(buf, 1, LEN_WIRE_TYPE); writeInstrumentationScope(buf, scope); @@ -51,15 +51,15 @@ public static byte[] recordScopedMetricsMessage( writeString(buf, scope.getSchemaUrl().getUtf8Bytes()); } - return recordMessage(buf, 2, remainingBytes); + return protobuf.recordMessage(buf, 2, nestedMetricBytes); } - /** - * Records the first part of a metric message where we know that its nested data point messages - * will follow in one or more byte-arrays that add up to the given number of remaining bytes. - */ - public static byte[] recordMetricMessage( - GrowableBuffer buf, OtelInstrumentDescriptor descriptor, int remainingBytes) { + /** Records a metric message after its nested data point messages have been recorded. 
*/ + public static int recordMetricMessage( + GrowableBuffer buf, + OtelInstrumentDescriptor descriptor, + int nestedDataPointBytes, + OtlpProtoBuffer protobuf) { writeTag(buf, 1, LEN_WIRE_TYPE); writeString(buf, descriptor.getName().getUtf8Bytes()); @@ -76,12 +76,12 @@ public static byte[] recordMetricMessage( case GAUGE: case OBSERVABLE_GAUGE: writeTag(buf, 5, LEN_WIRE_TYPE); - writeVarInt(buf, remainingBytes); + writeVarInt(buf, nestedDataPointBytes); // gauges have no aggregation temporality break; case COUNTER: writeTag(buf, 7, LEN_WIRE_TYPE); - writeVarInt(buf, remainingBytes + 4); + writeVarInt(buf, nestedDataPointBytes + 4); writeTag(buf, 2, VARINT_WIRE_TYPE); writeVarInt(buf, COUNTER_TEMPORALITY); writeTag(buf, 3, VARINT_WIRE_TYPE); @@ -89,7 +89,7 @@ public static byte[] recordMetricMessage( break; case OBSERVABLE_COUNTER: writeTag(buf, 7, LEN_WIRE_TYPE); - writeVarInt(buf, remainingBytes + 4); + writeVarInt(buf, nestedDataPointBytes + 4); writeTag(buf, 2, VARINT_WIRE_TYPE); writeVarInt(buf, OBSERVABLE_COUNTER_TEMPORALITY); writeTag(buf, 3, VARINT_WIRE_TYPE); @@ -98,14 +98,14 @@ public static byte[] recordMetricMessage( case UP_DOWN_COUNTER: case OBSERVABLE_UP_DOWN_COUNTER: writeTag(buf, 7, LEN_WIRE_TYPE); - writeVarInt(buf, remainingBytes + 2); + writeVarInt(buf, nestedDataPointBytes + 2); writeTag(buf, 2, VARINT_WIRE_TYPE); // up/down counters are always cumulative writeVarInt(buf, TEMPORALITY_CUMULATIVE); break; case HISTOGRAM: writeTag(buf, 9, LEN_WIRE_TYPE); - writeVarInt(buf, remainingBytes + 2); + writeVarInt(buf, nestedDataPointBytes + 2); writeTag(buf, 2, VARINT_WIRE_TYPE); writeVarInt(buf, HISTOGRAM_TEMPORALITY); break; @@ -113,11 +113,12 @@ public static byte[] recordMetricMessage( throw new IllegalArgumentException("Unknown instrument type: " + descriptor.getType()); } - return recordMessage(buf, 2, remainingBytes); + return protobuf.recordMessage(buf, 2, nestedDataPointBytes); } - /** Completes recording of a data point message and packs it 
into its own byte-array. */ - public static byte[] recordDataPointMessage(GrowableBuffer buf, OtlpDataPoint point) { + /** Records a data point message. */ + public static int recordDataPointMessage( + GrowableBuffer buf, OtlpDataPoint point, OtlpProtoBuffer protobuf) { if (point instanceof OtlpDoublePoint) { writeTag(buf, 4, I64_WIRE_TYPE); writeI64(buf, ((OtlpDoublePoint) point).value); @@ -155,7 +156,7 @@ public static byte[] recordDataPointMessage(GrowableBuffer buf, OtlpDataPoint po } } - return recordMessage(buf, 1); + return protobuf.recordMessage(buf, 1); } private static int temporality(OtelInstrumentType type) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoCollector.java index 062c558b143..bf819668733 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoCollector.java @@ -5,7 +5,6 @@ import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_GAUGE; import static datadog.trace.core.otlp.common.OtlpCommonProto.I64_WIRE_TYPE; import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; -import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI64; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeTag; @@ -27,10 +26,7 @@ import datadog.trace.bootstrap.otlp.metrics.OtlpScopedMetricsVisitor; import datadog.trace.core.otlp.common.OtlpCommonProto; import datadog.trace.core.otlp.common.OtlpPayload; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.List; +import datadog.trace.core.otlp.common.OtlpProtoBuffer; import 
java.util.function.Consumer; /** @@ -53,20 +49,14 @@ public final class OtlpMetricsProtoCollector extends OtlpMetricsCollector public static final OtlpMetricsProtoCollector INSTANCE = new OtlpMetricsProtoCollector(SystemTimeSource.INSTANCE); - private static final String PROTOBUF_CONTENT_TYPE = "application/x-protobuf"; - private final GrowableBuffer buf = new GrowableBuffer(512); + private final OtlpProtoBuffer protobuf = new OtlpProtoBuffer(8192); private final TimeSource timeSource; private long startNanos; private long endNanos; - // temporary collections of chunks at different nesting levels - private final Deque payloadChunks = new ArrayDeque<>(); - private final List scopedChunks = new ArrayList<>(); - private final List metricChunks = new ArrayList<>(); - // total number of chunked bytes at different nesting levels private int payloadBytes; private int scopedBytes; @@ -106,9 +96,6 @@ private void start() { startNanos = endNanos; endNanos = timeSource.getCurrentTimeNanos(); - // clear payloadChunks in case it wasn't fully consumed via OtlpPayload - payloadChunks.clear(); - // remove stale entries from caches OtlpCommonProto.recalibrateCaches(); } @@ -116,10 +103,7 @@ private void start() { /** Cleanup elements used to collect metrics data. 
*/ private void stop() { buf.reset(); - - // leave payloadChunks in place so it can be consumed via OtlpPayload - scopedChunks.clear(); - metricChunks.clear(); + protobuf.reset(); payloadBytes = 0; scopedBytes = 0; @@ -167,9 +151,7 @@ public void visitDataPoint(OtlpDataPoint point) { writeI64(buf, endNanos); // add complete data point message to the metric chunks - byte[] pointMessage = recordDataPointMessage(buf, point); - metricChunks.add(pointMessage); - metricBytes += pointMessage.length; + metricBytes += recordDataPointMessage(buf, point, protobuf); } // called once we've processed all scopes and metric messages @@ -183,15 +165,11 @@ private OtlpPayload completePayload() { } // prepend the canned resource chunk - payloadChunks.addFirst(RESOURCE_MESSAGE); - payloadBytes += RESOURCE_MESSAGE.length; + payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE); // finally prepend the total length of all collected chunks - byte[] prefix = recordMessage(buf, 1, payloadBytes); - payloadChunks.addFirst(prefix); - payloadBytes += prefix.length; - - return new OtlpPayload(payloadChunks, payloadBytes, PROTOBUF_CONTENT_TYPE); + protobuf.recordMessage(buf, 1, payloadBytes); + return protobuf.toPayload(); } // called once we've processed all metrics in a specific scope @@ -202,15 +180,11 @@ private void completeScope() { // add scoped metrics message prefix to its nested chunks and promote to payload if (scopedBytes > 0) { - byte[] scopedPrefix = recordScopedMetricsMessage(buf, currentScope, scopedBytes); - payloadChunks.add(scopedPrefix); - payloadChunks.addAll(scopedChunks); - payloadBytes += scopedPrefix.length + scopedBytes; + payloadBytes += recordScopedMetricsMessage(buf, currentScope, scopedBytes, protobuf); } // reset temporary elements for next scope currentScope = null; - scopedChunks.clear(); scopedBytes = 0; } @@ -219,15 +193,11 @@ private void completeMetric() { // add metric message prefix to its nested chunks and promote to scoped if (metricBytes > 0) { - 
byte[] metricPrefix = recordMetricMessage(buf, currentMetric, metricBytes); - scopedChunks.add(metricPrefix); - scopedChunks.addAll(metricChunks); - scopedBytes += metricPrefix.length + metricBytes; + scopedBytes += recordMetricMessage(buf, currentMetric, metricBytes, protobuf); } // reset temporary elements for next metric currentMetric = null; - metricChunks.clear(); metricBytes = 0; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java index e7d53f3d009..07bfc2d1b84 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java @@ -1,5 +1,7 @@ package datadog.trace.core.otlp.trace; +import static datadog.trace.core.DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG; + import datadog.trace.core.CoreSpan; import datadog.trace.core.otlp.common.OtlpPayload; import java.util.List; @@ -10,4 +12,9 @@ public abstract class OtlpTraceCollector { public abstract void addTrace(List> spans); public abstract OtlpPayload collectTraces(); + + protected final boolean shouldExport(CoreSpan span) { + return span.samplingPriority() > 0 // trace-level sampling priority + || span.getTag(SPAN_SAMPLING_MECHANISM_TAG) != null; // span-level sampling priority + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProto.java index 01bac2cf4fd..5f317b40730 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProto.java @@ -22,7 +22,6 @@ import static datadog.trace.core.otlp.common.OtlpCommonProto.I64_WIRE_TYPE; import static datadog.trace.core.otlp.common.OtlpCommonProto.LEN_WIRE_TYPE; import static 
datadog.trace.core.otlp.common.OtlpCommonProto.VARINT_WIRE_TYPE; -import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; import static datadog.trace.core.otlp.common.OtlpCommonProto.sizeVarInt; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeAttribute; import static datadog.trace.core.otlp.common.OtlpCommonProto.writeI32; @@ -46,6 +45,7 @@ import datadog.trace.core.Metadata; import datadog.trace.core.MetadataConsumer; import datadog.trace.core.PendingTrace; +import datadog.trace.core.otlp.common.OtlpProtoBuffer; import datadog.trace.core.propagation.PropagationTags; /** Provides optimized writers for OpenTelemetry's "trace.proto" wire protocol. */ @@ -62,12 +62,12 @@ public final class OtlpTraceProto { private OtlpTraceProto() {} - /** - * Records the first part of a scoped spans message where we know its nested span messages will - * follow in one or more byte-arrays that add up to the given number of remaining bytes. - */ - public static byte[] recordScopedSpansMessage( - GrowableBuffer buf, OtelInstrumentationScope scope, int remainingBytes) { + /** Records a scoped spans message after its nested span messages have been recorded. */ + public static int recordScopedSpansMessage( + GrowableBuffer buf, + OtelInstrumentationScope scope, + int nestedSpanBytes, + OtlpProtoBuffer protobuf) { writeTag(buf, 1, LEN_WIRE_TYPE); writeInstrumentationScope(buf, scope); @@ -76,15 +76,16 @@ public static byte[] recordScopedSpansMessage( writeString(buf, scope.getSchemaUrl().getUtf8Bytes()); } - return recordMessage(buf, 2, remainingBytes); + return protobuf.recordMessage(buf, 2, nestedSpanBytes); } - /** - * Records the first part of a span message where we know its nested span-links will follow in one - * or more byte-arrays that add up to the given number of remaining bytes. 
- */ - public static byte[] recordSpanMessage( - GrowableBuffer buf, DDSpan span, MetaWriter metaWriter, int remainingBytes) { + /** Records a span message after its nested span-link messages have been recorded. */ + public static int recordSpanMessage( + GrowableBuffer buf, + DDSpan span, + MetaWriter metaWriter, + int nestedSpanLinkBytes, + OtlpProtoBuffer protobuf) { PropagationTags propagationTags = span.context().getPropagationTags(); writeTag(buf, 1, LEN_WIRE_TYPE); @@ -162,11 +163,12 @@ public static byte[] recordSpanMessage( writeVarInt(buf, 2); } - return recordMessage(buf, 2, remainingBytes); + return protobuf.recordMessage(buf, 2, nestedSpanLinkBytes); } - /** Completes recording of a span-link message and packs it into its own byte-array. */ - public static byte[] recordSpanLinkMessage(GrowableBuffer buf, AgentSpanLink spanLink) { + /** Records a span-link message. */ + public static int recordSpanLinkMessage( + GrowableBuffer buf, AgentSpanLink spanLink, OtlpProtoBuffer protobuf) { writeTag(buf, 1, LEN_WIRE_TYPE); writeTraceId(buf, spanLink.traceId()); @@ -189,7 +191,7 @@ public static byte[] recordSpanLinkMessage(GrowableBuffer buf, AgentSpanLink spa writeTag(buf, 6, I32_WIRE_TYPE); writeI32(buf, spanLink.traceFlags()); - return recordMessage(buf, 13); + return protobuf.recordMessage(buf, 13); } public static void writeTraceId(StreamingBuffer buf, DDTraceId traceId) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java index aff7717577e..f37b7a5aea2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java @@ -1,6 +1,5 @@ package datadog.trace.core.otlp.trace; -import static datadog.trace.core.otlp.common.OtlpCommonProto.recordMessage; import static 
datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE; import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordScopedSpansMessage; import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordSpanLinkMessage; @@ -13,9 +12,7 @@ import datadog.trace.core.DDSpan; import datadog.trace.core.otlp.common.OtlpCommonProto; import datadog.trace.core.otlp.common.OtlpPayload; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; +import datadog.trace.core.otlp.common.OtlpProtoBuffer; import java.util.List; /** @@ -37,15 +34,9 @@ public final class OtlpTraceProtoCollector extends OtlpTraceCollector { private static final OtelInstrumentationScope DEFAULT_TRACE_SCOPE = new OtelInstrumentationScope("", null, null); - private static final String PROTOBUF_CONTENT_TYPE = "application/x-protobuf"; - private final GrowableBuffer buf = new GrowableBuffer(512); private final OtlpTraceProto.MetaWriter metaWriter = new OtlpTraceProto.MetaWriter(buf); - - // temporary collections of chunks at different nesting levels - private final Deque payloadChunks = new ArrayDeque<>(); - private final List scopedChunks = new ArrayList<>(); - private final List spanChunks = new ArrayList<>(); + private final OtlpProtoBuffer protobuf = new OtlpProtoBuffer(8192); private boolean payloadStarted; @@ -62,14 +53,11 @@ public final class OtlpTraceProtoCollector extends OtlpTraceCollector { public void addTrace(List> spans) { if (!payloadStarted) { start(); - metaWriter.includeProcessTags(); payloadStarted = true; } - for (int i = 0, len = spans.size(); i < len; i++) { - if (i == 0 || i == len - 1) { - metaWriter.includeSamplingTags(); - } + // OtlpProtoBuffer collects spans in reverse + for (int i = spans.size() - 1; i >= 0; i--) { visitSpan(spans.get(i)); } } @@ -91,9 +79,6 @@ public OtlpPayload collectTraces() { /** Prepare temporary elements to collect trace data. 
*/ private void start() { - // clear payloadChunks in case it wasn't fully consumed via OtlpPayload - payloadChunks.clear(); - // remove stale entries from caches OtlpCommonProto.recalibrateCaches(); @@ -106,10 +91,7 @@ private void stop() { payloadStarted = false; buf.reset(); - - // leave payloadChunks in place so it can be consumed via OtlpPayload - scopedChunks.clear(); - spanChunks.clear(); + protobuf.reset(); payloadBytes = 0; scopedBytes = 0; @@ -127,17 +109,22 @@ private void visitScopedSpans(OtelInstrumentationScope scope) { } private void visitSpan(CoreSpan span) { - if (currentSpan != null) { - completeSpan(); + if (shouldExport(span)) { + if (currentSpan != null) { + // ensure last span written at trace boundary includes sampling tags + // payload buffer is prepending, so last span written appears first! + if (!span.getTraceId().equals(currentSpan.getTraceId())) { + metaWriter.includeSamplingTags(); + } + completeSpan(); + } + currentSpan = (DDSpan) span; + currentSpan.getLinks().forEach(this::visitSpanLink); } - currentSpan = (DDSpan) span; - currentSpan.getLinks().forEach(this::visitSpanLink); } private void visitSpanLink(AgentSpanLink spanLink) { - byte[] spanLinkMessage = recordSpanLinkMessage(buf, spanLink); - spanChunks.add(spanLinkMessage); - spanBytes += spanLinkMessage.length; + spanBytes += recordSpanLinkMessage(buf, spanLink, protobuf); } // called once we've processed all scopes and span messages @@ -151,49 +138,39 @@ private OtlpPayload completePayload() { } // prepend the canned resource chunk - payloadChunks.addFirst(RESOURCE_MESSAGE); - payloadBytes += RESOURCE_MESSAGE.length; + payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE); // finally prepend the total length of all collected chunks - byte[] prefix = recordMessage(buf, 1, payloadBytes); - payloadChunks.addFirst(prefix); - payloadBytes += prefix.length; - - return new OtlpPayload(payloadChunks, payloadBytes, PROTOBUF_CONTENT_TYPE); + protobuf.recordMessage(buf, 1, 
payloadBytes); + return protobuf.toPayload(); } // called once we've processed all spans in a specific scope private void completeScope() { if (currentSpan != null) { + // ensure last span written at scope boundary includes process+sampling tags + // payload buffer is prepending, so last span written appears first! + metaWriter.includeProcessTags(); + metaWriter.includeSamplingTags(); completeSpan(); } - // add scoped spans message prefix to its nested chunks and promote to payload if (scopedBytes > 0) { - byte[] scopedPrefix = recordScopedSpansMessage(buf, currentScope, scopedBytes); - payloadChunks.add(scopedPrefix); - payloadChunks.addAll(scopedChunks); - payloadBytes += scopedPrefix.length + scopedBytes; + payloadBytes += recordScopedSpansMessage(buf, currentScope, scopedBytes, protobuf); } // reset temporary elements for next scope currentScope = null; - scopedChunks.clear(); scopedBytes = 0; } // called once we've processed all span-links in a specific span private void completeSpan() { - // add span message prefix to its nested chunks and promote to scoped - byte[] spanPrefix = recordSpanMessage(buf, currentSpan, metaWriter, spanBytes); - scopedChunks.add(spanPrefix); - scopedChunks.addAll(spanChunks); - scopedBytes += spanPrefix.length + spanBytes; + scopedBytes += recordSpanMessage(buf, currentSpan, metaWriter, spanBytes, protobuf); // reset temporary elements for next span currentSpan = null; - spanChunks.clear(); spanBytes = 0; } } diff --git a/dd-trace-core/src/test/java/datadog/trace/common/writer/OtlpPayloadDispatcherTest.java b/dd-trace-core/src/test/java/datadog/trace/common/writer/OtlpPayloadDispatcherTest.java index 512060cc91b..6e55ebeadaa 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/writer/OtlpPayloadDispatcherTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/writer/OtlpPayloadDispatcherTest.java @@ -1,6 +1,8 @@ package datadog.trace.common.writer; import static 
datadog.trace.core.DDSpanContext.SPAN_SAMPLING_MECHANISM_TAG; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -13,10 +15,9 @@ import datadog.trace.core.otlp.common.OtlpPayload; import datadog.trace.core.otlp.common.OtlpSender; import datadog.trace.core.otlp.trace.OtlpTraceCollector; -import java.util.ArrayDeque; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Deque; import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -26,22 +27,23 @@ @ExtendWith(MockitoExtension.class) class OtlpPayloadDispatcherTest { - @Mock OtlpSender sender; - @Mock OtlpTraceCollector collector; + + TestCollector collector = new TestCollector(); @Test - @SuppressWarnings({"unchecked", "rawtypes"}) void sampledTraceForwardsAllSpans() { OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender, collector); List> trace = Arrays.asList(sampledSpan(), sampledSpan()); dispatcher.addTrace(trace); + assertEquals(collector.spansToExport, trace); + dispatcher.flush(); - ArgumentCaptor captor = ArgumentCaptor.forClass(List.class); - verify(collector).addTrace(captor.capture()); - assertEquals(trace, captor.getValue()); - verifyNoInteractions(sender); + // expect two spans to be exported + ArgumentCaptor captor = ArgumentCaptor.forClass(OtlpPayload.class); + verify(sender).send(captor.capture()); + assertEquals(2 /*spans*/, captor.getValue().getContentLength()); } @Test @@ -49,8 +51,10 @@ void droppedTraceWithoutSingleSpanSamplingForwardsNothing() { OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender, collector); dispatcher.addTrace(Arrays.asList(droppedSpan(), droppedSpan())); + assertEquals(collector.spansToExport, 
emptyList()); + dispatcher.flush(); - verifyNoInteractions(collector, sender); + verifyNoInteractions(sender); } @Test @@ -58,12 +62,13 @@ void unsetPriorityTraceWithoutSingleSpanSamplingForwardsNothing() { OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender, collector); dispatcher.addTrace(Arrays.asList(unsetSpan(), unsetSpan())); + assertEquals(collector.spansToExport, emptyList()); + dispatcher.flush(); - verifyNoInteractions(collector, sender); + verifyNoInteractions(sender); } @Test - @SuppressWarnings({"unchecked", "rawtypes"}) void droppedTraceWithSingleSpanSampledForwardsOnlyThoseSpans() { OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender, collector); CoreSpan keep = singleSpanSampledSpan(); @@ -71,39 +76,20 @@ void droppedTraceWithSingleSpanSampledForwardsOnlyThoseSpans() { CoreSpan drop2 = droppedSpan(); dispatcher.addTrace(Arrays.asList(drop1, keep, drop2)); - - ArgumentCaptor captor = ArgumentCaptor.forClass(List.class); - verify(collector).addTrace(captor.capture()); - assertEquals(Collections.singletonList(keep), captor.getValue()); - } - - @Test - void emptyTraceForwardsNothing() { - OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender, collector); - - dispatcher.addTrace(Collections.emptyList()); - - verifyNoInteractions(collector, sender); - } - - @Test - void flushSendsNonEmptyPayload() { - Deque chunks = new ArrayDeque<>(); - chunks.add(new byte[] {1, 2, 3}); - OtlpPayload payload = new OtlpPayload(chunks, 3, "application/x-protobuf"); - when(collector.collectTraces()).thenReturn(payload); - OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender, collector); - + assertEquals(collector.spansToExport, singletonList(keep)); dispatcher.flush(); - verify(sender).send(payload); + // expect only one span to be exported + ArgumentCaptor captor = ArgumentCaptor.forClass(OtlpPayload.class); + verify(sender).send(captor.capture()); + assertEquals(1 /*spans*/, 
captor.getValue().getContentLength()); } @Test - void flushSkipsEmptyPayload() { - when(collector.collectTraces()).thenReturn(OtlpPayload.EMPTY); + void emptyTraceForwardsNothing() { OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender, collector); + dispatcher.addTrace(emptyList()); dispatcher.flush(); verifyNoInteractions(sender); @@ -121,8 +107,9 @@ void onDroppedTraceDoesNothing() { OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender, collector); dispatcher.onDroppedTrace(5); + dispatcher.flush(); - verifyNoInteractions(collector, sender); + verifyNoInteractions(sender); } private static CoreSpan sampledSpan() { @@ -151,4 +138,32 @@ private static CoreSpan unsetSpan() { when(span.getTag(SPAN_SAMPLING_MECHANISM_TAG)).thenReturn(null); return span; } + + /** Test collector that creates payloads whose size equals the number of exported spans. */ + private static class TestCollector extends OtlpTraceCollector { + final List> spansToExport = new ArrayList<>(); + + @Override + public void addTrace(List> spans) { + for (CoreSpan span : spans) { + if (shouldExport(span)) { + spansToExport.add(span); + } + } + } + + @Override + public OtlpPayload collectTraces() { + if (spansToExport.isEmpty()) { + return OtlpPayload.EMPTY; + } + try { + // number of bytes returned represents the number of exported spans + int contentLength = spansToExport.size(); + return new OtlpPayload(ByteBuffer.allocate(contentLength), "application/octet-stream"); + } finally { + spansToExport.clear(); + } + } + } } diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpHttpRequestBodyTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpHttpRequestBodyTest.java index d246f35e01f..bd354049e9c 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpHttpRequestBodyTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpHttpRequestBodyTest.java @@ -7,8 +7,7 @@ import 
java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayDeque; -import java.util.Deque; +import java.nio.ByteBuffer; import java.util.zip.GZIPInputStream; import okhttp3.MediaType; import okio.Buffer; @@ -19,7 +18,9 @@ class OtlpHttpRequestBodyTest { @Test void contentTypeIsParsedFromPayload() { OtlpHttpRequestBody body = - new OtlpHttpRequestBody(payload("application/x-protobuf", new byte[] {1, 2, 3}), false); + new OtlpHttpRequestBody( + new OtlpPayload(ByteBuffer.wrap(new byte[] {1, 2, 3}), "application/x-protobuf"), + false); assertEquals(MediaType.get("application/x-protobuf"), body.contentType()); } @@ -27,7 +28,9 @@ void contentTypeIsParsedFromPayload() { @Test void contentLengthMatchesPayloadWhenUncompressed() { OtlpHttpRequestBody body = - new OtlpHttpRequestBody(payload("application/x-protobuf", new byte[] {1, 2, 3, 4}), false); + new OtlpHttpRequestBody( + new OtlpPayload(ByteBuffer.wrap(new byte[] {1, 2, 3, 4}), "application/x-protobuf"), + false); assertEquals(4, body.contentLength()); } @@ -36,7 +39,9 @@ void contentLengthMatchesPayloadWhenUncompressed() { void contentLengthIsNegativeOneWhenGzipped() { // gzip writes chunked, so the framework can't know the length up front OtlpHttpRequestBody body = - new OtlpHttpRequestBody(payload("application/x-protobuf", new byte[] {1, 2, 3, 4}), true); + new OtlpHttpRequestBody( + new OtlpPayload(ByteBuffer.wrap(new byte[] {1, 2, 3, 4}), "application/x-protobuf"), + true); assertEquals(-1, body.contentLength()); } @@ -44,34 +49,22 @@ void contentLengthIsNegativeOneWhenGzipped() { @Test void writeToDrainsRawBytesWhenUncompressed() throws IOException { byte[] data = {10, 20, 30, 40, 50}; - OtlpHttpRequestBody body = - new OtlpHttpRequestBody(payload("application/x-protobuf", data), false); - Buffer sink = new Buffer(); - - body.writeTo(sink); - - assertArrayEquals(data, sink.readByteArray()); - } - - @Test - void 
writeToConcatenatesMultipleChunksWhenUncompressed() throws IOException { OtlpHttpRequestBody body = new OtlpHttpRequestBody( - payload( - "application/x-protobuf", new byte[] {1, 2}, new byte[] {3, 4, 5}, new byte[] {6}), - false); + new OtlpPayload(ByteBuffer.wrap(data), "application/x-protobuf"), false); Buffer sink = new Buffer(); body.writeTo(sink); - assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 6}, sink.readByteArray()); + assertArrayEquals(data, sink.readByteArray()); } @Test void writeToProducesGzipStreamThatDecompressesToPayloadWhenGzipped() throws IOException { byte[] data = "the quick brown fox jumps over the lazy dog".getBytes(); OtlpHttpRequestBody body = - new OtlpHttpRequestBody(payload("application/x-protobuf", data), true); + new OtlpHttpRequestBody( + new OtlpPayload(ByteBuffer.wrap(data), "application/x-protobuf"), true); Buffer sink = new Buffer(); body.writeTo(sink); @@ -79,16 +72,6 @@ void writeToProducesGzipStreamThatDecompressesToPayloadWhenGzipped() throws IOEx assertArrayEquals(data, gunzip(sink.readByteArray())); } - private static OtlpPayload payload(String contentType, byte[]... 
chunks) { - Deque deque = new ArrayDeque<>(); - int total = 0; - for (byte[] chunk : chunks) { - deque.add(chunk); - total += chunk.length; - } - return new OtlpPayload(deque, total, contentType); - } - private static byte[] gunzip(byte[] gzipped) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); try (InputStream gz = new GZIPInputStream(new ByteArrayInputStream(gzipped))) { diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpProtoBufferTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpProtoBufferTest.java new file mode 100644 index 00000000000..5cf7a820016 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpProtoBufferTest.java @@ -0,0 +1,440 @@ +package datadog.trace.core.otlp.common; + +import static org.junit.jupiter.api.Assertions.*; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.WireFormat; +import datadog.communication.serialization.GrowableBuffer; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ReadOnlyBufferException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link OtlpProtoBuffer}. + * + *

The buffer prepends encoded protobuf messages. These messages appear in the final payload in + * reverse insertion order. Each test verifies the wire encoding via {@link CodedInputStream} where + * appropriate. + */ +class OtlpProtoBufferTest { + + private OtlpProtoBuffer buffer; + + @BeforeEach + void setUp() { + buffer = new OtlpProtoBuffer(16); + } + + // ─── helpers ───────────────────────────────────────────────────────────── + + /** Reads all available bytes from the buffer without consuming the underlying state. */ + private static byte[] readAll(OtlpProtoBuffer buf) { + ByteBuffer bb = buf.flip(); + byte[] bytes = new byte[bb.remaining()]; + bb.get(bytes); + return bytes; + } + + /** Creates a pre-filled GrowableBuffer containing the given bytes. */ + private static GrowableBuffer growable(byte... body) { + GrowableBuffer buf = new GrowableBuffer(Math.max(1, body.length)); + if (body.length > 0) { + buf.put(body); + } + return buf; + } + + // ─── initial state ─────────────────────────────────────────────────────── + + @Test + void initialBufferIsEmpty() { + assertEquals(0, buffer.flip().remaining()); + } + + @Test + void initialPayloadIsEmpty() { + OtlpPayload payload = buffer.toPayload(); + assertEquals(0, payload.getContentLength()); + assertEquals("application/x-protobuf", payload.getContentType()); + } + + // ─── recordMessage(GrowableBuffer, int) ────────────────────────────────── + + @Test + void recordsSingleByteMessage() throws IOException { + // tag for field 1 = (1<<3)|2 = 10 → 1 varint byte; length=1 → 1 byte; body=1 → total 3 + int returned = buffer.recordMessage(growable((byte) 0x42), 1); + assertEquals(3, returned); + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + int tag = in.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(tag)); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tag)); + assertArrayEquals(new byte[] {0x42}, in.readByteArray()); + assertTrue(in.isAtEnd()); + } + 
+ @Test + void recordsMultiByteBodyMessage() throws IOException { + byte[] body = {0x01, 0x02, 0x03}; + // tag(field=2)=(2<<3)|2=18 → 1 byte; length=3 → 1 byte; body=3 → total 5 + int returned = buffer.recordMessage(growable(body), 2); + assertEquals(5, returned); + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + int tag = in.readTag(); + assertEquals(2, WireFormat.getTagFieldNumber(tag)); + assertArrayEquals(body, in.readByteArray()); + assertTrue(in.isAtEnd()); + } + + @Test + void recordsEmptyBodyMessage() throws IOException { + // tag(1 byte) + length-varint(0, 1 byte) + 0 body bytes = 2 + int returned = buffer.recordMessage(new GrowableBuffer(1), 1); + assertEquals(2, returned); + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + assertEquals(1, WireFormat.getTagFieldNumber(in.readTag())); + assertEquals(0, in.readByteArray().length); + assertTrue(in.isAtEnd()); + } + + @Test + void recordsMessageWithLargeFieldNumber() throws IOException { + // field 16: tag=(16<<3)|2=130, needs 2 varint bytes; length=1 → 1 byte; body=1 → total 4 + int returned = buffer.recordMessage(growable((byte) 0xAB), 16); + assertEquals(4, returned); + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + int tag = in.readTag(); + assertEquals(16, WireFormat.getTagFieldNumber(tag)); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tag)); + assertArrayEquals(new byte[] {(byte) 0xAB}, in.readByteArray()); + assertTrue(in.isAtEnd()); + } + + @Test + void growableBufferIsReusableAfterRecordMessage() throws IOException { + // Verifies that the finally-block reset allows the same GrowableBuffer to be reused + GrowableBuffer buf = new GrowableBuffer(16); + + buf.put((byte) 0x01); + buffer.recordMessage(buf, 1); + + // After reset the buffer should accept new writes immediately + buf.put((byte) 0x02); + buffer.recordMessage(buf, 2); + + CodedInputStream in = 
CodedInputStream.newInstance(readAll(buffer)); + assertEquals(2, WireFormat.getTagFieldNumber(in.readTag()), "field 2 (last) first"); + assertArrayEquals(new byte[] {0x02}, in.readByteArray()); + assertEquals(1, WireFormat.getTagFieldNumber(in.readTag()), "field 1 (first) second"); + assertArrayEquals(new byte[] {0x01}, in.readByteArray()); + assertTrue(in.isAtEnd()); + } + + @Test + void growableBufferIsReusableAfterRecordMessageWithGrowth() throws IOException { + // OtlpProtoBuffer growth must not prevent the finally-block reset on the GrowableBuffer + OtlpProtoBuffer tinyBuf = new OtlpProtoBuffer(1); + GrowableBuffer buf = new GrowableBuffer(16); + + buf.put(new byte[50]); // forces OtlpProtoBuffer growth + tinyBuf.recordMessage(buf, 1); + + buf.put((byte) 0x42); // reuse after reset + tinyBuf.recordMessage(buf, 2); + + CodedInputStream in = CodedInputStream.newInstance(readAll(tinyBuf)); + assertEquals(2, WireFormat.getTagFieldNumber(in.readTag())); + assertArrayEquals(new byte[] {0x42}, in.readByteArray()); + assertEquals(1, WireFormat.getTagFieldNumber(in.readTag())); + assertEquals(50, in.readByteArray().length); + assertTrue(in.isAtEnd()); + } + + // ─── recordMessage(GrowableBuffer, int, int bytesSoFar) ────────────────── + + @Test + void bytesSoFarIncreasesEncodedLength() { + // field 1, body 1 byte, bytesSoFar 3 → encoded length = 4 + // numBytes = sizeVarInt(10) + sizeVarInt(4) + 1 = 1+1+1 = 3; return 3+3 = 6 + int returned = buffer.recordMessage(growable((byte) 0x01), 1, 3); + assertEquals(6, returned); + } + + @Test + void bytesSoFarAtVarintBoundaryProducesMultiByteLengthField() { + // body=1 byte, bytesSoFar=127 → encoded length = 128, which needs 2 varint bytes + // numBytes = sizeVarInt(10) + sizeVarInt(128) + 1 = 1+2+1 = 4; return 4+127 = 131 + int returned = buffer.recordMessage(growable((byte) 0x01), 1, 127); + assertEquals(131, returned); + } + + @Test + void zeroBytesSoFarMatchesSimpleOverload() { + OtlpProtoBuffer buf1 = new 
OtlpProtoBuffer(16); + OtlpProtoBuffer buf2 = new OtlpProtoBuffer(16); + int ret1 = buf1.recordMessage(growable((byte) 0x05), 3); + int ret2 = buf2.recordMessage(growable((byte) 0x05), 3, 0); + assertEquals(ret1, ret2); + assertArrayEquals(readAll(buf1), readAll(buf2)); + } + + // ─── recordMessage(byte[]) ──────────────────────────────────────────────── + + @Test + void recordsByteArrayDirectly() { + // hand-crafted protobuf: field 1 (tag=0x0A), length 3, body [1, 2, 3] + byte[] encoded = {0x0A, 0x03, 0x01, 0x02, 0x03}; + int returned = buffer.recordMessage(encoded); + assertEquals(5, returned); + assertArrayEquals(encoded, readAll(buffer)); + } + + @Test + void recordsEmptyByteArray() { + int returned = buffer.recordMessage(new byte[0]); + assertEquals(0, returned); + assertEquals(0, buffer.flip().remaining()); + } + + @Test + void byteArrayMessageAppearsInOutput() throws IOException { + // Encode field 4, body [0xBE, 0xEF] by hand: tag=(4<<3)|2=34, length=2 + byte[] encoded = {0x22, 0x02, (byte) 0xBE, (byte) 0xEF}; + buffer.recordMessage(encoded); + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + assertEquals(4, WireFormat.getTagFieldNumber(in.readTag())); + assertArrayEquals(new byte[] {(byte) 0xBE, (byte) 0xEF}, in.readByteArray()); + assertTrue(in.isAtEnd()); + } + + // ─── message ordering ──────────────────────────────────────────────────── + + @Test + void messagesAppearInReverseInsertionOrder() throws IOException { + buffer.recordMessage(growable((byte) 0x01), 1); // first inserted + buffer.recordMessage(growable((byte) 0x02), 2); // second → appears first in output + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + assertEquals(2, WireFormat.getTagFieldNumber(in.readTag()), "field 2 (last) should be first"); + in.readByteArray(); + assertEquals(1, WireFormat.getTagFieldNumber(in.readTag()), "field 1 (first) should be second"); + in.readByteArray(); + assertTrue(in.isAtEnd()); + } + + @Test + void 
threeMessagesAreOrderedInReverseInsertion() throws IOException { + for (int i = 1; i <= 3; i++) { + buffer.recordMessage(growable((byte) i), i); + } + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + for (int expected = 3; expected >= 1; expected--) { + assertEquals(expected, WireFormat.getTagFieldNumber(in.readTag())); + in.readByteArray(); + } + assertTrue(in.isAtEnd()); + } + + @Test + void byteArrayAndGrowableMessagesInterleaveInReverseOrder() throws IOException { + // field 1 hand-encoded: tag=0x0A, length=1, body=0x11 + buffer.recordMessage(new byte[] {0x0A, 0x01, 0x11}); // first + buffer.recordMessage(growable((byte) 0x22), 2); // second → first in output + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + assertEquals(2, WireFormat.getTagFieldNumber(in.readTag()), "GrowableBuffer message first"); + in.readByteArray(); + assertEquals(1, WireFormat.getTagFieldNumber(in.readTag()), "byte[] message second"); + in.readByteArray(); + assertTrue(in.isAtEnd()); + } + + // ─── flip() ────────────────────────────────────────────────────────────── + + @Test + void flipReturnsCorrectByteCount() { + // tag(1) + length(1) + body(2) = 4 bytes + buffer.recordMessage(growable((byte) 1, (byte) 2), 1); + assertEquals(4, buffer.flip().remaining()); + } + + @Test + void flipIsIdempotent() { + buffer.recordMessage(growable((byte) 0x55), 1); + assertArrayEquals(readAll(buffer), readAll(buffer)); + } + + // ─── toPayload() ───────────────────────────────────────────────────────── + + @Test + void toPayloadHasProtobufContentType() { + assertEquals("application/x-protobuf", buffer.toPayload().getContentType()); + } + + @Test + void toPayloadContentLengthMatchesFlipRemaining() { + buffer.recordMessage(growable((byte) 1, (byte) 2, (byte) 3), 1); + OtlpPayload payload = buffer.toPayload(); + assertEquals(buffer.flip().remaining(), payload.getContentLength()); + } + + @Test + void toPayloadContentMatchesFlip() { + 
buffer.recordMessage(growable((byte) 0xDE, (byte) 0xAD), 5); + + // Capture expected bytes before toPayload(): both share the same ByteBuffer, so readAll + // must not consume the position that toPayload() will later restore via flip(). + byte[] expected = readAll(buffer); + OtlpPayload payload = buffer.toPayload(); + + ByteBuffer content = payload.getContent(); + byte[] actual = new byte[content.remaining()]; + content.get(actual); + assertArrayEquals(expected, actual); + } + + @Test + void toPayloadContentIsReadOnly() { + buffer.recordMessage(growable((byte) 1), 1); + assertThrows( + ReadOnlyBufferException.class, () -> buffer.toPayload().getContent().put((byte) 0)); + } + + // ─── reset() ───────────────────────────────────────────────────────────── + + @Test + void resetClearsBuffer() { + buffer.recordMessage(growable((byte) 1), 1); + assertNotEquals(0, buffer.flip().remaining()); + + buffer.reset(); + assertEquals(0, buffer.flip().remaining()); + } + + @Test + void resetAllowsNewMessagesToBeRecorded() throws IOException { + buffer.recordMessage(growable((byte) 0x01), 1); + buffer.reset(); + buffer.recordMessage(growable((byte) 0x02), 2); + + CodedInputStream in = CodedInputStream.newInstance(readAll(buffer)); + int tag = in.readTag(); + assertEquals(2, WireFormat.getTagFieldNumber(tag), "only field 2 should be present"); + assertArrayEquals(new byte[] {0x02}, in.readByteArray()); + assertTrue(in.isAtEnd()); + } + + @Test + void resetOnGrownBufferShrinksToInitialCapacityThenFunctionsCorrectly() { + // initialCapacity=1; recording 100 bytes forces growth + OtlpProtoBuffer buf = new OtlpProtoBuffer(1); + buf.recordMessage(new byte[100]); + assertEquals(100, buf.flip().remaining()); + + buf.reset(); + assertEquals(0, buf.flip().remaining()); + + // Should continue to function correctly after shrink + int size = buf.recordMessage(growable((byte) 0x42), 1); + assertEquals(3, size); + assertEquals(3, buf.flip().remaining()); + } + + @Test + void 
resetPreservesPayloadContentLength() { + buffer.recordMessage(growable((byte) 0xAB), 1); + OtlpPayload payload = buffer.toPayload(); + int expectedLength = payload.getContentLength(); + assertTrue(expectedLength > 0); + + buffer.reset(); + + // contentLength is eagerly captured in the payload constructor and unaffected by reset + assertEquals(expectedLength, payload.getContentLength()); + } + + @Test + void payloadContentRemainsReadableAfterReset() { + buffer.recordMessage(growable((byte) 0xAB, (byte) 0xCD), 7); + byte[] expected = readAll(buffer); + OtlpPayload payload = buffer.toPayload(); + + buffer.reset(); + + ByteBuffer content = payload.getContent(); + byte[] actual = new byte[content.remaining()]; + content.get(actual); + assertArrayEquals(expected, actual, "reset() must not corrupt existing payload content"); + } + + @Test + void payloadContentRemainsReadableAfterResetOnGrownBuffer() { + // initialCapacity=4; recording 50 bytes forces buffer growth, so reset() allocates a new + // ByteBuffer — the payload must still reference the old (orphaned) one correctly. 
+ OtlpProtoBuffer buf = new OtlpProtoBuffer(4); + buf.recordMessage(new byte[50]); + byte[] expected = readAll(buf); + OtlpPayload payload = buf.toPayload(); + + buf.reset(); + + ByteBuffer content = payload.getContent(); + byte[] actual = new byte[content.remaining()]; + content.get(actual); + assertArrayEquals( + expected, actual, "reset() on a grown buffer must not corrupt existing payload content"); + } + + // ─── buffer growth ──────────────────────────────────────────────────────── + + @Test + void bufferAccommodatesMessageLargerThanInitialCapacity() { + OtlpProtoBuffer buf = new OtlpProtoBuffer(4); + byte[] big = new byte[50]; + int returned = buf.recordMessage(big); + assertEquals(50, returned); + assertArrayEquals(big, readAll(buf)); + } + + @Test + void growthPreservesAlreadyRecordedMessages() throws IOException { + // initialCapacity=4; first message fills it exactly, second triggers growth + OtlpProtoBuffer buf = new OtlpProtoBuffer(4); + buf.recordMessage(growable((byte) 0x01, (byte) 0x02), 1); // tag+len+2body = 4 bytes + buf.recordMessage(growable((byte) 0x03, (byte) 0x04), 2); // 4 more bytes → forces growth + + byte[] bytes = readAll(buf); + assertEquals(8, bytes.length); + + CodedInputStream in = CodedInputStream.newInstance(bytes); + assertEquals(2, WireFormat.getTagFieldNumber(in.readTag())); + assertArrayEquals(new byte[] {0x03, 0x04}, in.readByteArray()); + assertEquals(1, WireFormat.getTagFieldNumber(in.readTag())); + assertArrayEquals(new byte[] {0x01, 0x02}, in.readByteArray()); + assertTrue(in.isAtEnd()); + } + + @Test + void repeatedGrowthsPreserveAllMessages() throws IOException { + // initialCapacity=4; each 4-byte message exhausts remaining, forcing growth each round + OtlpProtoBuffer buf = new OtlpProtoBuffer(4); + for (int i = 1; i <= 4; i++) { + buf.recordMessage(growable((byte) i, (byte) (i + 10)), i); + } + + CodedInputStream in = CodedInputStream.newInstance(readAll(buf)); + for (int expected = 4; expected >= 1; expected--) { + 
assertEquals(expected, WireFormat.getTagFieldNumber(in.readTag())); + assertArrayEquals(new byte[] {(byte) expected, (byte) (expected + 10)}, in.readByteArray()); + } + assertTrue(in.isAtEnd()); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/logs/OtlpLogsProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/logs/OtlpLogsProtoTest.java index 178f069aa3e..88a20427b44 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/logs/OtlpLogsProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/logs/OtlpLogsProtoTest.java @@ -24,6 +24,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor; import datadog.trace.bootstrap.otlp.logs.OtlpLogRecord; import datadog.trace.bootstrap.otlp.logs.OtlpScopedLogsVisitor; import datadog.trace.common.writer.LoggingWriter; @@ -32,11 +33,11 @@ import datadog.trace.core.otlp.common.OtlpPayload; import datadog.trace.core.propagation.ExtractedContext; import datadog.trace.core.propagation.PropagationTags; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -86,7 +87,7 @@ class OtlpLogsProtoTest { static final class LogSpec { /** Instrumentation scope for this log record → drives ScopeLogs grouping. */ - final OtelInstrumentationScope scope; + OtelInstrumentationScope scope; /** time_unix_nano (proto field 1). */ final long timestampNanos; @@ -95,25 +96,25 @@ static final class LogSpec { final long observedNanos; /** severity_number (proto field 2). */ - final int severityNumber; + int severityNumber; /** severity_text (proto field 3); {@code null} → field absent. 
*/ - @Nullable final String severityText; + @Nullable String severityText; /** body.string_value (proto field 5 → AnyValue field 1); {@code null} → field absent. */ @Nullable final String body; /** Extra attributes written via {@code visitAttribute} before the log record. */ - final Map attrs; + Map attrs; /** * If ≥ 0, index into the pre-built span list for the span context, encoding trace_id, span_id, * and flags. If -1, no span context → those fields absent. */ - final int spanContextIndex; + int spanContextIndex; /** event_name (proto field 12); {@code null} → field absent. */ - @Nullable final String eventName; + @Nullable String eventName; /** * If true, the span at {@code spanContextIndex} is started under a known 128-bit trace ID so @@ -121,25 +122,41 @@ static final class LogSpec { */ boolean use128BitTraceId; - LogSpec( - OtelInstrumentationScope scope, - long timestampNanos, - long observedNanos, - int severityNumber, - @Nullable String severityText, - @Nullable String body, - Map attrs, - int spanContextIndex, - @Nullable String eventName) { - this.scope = scope; - this.timestampNanos = timestampNanos; - this.observedNanos = observedNanos; - this.severityNumber = severityNumber; - this.severityText = severityText; + LogSpec(@Nullable String body) { this.body = body; + this.scope = DEFAULT_SCOPE; + this.timestampNanos = BASE_NANOS; + this.observedNanos = BASE_NANOS + OBSERVED_OFFSET_NANOS; + this.severityNumber = 9; + this.severityText = "INFO"; + this.attrs = emptyMap(); + this.spanContextIndex = -1; + } + + LogSpec scope(OtelInstrumentationScope scope) { + this.scope = scope; + return this; + } + + LogSpec severity(int number, @Nullable String text) { + this.severityNumber = number; + this.severityText = text; + return this; + } + + LogSpec attrs(Map attrs) { this.attrs = attrs; - this.spanContextIndex = spanContextIndex; - this.eventName = eventName; + return this; + } + + LogSpec spanContextIndex(int index) { + this.spanContextIndex = index; + return 
this; + } + + LogSpec eventName(@Nullable String name) { + this.eventName = name; + return this; } LogSpec use128BitTraceId() { @@ -148,6 +165,20 @@ LogSpec use128BitTraceId() { } } + private static final class ParsedScope { + final String name; + final String version; + final String schemaUrl; + final List logRecordBlobs; + + ParsedScope(String name, String version, String schemaUrl, List logRecordBlobs) { + this.name = name; + this.version = version; + this.schemaUrl = schemaUrl; + this.logRecordBlobs = logRecordBlobs; + } + } + // ── shorthand builders ───────────────────────────────────────────────────── private static final long BASE_NANOS = 1_700_000_000_000_000_000L; @@ -158,95 +189,32 @@ LogSpec use128BitTraceId() { DD128bTraceId.from(0x0123456789abcdefL, 0xfedcba9876543210L); private static LogSpec infoLog(String body) { - return new LogSpec( - DEFAULT_SCOPE, - BASE_NANOS, - BASE_NANOS + OBSERVED_OFFSET_NANOS, - 9, - "INFO", - body, - new HashMap<>(), - -1, - null); + return new LogSpec(body); } private static LogSpec scopedLog(OtelInstrumentationScope scope, String body) { - return new LogSpec( - scope, - BASE_NANOS, - BASE_NANOS + OBSERVED_OFFSET_NANOS, - 9, - "INFO", - body, - new HashMap<>(), - -1, - null); + return new LogSpec(body).scope(scope); } private static LogSpec severityLog( int severityNumber, @Nullable String severityText, String body) { - return new LogSpec( - DEFAULT_SCOPE, - BASE_NANOS, - BASE_NANOS + OBSERVED_OFFSET_NANOS, - severityNumber, - severityText, - body, - new HashMap<>(), - -1, - null); + return new LogSpec(body).severity(severityNumber, severityText); } private static LogSpec contextLog(String body, int spanContextIndex) { - return new LogSpec( - DEFAULT_SCOPE, - BASE_NANOS, - BASE_NANOS + OBSERVED_OFFSET_NANOS, - 9, - "INFO", - body, - new HashMap<>(), - spanContextIndex, - null); + return new LogSpec(body).spanContextIndex(spanContextIndex); } private static LogSpec eventLog(String body, String eventName) { - return new 
LogSpec( - DEFAULT_SCOPE, - BASE_NANOS, - BASE_NANOS + OBSERVED_OFFSET_NANOS, - 9, - "INFO", - body, - new HashMap<>(), - -1, - eventName); + return new LogSpec(body).eventName(eventName); } private static LogSpec eventOnlyLog(String eventName) { - return new LogSpec( - DEFAULT_SCOPE, - BASE_NANOS, - BASE_NANOS + OBSERVED_OFFSET_NANOS, - 9, - "INFO", - null, - new HashMap<>(), - -1, - eventName); + return new LogSpec(null).eventName(eventName); } private static LogSpec taggedLog(String body, Map attrs) { - return new LogSpec( - DEFAULT_SCOPE, - BASE_NANOS, - BASE_NANOS + OBSERVED_OFFSET_NANOS, - 9, - "INFO", - body, - attrs, - -1, - null); + return new LogSpec(body).attrs(attrs); } private static Map attrs(Object... keyValues) { @@ -367,28 +335,25 @@ void testCollectLogs(String caseName, List specs) throws IOException { OtlpPayload payload = OtlpLogsProtoCollector.INSTANCE.collectLogs( visitor -> { - OtelInstrumentationScope lastScope = null; - OtlpScopedLogsVisitor scoped = null; - for (LogSpec spec : specs) { - if (!spec.scope.equals(lastScope)) { - scoped = visitor.visitScopedLogs(spec.scope); - lastScope = spec.scope; - } - for (Map.Entry e : spec.attrs.entrySet()) { - scoped.visitAttribute(attrType(e.getValue()), e.getKey(), e.getValue()); + for (List scopeGroup : groupByScope(specs).values()) { + OtlpScopedLogsVisitor scoped = visitor.visitScopedLogs(scopeGroup.get(0).scope); + for (LogSpec spec : scopeGroup) { + for (Map.Entry attr : spec.attrs.entrySet()) { + scoped.visitAttribute( + attrType(attr.getValue()), attr.getKey(), attr.getValue()); + } + scoped.visitLogRecord( + new OtlpLogRecord( + spec.scope, + spec.timestampNanos, + spec.observedNanos, + spec.severityNumber, + spec.severityText, + spec.body, + emptyMap(), + resolveContext(spans, spec), + spec.eventName)); } - AgentSpanContext ctx = resolveContext(spans, spec); - scoped.visitLogRecord( - new OtlpLogRecord( - spec.scope, - spec.timestampNanos, - spec.observedNanos, - spec.severityNumber, - 
spec.severityText, - spec.body, - emptyMap(), - ctx, - spec.eventName)); } }); @@ -397,59 +362,62 @@ void testCollectLogs(String caseName, List specs) throws IOException { return; } - ByteArrayOutputStream baos = new ByteArrayOutputStream(payload.getContentLength()); - payload.drain(baos::write); - byte[] bytes = baos.toByteArray(); - assertTrue(bytes.length > 0, "non-empty specs must produce bytes"); + assertTrue(payload.getContentLength() > 0, "non-empty specs must produce bytes"); // ── parse LogsData ──────────────────────────────────────────────────── - CodedInputStream ld = CodedInputStream.newInstance(bytes); - int ldTag = ld.readTag(); - assertEquals(1, WireFormat.getTagFieldNumber(ldTag), "LogsData.resource_logs is field 1"); - assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(ldTag)); - CodedInputStream rl = ld.readBytes().newCodedInput(); - assertTrue(ld.isAtEnd(), "expected exactly one ResourceLogs"); + CodedInputStream logsData = CodedInputStream.newInstance(payload.getContent()); + int logsTag = logsData.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(logsTag), "LogsData.resource_logs is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(logsTag)); + CodedInputStream resourceLogs = logsData.readBytes().newCodedInput(); + assertTrue(logsData.isAtEnd(), "expected exactly one ResourceLogs"); // ── parse ResourceLogs ──────────────────────────────────────────────── boolean resourceFound = false; List scopeBlobs = new ArrayList<>(); - while (!rl.isAtEnd()) { - int rlTag = rl.readTag(); - switch (WireFormat.getTagFieldNumber(rlTag)) { + while (!resourceLogs.isAtEnd()) { + int tag = resourceLogs.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { case 1: - verifyResource(rl.readBytes().newCodedInput()); + verifyResource(resourceLogs.readBytes().newCodedInput()); resourceFound = true; break; case 2: - scopeBlobs.add(rl.readBytes().toByteArray()); + 
scopeBlobs.add(resourceLogs.readBytes().toByteArray()); break; default: - rl.skipField(rlTag); + resourceLogs.skipField(tag); } } assertTrue(resourceFound, "Resource must be present in ResourceLogs [" + caseName + "]"); - // ── verify ScopeLogs groups ─────────────────────────────────────────── - List> scopeGroups = groupByScope(specs); + // ── verify ScopeLogs groups (order-insensitive) ─────────────────────── + Map> expectedByScopeName = groupByScope(specs); assertEquals( - scopeGroups.size(), scopeBlobs.size(), "ScopeLogs count mismatch [" + caseName + "]"); - - for (int s = 0; s < scopeGroups.size(); s++) { - List group = scopeGroups.get(s); - List logRecordBlobs = - parseScopeLogs( - CodedInputStream.newInstance(scopeBlobs.get(s)), group.get(0).scope, caseName); + expectedByScopeName.size(), + scopeBlobs.size(), + "ScopeLogs count mismatch [" + caseName + "]"); + + for (byte[] scopeBlob : scopeBlobs) { + ParsedScope parsedScope = parseScopeLogs(CodedInputStream.newInstance(scopeBlob), caseName); + List group = expectedByScopeName.get(parsedScope.name); + assertNotNull(group, "unexpected scope '" + parsedScope.name + "' [" + caseName + "]"); + verifyScope(parsedScope, group.get(0).scope, caseName); + + Map expectedByKey = new HashMap<>(); + for (LogSpec spec : group) { + expectedByKey.put(logKey(spec), spec); + } assertEquals( group.size(), - logRecordBlobs.size(), - "LogRecord count mismatch in scope " + s + " [" + caseName + "]"); - - for (int r = 0; r < group.size(); r++) { + parsedScope.logRecordBlobs.size(), + "LogRecord count in scope " + parsedScope.name + " [" + caseName + "]"); + for (byte[] recordBlob : parsedScope.logRecordBlobs) { + String key = parseLogKey(recordBlob); + LogSpec spec = expectedByKey.get(key); + assertNotNull(spec, "unexpected log record with key '" + key + "' [" + caseName + "]"); verifyLogRecord( - CodedInputStream.newInstance(logRecordBlobs.get(r)), - group.get(r), - resolveContext(spans, group.get(r)), - caseName); + 
CodedInputStream.newInstance(recordBlob), spec, resolveContext(spans, spec), caseName); } } } @@ -457,15 +425,22 @@ void testCollectLogs(String caseName, List specs) throws IOException { // ── span construction ───────────────────────────────────────────────────── private static List buildSpans(List specs) { - int maxIndex = specs.stream().mapToInt(s -> s.spanContextIndex).max().orElse(-1); + int maxIndex = -1; + for (LogSpec spec : specs) { + if (spec.spanContextIndex > maxIndex) maxIndex = spec.spanContextIndex; + } if (maxIndex < 0) { return new ArrayList<>(); } List spans = new ArrayList<>(maxIndex + 1); for (int i = 0; i <= maxIndex; i++) { - final int idx = i; - LogSpec refSpec = - specs.stream().filter(s -> s.spanContextIndex == idx).findFirst().orElse(null); + LogSpec refSpec = null; + for (LogSpec spec : specs) { + if (spec.spanContextIndex == i) { + refSpec = spec; + break; + } + } AgentSpan span; if (refSpec != null && refSpec.use128BitTraceId) { ExtractedContext parent128 = @@ -502,15 +477,10 @@ private static AgentSpanContext resolveContext(List spans, LogSpec spec) // ── grouping helper ─────────────────────────────────────────────────────── - private static List> groupByScope(List specs) { - List> groups = new ArrayList<>(); - OtelInstrumentationScope lastScope = null; + private static Map> groupByScope(List specs) { + Map> groups = new LinkedHashMap<>(); for (LogSpec spec : specs) { - if (!spec.scope.equals(lastScope)) { - groups.add(new ArrayList<>()); - lastScope = spec.scope; - } - groups.get(groups.size() - 1).add(spec); + groups.computeIfAbsent(spec.scope.getName().toString(), k -> new ArrayList<>()).add(spec); } return groups; } @@ -521,89 +491,82 @@ private static List> groupByScope(List specs) { * Parses a {@code Resource} message body and asserts it contains a {@code service.name} * attribute. 
*/ - private static void verifyResource(CodedInputStream res) throws IOException { + private static void verifyResource(CodedInputStream resource) throws IOException { boolean foundServiceName = false; - while (!res.isAtEnd()) { - int tag = res.readTag(); + while (!resource.isAtEnd()) { + int tag = resource.readTag(); if (WireFormat.getTagFieldNumber(tag) == 1) { - String key = readKeyValueKey(res.readBytes().newCodedInput()); + String key = readKeyValueKey(resource.readBytes().newCodedInput()); if ("service.name".equals(key)) { foundServiceName = true; } } else { - res.skipField(tag); + resource.skipField(tag); } } assertTrue(foundServiceName, "Resource must contain a 'service.name' attribute"); } /** - * Parses a {@code ScopeLogs} message body, verifies its scope and schema_url, and returns the raw - * bytes of each {@code log_records} entry. + * Parses a {@code ScopeLogs} message body into a {@link ParsedScope} containing the scope + * identity and raw bytes of each {@code log_records} entry. * *
     * <pre>
     *   ScopeLogs { scope=1, log_records=2, schema_url=3 }
     * </pre>
*/ - private static List parseScopeLogs( - CodedInputStream sl, OtelInstrumentationScope expectedScope, String caseName) + private static ParsedScope parseScopeLogs(CodedInputStream scopeLogs, String caseName) throws IOException { + String name = null; + String version = null; + String schemaUrl = null; List logRecords = new ArrayList<>(); boolean scopeFound = false; - String parsedSchemaUrl = null; - while (!sl.isAtEnd()) { - int tag = sl.readTag(); + while (!scopeLogs.isAtEnd()) { + int tag = scopeLogs.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 1: - verifyScope(sl.readBytes().newCodedInput(), expectedScope, caseName); + CodedInputStream scopeStream = scopeLogs.readBytes().newCodedInput(); scopeFound = true; + while (!scopeStream.isAtEnd()) { + int scopeTag = scopeStream.readTag(); + switch (WireFormat.getTagFieldNumber(scopeTag)) { + case 1: + name = scopeStream.readString(); + break; + case 2: + version = scopeStream.readString(); + break; + default: + scopeStream.skipField(scopeTag); + } + } break; case 2: - logRecords.add(sl.readBytes().toByteArray()); + logRecords.add(scopeLogs.readBytes().toByteArray()); break; case 3: - parsedSchemaUrl = sl.readString(); + schemaUrl = scopeLogs.readString(); break; default: - sl.skipField(tag); + scopeLogs.skipField(tag); } } assertTrue(scopeFound, "InstrumentationScope must be present in ScopeLogs [" + caseName + "]"); - String expectedSchemaUrl = - expectedScope.getSchemaUrl() != null ? expectedScope.getSchemaUrl().toString() : null; - assertEquals(expectedSchemaUrl, parsedSchemaUrl, "schema_url mismatch [" + caseName + "]"); - return logRecords; + return new ParsedScope(name, version, schemaUrl, logRecords); } - /** - * Parses an {@code InstrumentationScope} message body and verifies name and version. - * - *
-   *   InstrumentationScope { name=1, version=2 }
-   * </pre>
- */ + /** Verifies that a parsed scope's identity fields match the expected scope. */ private static void verifyScope( - CodedInputStream cs, OtelInstrumentationScope expected, String caseName) throws IOException { - String parsedName = null; - String parsedVersion = null; - while (!cs.isAtEnd()) { - int tag = cs.readTag(); - switch (WireFormat.getTagFieldNumber(tag)) { - case 1: - parsedName = cs.readString(); - break; - case 2: - parsedVersion = cs.readString(); - break; - default: - cs.skipField(tag); - } - } + ParsedScope parsed, OtelInstrumentationScope expected, String caseName) { assertEquals( - expected.getName().toString(), parsedName, "scope.name mismatch [" + caseName + "]"); + expected.getName().toString(), parsed.name, "scope.name mismatch [" + caseName + "]"); String expectedVersion = expected.getVersion() != null ? expected.getVersion().toString() : null; - assertEquals(expectedVersion, parsedVersion, "scope.version mismatch [" + caseName + "]"); + assertEquals(expectedVersion, parsed.version, "scope.version mismatch [" + caseName + "]"); + String expectedSchemaUrl = + expected.getSchemaUrl() != null ? 
expected.getSchemaUrl().toString() : null; + assertEquals(expectedSchemaUrl, parsed.schemaUrl, "schema_url mismatch [" + caseName + "]"); } /** @@ -616,7 +579,7 @@ private static void verifyScope( * */ private static void verifyLogRecord( - CodedInputStream lr, LogSpec spec, @Nullable AgentSpanContext ctx, String caseName) + CodedInputStream logRecord, LogSpec spec, @Nullable AgentSpanContext ctx, String caseName) throws IOException { long parsedTimestamp = -1; long parsedObserved = -1; @@ -630,42 +593,42 @@ private static void verifyLogRecord( byte[] parsedSpanId = null; String parsedEventName = null; - while (!lr.isAtEnd()) { - int tag = lr.readTag(); + while (!logRecord.isAtEnd()) { + int tag = logRecord.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 1: - parsedTimestamp = lr.readFixed64(); + parsedTimestamp = logRecord.readFixed64(); break; case 2: - parsedSeverityNumber = lr.readEnum(); + parsedSeverityNumber = logRecord.readEnum(); break; case 3: - parsedSeverityText = lr.readString(); + parsedSeverityText = logRecord.readString(); break; case 5: - parsedBody = readBodyString(lr.readBytes().newCodedInput()); + parsedBody = readBodyString(logRecord.readBytes().newCodedInput()); break; case 6: - attrKeys.add(readKeyValueKey(lr.readBytes().newCodedInput())); + attrKeys.add(readKeyValueKey(logRecord.readBytes().newCodedInput())); break; case 8: - parsedFlags = lr.readFixed32(); + parsedFlags = logRecord.readFixed32(); flagsFound = true; break; case 9: - parsedTraceId = lr.readBytes().toByteArray(); + parsedTraceId = logRecord.readBytes().toByteArray(); break; case 10: - parsedSpanId = lr.readBytes().toByteArray(); + parsedSpanId = logRecord.readBytes().toByteArray(); break; case 11: - parsedObserved = lr.readFixed64(); + parsedObserved = logRecord.readFixed64(); break; case 12: - parsedEventName = lr.readString(); + parsedEventName = logRecord.readString(); break; default: - lr.skipField(tag); + logRecord.skipField(tag); } } @@ -722,16 +685,40 @@ 
private static void verifyLogRecord( // ── proto parsing helpers ───────────────────────────────────────────────── + private static String logKey(LogSpec spec) { + return spec.body != null ? spec.body : "~event:" + spec.eventName; + } + + private static String parseLogKey(byte[] blob) throws IOException { + CodedInputStream logRecord = CodedInputStream.newInstance(blob); + String body = null; + String eventName = null; + while (!logRecord.isAtEnd()) { + int tag = logRecord.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { + case 5: + body = readBodyString(logRecord.readBytes().newCodedInput()); + break; + case 12: + eventName = logRecord.readString(); + break; + default: + logRecord.skipField(tag); + } + } + return body != null ? body : "~event:" + eventName; + } + /** * Reads a {@code AnyValue} body and returns the string value from field 1 ({@code string_value}). */ - private static String readBodyString(CodedInputStream av) throws IOException { - while (!av.isAtEnd()) { - int tag = av.readTag(); + private static String readBodyString(CodedInputStream anyValue) throws IOException { + while (!anyValue.isAtEnd()) { + int tag = anyValue.readTag(); if (WireFormat.getTagFieldNumber(tag) == 1) { - return av.readString(); + return anyValue.readString(); } - av.skipField(tag); + anyValue.skipField(tag); } return null; } @@ -740,17 +727,15 @@ private static String readBodyString(CodedInputStream av) throws IOException { * Reads a {@code KeyValue} body and returns the key (field 1). The value is skipped; its encoding * is covered by {@code OtlpCommonProtoTest}. 
*/ - private static String readKeyValueKey(CodedInputStream kv) throws IOException { - String key = null; - while (!kv.isAtEnd()) { - int tag = kv.readTag(); + private static String readKeyValueKey(CodedInputStream keyValue) throws IOException { + while (!keyValue.isAtEnd()) { + int tag = keyValue.readTag(); if (WireFormat.getTagFieldNumber(tag) == 1) { - key = kv.readString(); - } else { - kv.skipField(tag); + return keyValue.readString(); } + keyValue.skipField(tag); } - return key; + return null; } /** Returns the {@link OtlpAttributeVisitor} type constant for a given value. */ diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoTest.java index a336b7d4bcc..8cc2ac4ea81 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/metrics/OtlpMetricsProtoTest.java @@ -1,14 +1,21 @@ package datadog.trace.core.otlp.metrics; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.COUNTER; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.GAUGE; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.HISTOGRAM; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_COUNTER; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_GAUGE; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER; +import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.UP_DOWN_COUNTER; import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.BOOLEAN_ATTRIBUTE; import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.DOUBLE_ATTRIBUTE; import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG_ATTRIBUTE; import static 
datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ATTRIBUTE; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; -import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.protobuf.CodedInputStream; @@ -24,10 +31,11 @@ import datadog.trace.bootstrap.otlp.metrics.OtlpMetricVisitor; import datadog.trace.bootstrap.otlp.metrics.OtlpScopedMetricsVisitor; import datadog.trace.core.otlp.common.OtlpPayload; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -77,6 +85,10 @@ static final class ScopeSpec { this.schemaUrl = schemaUrl; this.metrics = metrics; } + + OtelInstrumentationScope toScope() { + return new OtelInstrumentationScope(name, version, schemaUrl); + } } static final class MetricSpec { @@ -104,6 +116,10 @@ static final class MetricSpec { this.point = point; this.attrs = attrs; } + + OtelInstrumentDescriptor toDescriptor() { + return new OtelInstrumentDescriptor(name, type, longValues, description, unit); + } } static final class AttrSpec { @@ -118,6 +134,20 @@ static final class AttrSpec { } } + private static final class ParsedScopeMetrics { + final String name; + final String version; + final String schemaUrl; + final List metricBlobs; + + ParsedScopeMetrics(String name, String version, String schemaUrl, List metricBlobs) { + this.name = name; + this.version = version; + this.schemaUrl = schemaUrl; + this.metricBlobs = metricBlobs; + } + } + // ── shorthand builders ──────────────────────────────────────────────────── private 
static ScopeSpec scope(String name, MetricSpec... metrics) { @@ -130,111 +160,62 @@ private static ScopeSpec scopeFull( } private static MetricSpec counterLong(String name, long value, AttrSpec... attrs) { - return new MetricSpec( - name, null, null, OtelInstrumentType.COUNTER, true, longPoint(value), asList(attrs)); + return new MetricSpec(name, null, null, COUNTER, true, longPoint(value), asList(attrs)); } private static MetricSpec counterLongFull( String name, String desc, String unit, long value, AttrSpec... attrs) { - return new MetricSpec( - name, desc, unit, OtelInstrumentType.COUNTER, true, longPoint(value), asList(attrs)); + return new MetricSpec(name, desc, unit, COUNTER, true, longPoint(value), asList(attrs)); } private static MetricSpec counterDouble(String name, double value, AttrSpec... attrs) { - return new MetricSpec( - name, null, null, OtelInstrumentType.COUNTER, false, doublePoint(value), asList(attrs)); + return new MetricSpec(name, null, null, COUNTER, false, doublePoint(value), asList(attrs)); } private static MetricSpec gaugeLong(String name, long value) { - return new MetricSpec( - name, null, null, OtelInstrumentType.GAUGE, true, longPoint(value), emptyList()); + return new MetricSpec(name, null, null, GAUGE, true, longPoint(value), emptyList()); } private static MetricSpec gaugeDouble(String name, double value) { - return new MetricSpec( - name, null, null, OtelInstrumentType.GAUGE, false, doublePoint(value), emptyList()); + return new MetricSpec(name, null, null, GAUGE, false, doublePoint(value), emptyList()); } private static MetricSpec upDownLong(String name, long value, AttrSpec... attrs) { - return new MetricSpec( - name, - null, - null, - OtelInstrumentType.UP_DOWN_COUNTER, - true, - longPoint(value), - asList(attrs)); + return new MetricSpec(name, null, null, UP_DOWN_COUNTER, true, longPoint(value), asList(attrs)); } private static MetricSpec upDownDouble(String name, double value, AttrSpec... 
attrs) { return new MetricSpec( - name, - null, - null, - OtelInstrumentType.UP_DOWN_COUNTER, - false, - doublePoint(value), - asList(attrs)); + name, null, null, UP_DOWN_COUNTER, false, doublePoint(value), asList(attrs)); } private static MetricSpec observableGaugeLong(String name, long value) { - return new MetricSpec( - name, null, null, OtelInstrumentType.OBSERVABLE_GAUGE, true, longPoint(value), emptyList()); + return new MetricSpec(name, null, null, OBSERVABLE_GAUGE, true, longPoint(value), emptyList()); } private static MetricSpec observableGaugeDouble(String name, double value) { return new MetricSpec( - name, - null, - null, - OtelInstrumentType.OBSERVABLE_GAUGE, - false, - doublePoint(value), - emptyList()); + name, null, null, OBSERVABLE_GAUGE, false, doublePoint(value), emptyList()); } private static MetricSpec observableCounterLong(String name, long value) { return new MetricSpec( - name, - null, - null, - OtelInstrumentType.OBSERVABLE_COUNTER, - true, - longPoint(value), - emptyList()); + name, null, null, OBSERVABLE_COUNTER, true, longPoint(value), emptyList()); } private static MetricSpec observableCounterDouble(String name, double value) { return new MetricSpec( - name, - null, - null, - OtelInstrumentType.OBSERVABLE_COUNTER, - false, - doublePoint(value), - emptyList()); + name, null, null, OBSERVABLE_COUNTER, false, doublePoint(value), emptyList()); } private static MetricSpec observableUpDownCounterLong(String name, long value) { return new MetricSpec( - name, - null, - null, - OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER, - true, - longPoint(value), - emptyList()); + name, null, null, OBSERVABLE_UP_DOWN_COUNTER, true, longPoint(value), emptyList()); } private static MetricSpec observableUpDownCounterDouble(String name, double value) { return new MetricSpec( - name, - null, - null, - OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER, - false, - doublePoint(value), - emptyList()); + name, null, null, OBSERVABLE_UP_DOWN_COUNTER, false, 
doublePoint(value), emptyList()); } private static MetricSpec histogram( @@ -250,7 +231,7 @@ private static MetricSpec histogram( name, null, null, - OtelInstrumentType.HISTOGRAM, + HISTOGRAM, false, histogramPoint(count, bounds, counts, sum, min, max), asList(attrs)); @@ -563,64 +544,93 @@ void testCollectMetrics(String caseName, List expectedScopes) throws OtlpPayload payload = collector.collectMetrics( visitor -> { - for (ScopeSpec s : expectedScopes) { - OtlpScopedMetricsVisitor sv = - visitor.visitScopedMetrics( - new OtelInstrumentationScope(s.name, s.version, s.schemaUrl)); - for (MetricSpec m : s.metrics) { - OtlpMetricVisitor mv = - sv.visitMetric( - descriptor(m.name, m.type, m.longValues, m.description, m.unit)); - for (AttrSpec a : m.attrs) { - mv.visitAttribute(a.type, a.key, a.value); + for (ScopeSpec scope : expectedScopes) { + OtlpScopedMetricsVisitor sv = visitor.visitScopedMetrics(scope.toScope()); + for (MetricSpec metric : scope.metrics) { + OtlpMetricVisitor mv = sv.visitMetric(metric.toDescriptor()); + for (AttrSpec attr : metric.attrs) { + mv.visitAttribute(attr.type, attr.key, attr.value); } - mv.visitDataPoint(m.point); + mv.visitDataPoint(metric.point); } } }); // Scopes with no metrics produce no wire output — filter them for verification - List nonEmptyScopes = - expectedScopes.stream().filter(s -> !s.metrics.isEmpty()).collect(toList()); + List nonEmptyScopes = new ArrayList<>(); + for (ScopeSpec scope : expectedScopes) { + if (!scope.metrics.isEmpty()) nonEmptyScopes.add(scope); + } if (nonEmptyScopes.isEmpty()) { assertEquals(0, payload.getContentLength(), "empty registry must produce empty payload"); return; } - // drain all chunks into a single contiguous byte array - ByteArrayOutputStream baos = new ByteArrayOutputStream(payload.getContentLength()); - payload.drain(baos::write); - byte[] bytes = baos.toByteArray(); - assertTrue(bytes.length > 0, "non-empty registry must produce bytes"); + assertTrue(payload.getContentLength() > 0, 
"non-empty registry must produce bytes"); // ── parse MetricsData ────────────────────────────────────────────────── // The full payload encodes a single MetricsData.resource_metrics (field 1, LEN). - CodedInputStream md = CodedInputStream.newInstance(bytes); - int mdTag = md.readTag(); - assertEquals(1, WireFormat.getTagFieldNumber(mdTag), "MetricsData.resource_metrics is field 1"); - assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(mdTag)); - CodedInputStream rm = md.readBytes().newCodedInput(); - assertTrue(md.isAtEnd(), "expected exactly one ResourceMetrics"); - - // ── parse ResourceMetrics ────────────────────────────────────────────── + CodedInputStream metricsData = CodedInputStream.newInstance(payload.getContent()); + int metricsTag = metricsData.readTag(); + assertEquals( + 1, WireFormat.getTagFieldNumber(metricsTag), "MetricsData.resource_metrics is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(metricsTag)); + CodedInputStream resourceMetrics = metricsData.readBytes().newCodedInput(); + assertTrue(metricsData.isAtEnd(), "expected exactly one ResourceMetrics"); + + // ── parse ResourceMetrics (order-insensitive) ────────────────────────── // Fields: resource=1, scope_metrics=2 (repeated) boolean resourceFound = false; - int scopeIdx = 0; - while (!rm.isAtEnd()) { - int rmTag = rm.readTag(); - int rmField = WireFormat.getTagFieldNumber(rmTag); + Map parsedScopes = new HashMap<>(); + while (!resourceMetrics.isAtEnd()) { + int tag = resourceMetrics.readTag(); + int rmField = WireFormat.getTagFieldNumber(tag); if (rmField == 1) { - verifyResource(rm.readBytes().newCodedInput()); + verifyResource(resourceMetrics.readBytes().newCodedInput()); resourceFound = true; continue; } assertEquals(2, rmField, "ResourceMetrics.scope_metrics is field 2"); - assertTrue(scopeIdx < nonEmptyScopes.size(), "more ScopeMetrics than expected"); - verifyScopeMetrics(rm.readBytes().newCodedInput(), 
nonEmptyScopes.get(scopeIdx++)); + ParsedScopeMetrics parsedScope = + parseScopeMetrics(resourceMetrics.readBytes().newCodedInput()); + parsedScopes.put(parsedScope.name, parsedScope); } assertTrue(resourceFound, "Resource message must be present in ResourceMetrics"); - assertEquals(nonEmptyScopes.size(), scopeIdx, "scope count mismatch in case: " + caseName); + assertEquals( + nonEmptyScopes.size(), parsedScopes.size(), "scope count mismatch in case: " + caseName); + + for (ScopeSpec expected : nonEmptyScopes) { + ParsedScopeMetrics parsedScope = parsedScopes.get(expected.name); + assertNotNull( + parsedScope, + "no ScopeMetrics found for scope '" + expected.name + "' [" + caseName + "]"); + assertEquals(expected.version, parsedScope.version, "scope version"); + assertEquals(expected.schemaUrl, parsedScope.schemaUrl, "scope schemaUrl"); + assertEquals( + expected.metrics.size(), + parsedScope.metricBlobs.size(), + "metric count in scope " + expected.name); + + Map expectedMetricsByName = new HashMap<>(); + for (MetricSpec metricSpec : expected.metrics) { + expectedMetricsByName.put(metricSpec.name, metricSpec); + } + for (byte[] metricBlob : parsedScope.metricBlobs) { + String metricName = parseMetricName(metricBlob); + MetricSpec metricSpec = expectedMetricsByName.get(metricName); + assertNotNull( + metricSpec, + "unexpected metric '" + + metricName + + "' in scope " + + expected.name + + " [" + + caseName + + "]"); + verifyMetric(CodedInputStream.newInstance(metricBlob), metricSpec); + } + } } // ── verification helpers ────────────────────────────────────────────────── @@ -633,73 +643,78 @@ void testCollectMetrics(String caseName, List expectedScopes) throws * Resource { repeated KeyValue attributes = 1; } * */ - private static void verifyResource(CodedInputStream res) throws IOException { + private static void verifyResource(CodedInputStream resource) throws IOException { boolean foundServiceName = false; - while (!res.isAtEnd()) { - int tag = res.readTag(); + 
while (!resource.isAtEnd()) { + int tag = resource.readTag(); if (WireFormat.getTagFieldNumber(tag) == 1) { // attributes (repeated KeyValue) - String key = readKeyValueKey(res.readBytes().newCodedInput()); + String key = readKeyValueKey(resource.readBytes().newCodedInput()); if ("service.name".equals(key)) { foundServiceName = true; } } else { - res.skipField(tag); + resource.skipField(tag); } } assertTrue(foundServiceName, "Resource must contain a 'service.name' attribute"); } /** - * Parses a {@code ScopeMetrics} message body and asserts its content matches {@code expected}. + * Parses a {@code ScopeMetrics} message body into a {@link ParsedScopeMetrics} containing the + * scope identity and raw bytes of each {@code metrics} entry. * *
    *   ScopeMetrics { scope=1, metrics=2, schema_url=3 }
    * 
*/ - private static void verifyScopeMetrics(CodedInputStream sm, ScopeSpec expected) + private static ParsedScopeMetrics parseScopeMetrics(CodedInputStream scopeMetrics) throws IOException { - String parsedName = null; - String parsedVersion = null; - String parsedSchemaUrl = null; - int metricIdx = 0; - - while (!sm.isAtEnd()) { - int tag = sm.readTag(); + String name = null; + String version = null; + String schemaUrl = null; + List metricBlobs = new ArrayList<>(); + while (!scopeMetrics.isAtEnd()) { + int tag = scopeMetrics.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 1: // InstrumentationScope - CodedInputStream scope = sm.readBytes().newCodedInput(); - while (!scope.isAtEnd()) { - int st = scope.readTag(); - switch (WireFormat.getTagFieldNumber(st)) { + CodedInputStream scopeStream = scopeMetrics.readBytes().newCodedInput(); + while (!scopeStream.isAtEnd()) { + int scopeTag = scopeStream.readTag(); + switch (WireFormat.getTagFieldNumber(scopeTag)) { case 1: - parsedName = scope.readString(); + name = scopeStream.readString(); break; case 2: - parsedVersion = scope.readString(); + version = scopeStream.readString(); break; default: - scope.skipField(st); + scopeStream.skipField(scopeTag); } } break; case 2: // Metric (repeated) - assertTrue( - metricIdx < expected.metrics.size(), - "more metrics than expected in scope " + expected.name); - verifyMetric(sm.readBytes().newCodedInput(), expected.metrics.get(metricIdx++)); + metricBlobs.add(scopeMetrics.readBytes().toByteArray()); break; case 3: // schema_url - parsedSchemaUrl = sm.readString(); + schemaUrl = scopeMetrics.readString(); break; default: - sm.skipField(tag); + scopeMetrics.skipField(tag); } } + return new ParsedScopeMetrics(name, version, schemaUrl, metricBlobs); + } - assertEquals(expected.name, parsedName, "scope name"); - assertEquals(expected.version, parsedVersion, "scope version"); - assertEquals(expected.schemaUrl, parsedSchemaUrl, "scope schemaUrl"); - 
assertEquals(expected.metrics.size(), metricIdx, "metric count in scope " + expected.name); + private static String parseMetricName(byte[] blob) throws IOException { + CodedInputStream metricStream = CodedInputStream.newInstance(blob); + while (!metricStream.isAtEnd()) { + int tag = metricStream.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 1) { + return metricStream.readString(); + } + metricStream.skipField(tag); + } + return null; } /** @@ -709,44 +724,42 @@ private static void verifyScopeMetrics(CodedInputStream sm, ScopeSpec expected) * Metric { name=1, description=2, unit=3, gauge=5, sum=7, histogram=9 } * */ - private static void verifyMetric(CodedInputStream m, MetricSpec expected) throws IOException { + private static void verifyMetric(CodedInputStream metric, MetricSpec expected) + throws IOException { String parsedName = null; String parsedDesc = null; String parsedUnit = null; boolean dataFound = false; - while (!m.isAtEnd()) { - int tag = m.readTag(); + while (!metric.isAtEnd()) { + int tag = metric.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 1: - parsedName = m.readString(); + parsedName = metric.readString(); break; case 2: - parsedDesc = m.readString(); + parsedDesc = metric.readString(); break; case 3: - parsedUnit = m.readString(); + parsedUnit = metric.readString(); break; case 5: // Gauge assertTrue(isGaugeType(expected.type), "unexpected gauge for " + expected.name); - verifyGauge(m.readBytes().newCodedInput(), expected); + verifyGauge(metric.readBytes().newCodedInput(), expected); dataFound = true; break; case 7: // Sum assertTrue(isSumType(expected.type), "unexpected sum for " + expected.name); - verifySum(m.readBytes().newCodedInput(), expected); + verifySum(metric.readBytes().newCodedInput(), expected); dataFound = true; break; case 9: // Histogram - assertEquals( - OtelInstrumentType.HISTOGRAM, - expected.type, - "unexpected histogram for " + expected.name); - verifyHistogram(m.readBytes().newCodedInput(), 
expected); + assertEquals(HISTOGRAM, expected.type, "unexpected histogram for " + expected.name); + verifyHistogram(metric.readBytes().newCodedInput(), expected); dataFound = true; break; default: - m.skipField(tag); + metric.skipField(tag); } } @@ -757,14 +770,14 @@ private static void verifyMetric(CodedInputStream m, MetricSpec expected) throws } private static boolean isGaugeType(OtelInstrumentType type) { - return type == OtelInstrumentType.GAUGE || type == OtelInstrumentType.OBSERVABLE_GAUGE; + return type == GAUGE || type == OBSERVABLE_GAUGE; } private static boolean isSumType(OtelInstrumentType type) { - return type == OtelInstrumentType.COUNTER - || type == OtelInstrumentType.OBSERVABLE_COUNTER - || type == OtelInstrumentType.UP_DOWN_COUNTER - || type == OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER; + return type == COUNTER + || type == OBSERVABLE_COUNTER + || type == UP_DOWN_COUNTER + || type == OBSERVABLE_UP_DOWN_COUNTER; } /** @@ -774,16 +787,17 @@ private static boolean isSumType(OtelInstrumentType type) { * Gauge { data_points=1 } * */ - private static void verifyGauge(CodedInputStream g, MetricSpec expected) throws IOException { + private static void verifyGauge(CodedInputStream gauge, MetricSpec expected) throws IOException { boolean foundDataPoint = false; - while (!g.isAtEnd()) { - int tag = g.readTag(); + while (!gauge.isAtEnd()) { + int tag = gauge.readTag(); if (WireFormat.getTagFieldNumber(tag) == 1) { assertFalse(foundDataPoint, "expected exactly one data point in gauge " + expected.name); - verifyNumberDataPoint(g.readBytes().newCodedInput(), expected, /* hasStartTime= */ false); + verifyNumberDataPoint( + gauge.readBytes().newCodedInput(), expected, /* hasStartTime= */ false); foundDataPoint = true; } else { - g.skipField(tag); + gauge.skipField(tag); } } assertTrue(foundDataPoint, "no data point found in gauge " + expected.name); @@ -796,34 +810,34 @@ private static void verifyGauge(CodedInputStream g, MetricSpec expected) throws * Sum { 
data_points=1, aggregation_temporality=2, is_monotonic=3 } * */ - private static void verifySum(CodedInputStream s, MetricSpec expected) throws IOException { + private static void verifySum(CodedInputStream sum, MetricSpec expected) throws IOException { boolean foundDataPoint = false; boolean foundTemporality = false; - while (!s.isAtEnd()) { - int tag = s.readTag(); + while (!sum.isAtEnd()) { + int tag = sum.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 1: // NumberDataPoint assertFalse(foundDataPoint, "expected exactly one data point in sum " + expected.name); - verifyNumberDataPoint(s.readBytes().newCodedInput(), expected, /* hasStartTime= */ true); + verifyNumberDataPoint( + sum.readBytes().newCodedInput(), expected, /* hasStartTime= */ true); foundDataPoint = true; break; case 2: // AggregationTemporality (1=DELTA, 2=CUMULATIVE) - int temporality = s.readEnum(); + int temporality = sum.readEnum(); assertTrue( temporality == 1 || temporality == 2, "aggregation_temporality must be DELTA(1) or CUMULATIVE(2)"); foundTemporality = true; break; case 3: // is_monotonic - boolean isMonotonic = s.readBool(); + boolean isMonotonic = sum.readBool(); boolean expectedMonotonic = - expected.type == OtelInstrumentType.COUNTER - || expected.type == OtelInstrumentType.OBSERVABLE_COUNTER; + expected.type == COUNTER || expected.type == OBSERVABLE_COUNTER; assertEquals(expectedMonotonic, isMonotonic, "is_monotonic for " + expected.name); break; default: - s.skipField(tag); + sum.skipField(tag); } } @@ -838,28 +852,29 @@ private static void verifySum(CodedInputStream s, MetricSpec expected) throws IO * Histogram { data_points=1, aggregation_temporality=2 } * */ - private static void verifyHistogram(CodedInputStream h, MetricSpec expected) throws IOException { + private static void verifyHistogram(CodedInputStream histogram, MetricSpec expected) + throws IOException { boolean foundDataPoint = false; boolean foundTemporality = false; - while (!h.isAtEnd()) { - int tag 
= h.readTag(); + while (!histogram.isAtEnd()) { + int tag = histogram.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 1: // HistogramDataPoint assertFalse( foundDataPoint, "expected exactly one data point in histogram " + expected.name); - verifyHistogramDataPoint(h.readBytes().newCodedInput(), expected); + verifyHistogramDataPoint(histogram.readBytes().newCodedInput(), expected); foundDataPoint = true; break; case 2: // AggregationTemporality - int temporality = h.readEnum(); + int temporality = histogram.readEnum(); assertTrue( temporality == 1 || temporality == 2, "aggregation_temporality must be DELTA(1) or CUMULATIVE(2)"); foundTemporality = true; break; default: - h.skipField(tag); + histogram.skipField(tag); } } @@ -878,26 +893,27 @@ private static void verifyHistogram(CodedInputStream h, MetricSpec expected) thr * @param hasStartTime true for non-gauge types; gauges omit {@code start_time_unix_nano} */ private static void verifyNumberDataPoint( - CodedInputStream dp, MetricSpec expected, boolean hasStartTime) throws IOException { + CodedInputStream dataPoint, MetricSpec expected, boolean hasStartTime) throws IOException { boolean foundStartTime = false; boolean foundEndTime = false; boolean foundValue = false; List parsedAttrKeys = new ArrayList<>(); - while (!dp.isAtEnd()) { - int tag = dp.readTag(); + while (!dataPoint.isAtEnd()) { + int tag = dataPoint.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 2: // start_time_unix_nano (fixed64) assertEquals( - START_EPOCH_NS, dp.readFixed64(), "start_time_unix_nano for " + expected.name); + START_EPOCH_NS, dataPoint.readFixed64(), "start_time_unix_nano for " + expected.name); foundStartTime = true; break; case 3: // time_unix_nano (fixed64) - assertEquals(END_EPOCH_NS, dp.readFixed64(), "time_unix_nano for " + expected.name); + assertEquals( + END_EPOCH_NS, dataPoint.readFixed64(), "time_unix_nano for " + expected.name); foundEndTime = true; break; case 4: // as_double (double via 
fixed64 wire type) - double parsedDouble = dp.readDouble(); + double parsedDouble = dataPoint.readDouble(); OtlpDoublePoint expectedDouble = (OtlpDoublePoint) expected.point; assertEquals( Double.doubleToRawLongBits(expectedDouble.value), @@ -906,16 +922,16 @@ private static void verifyNumberDataPoint( foundValue = true; break; case 6: // as_int (sfixed64) - long parsedLong = dp.readSFixed64(); + long parsedLong = dataPoint.readSFixed64(); OtlpLongPoint expectedLong = (OtlpLongPoint) expected.point; assertEquals(expectedLong.value, parsedLong, "as_int for " + expected.name); foundValue = true; break; case 7: // attributes (repeated KeyValue) - parsedAttrKeys.add(readKeyValueKey(dp.readBytes().newCodedInput())); + parsedAttrKeys.add(readKeyValueKey(dataPoint.readBytes().newCodedInput())); break; default: - dp.skipField(tag); + dataPoint.skipField(tag); } } @@ -943,7 +959,7 @@ private static void verifyNumberDataPoint( * min=11, max=12 } * */ - private static void verifyHistogramDataPoint(CodedInputStream dp, MetricSpec expected) + private static void verifyHistogramDataPoint(CodedInputStream dataPoint, MetricSpec expected) throws IOException { OtlpHistogramPoint hp = (OtlpHistogramPoint) expected.point; boolean foundStartTime = false; @@ -956,54 +972,55 @@ private static void verifyHistogramDataPoint(CodedInputStream dp, MetricSpec exp List parsedBounds = new ArrayList<>(); List parsedAttrKeys = new ArrayList<>(); - while (!dp.isAtEnd()) { - int tag = dp.readTag(); + while (!dataPoint.isAtEnd()) { + int tag = dataPoint.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 2: // start_time_unix_nano assertEquals( - START_EPOCH_NS, dp.readFixed64(), "start_time_unix_nano for " + expected.name); + START_EPOCH_NS, dataPoint.readFixed64(), "start_time_unix_nano for " + expected.name); foundStartTime = true; break; case 3: // time_unix_nano - assertEquals(END_EPOCH_NS, dp.readFixed64(), "time_unix_nano for " + expected.name); + assertEquals( + END_EPOCH_NS, 
dataPoint.readFixed64(), "time_unix_nano for " + expected.name); foundEndTime = true; break; case 4: // count (fixed64) - assertEquals((long) hp.count, dp.readFixed64(), "histogram count"); + assertEquals((long) hp.count, dataPoint.readFixed64(), "histogram count"); foundCount = true; break; case 5: // sum (double via fixed64) assertEquals( Double.doubleToRawLongBits(hp.sum), - Double.doubleToRawLongBits(dp.readDouble()), + Double.doubleToRawLongBits(dataPoint.readDouble()), "histogram sum"); foundSum = true; break; case 6: // bucket_counts (repeated fixed64) - parsedBucketCounts.add(dp.readFixed64()); + parsedBucketCounts.add(dataPoint.readFixed64()); break; case 7: // explicit_bounds (repeated double) - parsedBounds.add(dp.readDouble()); + parsedBounds.add(dataPoint.readDouble()); break; case 9: // attributes (repeated KeyValue) - parsedAttrKeys.add(readKeyValueKey(dp.readBytes().newCodedInput())); + parsedAttrKeys.add(readKeyValueKey(dataPoint.readBytes().newCodedInput())); break; case 11: // min (double via fixed64) assertEquals( Double.doubleToRawLongBits(hp.min), - Double.doubleToRawLongBits(dp.readDouble()), + Double.doubleToRawLongBits(dataPoint.readDouble()), "histogram min"); foundMin = true; break; case 12: // max (double via fixed64) assertEquals( Double.doubleToRawLongBits(hp.max), - Double.doubleToRawLongBits(dp.readDouble()), + Double.doubleToRawLongBits(dataPoint.readDouble()), "histogram max"); foundMax = true; break; default: - dp.skipField(tag); + dataPoint.skipField(tag); } } @@ -1020,16 +1037,16 @@ private static void verifyHistogramDataPoint(CodedInputStream dp, MetricSpec exp // bucket_counts.size() == explicit_bounds.size() + 1, as required by the OTLP spec List expectedBounds = new ArrayList<>(); boolean hasOverflow = false; - for (double b : hp.bucketBoundaries) { - if (Double.isInfinite(b)) { + for (double boundary : hp.bucketBoundaries) { + if (Double.isInfinite(boundary)) { hasOverflow = true; } else { - expectedBounds.add(b); + 
expectedBounds.add(boundary); } } List expectedCounts = new ArrayList<>(); - for (double c : hp.bucketCounts) { - expectedCounts.add((long) c); + for (double count : hp.bucketCounts) { + expectedCounts.add((long) count); } if (!expectedCounts.isEmpty() && !hasOverflow) { expectedCounts.add(0L); @@ -1093,22 +1110,15 @@ private static void verifyHistogramDataPoint(CodedInputStream dp, MetricSpec exp * Reads a {@code KeyValue} body and returns the key (field 1). The value field is skipped; its * encoding is covered by {@code OtlpCommonProtoTest}. */ - private static String readKeyValueKey(CodedInputStream kv) throws IOException { - String key = null; - while (!kv.isAtEnd()) { - int tag = kv.readTag(); + private static String readKeyValueKey(CodedInputStream keyValue) throws IOException { + while (!keyValue.isAtEnd()) { + int tag = keyValue.readTag(); if (WireFormat.getTagFieldNumber(tag) == 1) { - key = kv.readString(); - } else { - kv.skipField(tag); + return keyValue.readString(); } + keyValue.skipField(tag); } - return key; - } - - static OtelInstrumentDescriptor descriptor( - String name, OtelInstrumentType type, boolean longValues, String description, String unit) { - return new OtelInstrumentDescriptor(name, type, longValues, description, unit); + return null; } static OtlpLongPoint longPoint(long value) { diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/trace/OtlpTraceProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/trace/OtlpTraceProtoTest.java index 86bbd39d18f..6c8c219cbc6 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/trace/OtlpTraceProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/trace/OtlpTraceProtoTest.java @@ -9,6 +9,7 @@ import static java.util.Arrays.asList; import static java.util.Arrays.copyOfRange; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -32,7 +33,6 @@ import datadog.trace.core.otlp.common.OtlpPayload; import datadog.trace.core.propagation.ExtractedContext; import datadog.trace.core.propagation.PropagationTags; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -88,40 +88,40 @@ static final class SpanSpec { final String spanType; /** Span kind tag value; {@code null} → UNSPECIFIED (kind=0). */ - final String spanKind; + String spanKind; /** Start time in microseconds since epoch → start_time_unix_nano = startMicros * 1000. */ - final long startMicros; + long startMicros; /** Finish time in microseconds since epoch → end_time_unix_nano = finishMicros * 1000. */ - final long finishMicros; + long finishMicros; /** If true, marks the span as an error → status.code=ERROR(2). */ - final boolean error; + boolean error; /** Optional error message → status.message; ignored when {@code error} is false. */ - final String errorMessage; + String errorMessage; /** Sampling priority to set; 0 = not set explicitly. */ - final int samplingPriority; + int samplingPriority; /** Override service name; {@code null} → use tracer default. */ - final String serviceName; + String serviceName; /** Additional tags to set on the span, exercising string/long/boolean/double paths. */ - final Map extraTags; + Map extraTags; /** * If ≥ 0, index into the already-built span list to use as parent; creates a child span. If -1, * the span is a root span. */ - final int parentIndex; + int parentIndex; /** * Links to add to this span (one {@link SpanLink} per entry). Each link targets a span that * precedes this one in the list. An empty array means no links. */ - final LinkSpec[] links; + LinkSpec[] links; /** If true, the span is measured (sets the {@code _dd.measured} attribute). 
*/ boolean measured; @@ -138,33 +138,57 @@ static final class SpanSpec { /** Trace origin carried in the extracted parent context; {@code null} = no origin. */ String origin; - SpanSpec( - String resourceName, - String operationName, - String spanType, - String spanKind, - long startMicros, - long finishMicros, - boolean error, - String errorMessage, - int samplingPriority, - String serviceName, - Map extraTags, - int parentIndex, - LinkSpec... links) { + SpanSpec(String resourceName, String operationName, String spanType) { this.resourceName = resourceName; this.operationName = operationName; this.spanType = spanType; - this.spanKind = spanKind; - this.startMicros = startMicros; - this.finishMicros = finishMicros; - this.error = error; - this.errorMessage = errorMessage; - this.samplingPriority = samplingPriority; - this.serviceName = serviceName; - this.extraTags = extraTags; - this.parentIndex = parentIndex; + this.startMicros = BASE_MICROS; + this.finishMicros = BASE_MICROS + DURATION_MICROS; + this.parentIndex = -1; + this.links = new LinkSpec[0]; + this.extraTags = emptyMap(); + } + + SpanSpec spanKind(String kind) { + this.spanKind = kind; + return this; + } + + SpanSpec times(long start, long finish) { + this.startMicros = start; + this.finishMicros = finish; + return this; + } + + SpanSpec error(String message) { + this.error = true; + this.errorMessage = message; + return this; + } + + SpanSpec samplingPriority(int priority) { + this.samplingPriority = priority; + return this; + } + + SpanSpec serviceName(String name) { + this.serviceName = name; + return this; + } + + SpanSpec extraTags(Map tags) { + this.extraTags = tags; + return this; + } + + SpanSpec parentIndex(int index) { + this.parentIndex = index; + return this; + } + + SpanSpec links(LinkSpec... 
links) { this.links = links; + return this; } SpanSpec measured() { @@ -231,115 +255,33 @@ static final class LinkSpec { DD128bTraceId.from(0x0123456789abcdefL, 0xfedcba9876543210L); private static SpanSpec span(String resourceName, String operationName, String spanType) { - return new SpanSpec( - resourceName, - operationName, - spanType, - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - 0, - null, - new HashMap<>(), - -1); + return new SpanSpec(resourceName, operationName, spanType); } private static SpanSpec kindSpan(String resourceName, String kind) { - return new SpanSpec( - resourceName, - "op." + kind, - "web", - kind, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - 0, - null, - new HashMap<>(), - -1); + return span(resourceName, "op." + kind, "web").spanKind(kind); } private static SpanSpec sampledSpan(String resourceName) { - return new SpanSpec( - resourceName, - "op.sampled", - "web", - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - PrioritySampling.USER_KEEP, - null, - new HashMap<>(), - -1); + return span(resourceName, "op.sampled", "web").samplingPriority(PrioritySampling.USER_KEEP); } private static SpanSpec errorSpan(String resourceName, String errorMessage) { - return new SpanSpec( - resourceName, - "op.error", - "web", - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - true, - errorMessage, - 0, - null, - new HashMap<>(), - -1); + return span(resourceName, "op.error", "web").error(errorMessage); } private static SpanSpec taggedSpan(String resourceName, Map extraTags) { - return new SpanSpec( - resourceName, - "op.tagged", - "web", - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - 0, - null, - extraTags, - -1); + return span(resourceName, "op.tagged", "web").extraTags(extraTags); } private static SpanSpec childSpan(String resourceName, int parentIndex) { - return new SpanSpec( - resourceName, - "op.child", - "web", - null, - BASE_MICROS + 10_000, - 
BASE_MICROS + DURATION_MICROS - 10_000, - false, - null, - 0, - null, - new HashMap<>(), - parentIndex); + return span(resourceName, "op.child", "web") + .times(BASE_MICROS + 10_000, BASE_MICROS + DURATION_MICROS - 10_000) + .parentIndex(parentIndex); } private static SpanSpec serviceSpan(String resourceName, String serviceName) { - return new SpanSpec( - resourceName, - "op.service", - "web", - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - 0, - serviceName, - new HashMap<>(), - -1); + return span(resourceName, "op.service", "web").serviceName(serviceName); } /** A span with {@link SpanLink}s pointing to the spans at the given {@code targetIndices}. */ @@ -348,20 +290,7 @@ private static SpanSpec linkedSpan(String resourceName, int... targetIndices) { for (int i = 0; i < targetIndices.length; i++) { links[i] = new LinkSpec(targetIndices[i]); } - return new SpanSpec( - resourceName, - "op.linked", - "web", - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - 0, - null, - new HashMap<>(), - -1, - links); + return span(resourceName, "op.linked", "web").links(links); } /** @@ -370,58 +299,21 @@ private static SpanSpec linkedSpan(String resourceName, int... targetIndices) { */ private static SpanSpec linkedSpanWithAttrs( String resourceName, int targetIndex, SpanAttributes attributes) { - return new SpanSpec( - resourceName, - "op.linked", - "web", - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - 0, - null, - new HashMap<>(), - -1, - new LinkSpec(targetIndex, attributes)); + return span(resourceName, "op.linked", "web").links(new LinkSpec(targetIndex, attributes)); } /** A span with one {@link SpanLink} carrying the given W3C tracestate string. 
*/ private static SpanSpec linkedSpanWithTracestate( String resourceName, int targetIndex, String traceState) { - return new SpanSpec( - resourceName, - "op.linked", - "web", - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - 0, - null, - new HashMap<>(), - -1, - new LinkSpec(targetIndex, SpanAttributes.EMPTY, traceState, SpanLink.DEFAULT_FLAGS)); + return span(resourceName, "op.linked", "web") + .links(new LinkSpec(targetIndex, SpanAttributes.EMPTY, traceState, SpanLink.DEFAULT_FLAGS)); } /** A span with one {@link SpanLink} carrying the given trace flags. */ private static SpanSpec linkedSpanWithFlags( String resourceName, int targetIndex, byte traceFlags) { - return new SpanSpec( - resourceName, - "op.linked", - "web", - null, - BASE_MICROS, - BASE_MICROS + DURATION_MICROS, - false, - null, - 0, - null, - new HashMap<>(), - -1, - new LinkSpec(targetIndex, SpanAttributes.EMPTY, "", traceFlags)); + return span(resourceName, "op.linked", "web") + .links(new LinkSpec(targetIndex, SpanAttributes.EMPTY, "", traceFlags)); } private static Map tags(Object... keyValues) { @@ -570,55 +462,53 @@ void testCollectTraces(String caseName, List specs) throws IOException return; } - ByteArrayOutputStream baos = new ByteArrayOutputStream(payload.getContentLength()); - payload.drain(baos::write); - byte[] bytes = baos.toByteArray(); - assertTrue(bytes.length > 0, "non-empty span list must produce bytes"); + assertTrue(payload.getContentLength() > 0, "non-empty span list must produce bytes"); // ── parse TracesData ───────────────────────────────────────────────── // Full payload encodes a single TracesData.resource_spans entry (field 1, LEN). 
- CodedInputStream td = CodedInputStream.newInstance(bytes); - int tdTag = td.readTag(); - assertEquals(1, WireFormat.getTagFieldNumber(tdTag), "TracesData.resource_spans is field 1"); - assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tdTag)); - CodedInputStream rs = td.readBytes().newCodedInput(); - assertTrue(td.isAtEnd(), "expected exactly one ResourceSpans"); + CodedInputStream tracesData = CodedInputStream.newInstance(payload.getContent()); + int tracesTag = tracesData.readTag(); + assertEquals( + 1, WireFormat.getTagFieldNumber(tracesTag), "TracesData.resource_spans is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tracesTag)); + CodedInputStream resourceSpans = tracesData.readBytes().newCodedInput(); + assertTrue(tracesData.isAtEnd(), "expected exactly one ResourceSpans"); // ── parse ResourceSpans ────────────────────────────────────────────── // Fields: resource=1, scope_spans=2 boolean resourceFound = false; - CodedInputStream ss = null; - while (!rs.isAtEnd()) { - int rsTag = rs.readTag(); - switch (WireFormat.getTagFieldNumber(rsTag)) { + CodedInputStream scopeSpans = null; + while (!resourceSpans.isAtEnd()) { + int tag = resourceSpans.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { case 1: - verifyResource(rs.readBytes().newCodedInput()); + verifyResource(resourceSpans.readBytes().newCodedInput()); resourceFound = true; break; case 2: - ss = rs.readBytes().newCodedInput(); + scopeSpans = resourceSpans.readBytes().newCodedInput(); break; default: - rs.skipField(rsTag); + resourceSpans.skipField(tag); } } assertTrue(resourceFound, "Resource must be present in ResourceSpans"); - assertNotNull(ss, "ScopeSpans must be present in ResourceSpans"); + assertNotNull(scopeSpans, "ScopeSpans must be present in ResourceSpans"); // ── parse ScopeSpans ───────────────────────────────────────────────── // Fields: scope=1, spans=2 (repeated), schema_url=3 List spanBlobs = new 
ArrayList<>(); - while (!ss.isAtEnd()) { - int ssTag = ss.readTag(); - switch (WireFormat.getTagFieldNumber(ssTag)) { + while (!scopeSpans.isAtEnd()) { + int tag = scopeSpans.readTag(); + switch (WireFormat.getTagFieldNumber(tag)) { case 1: - verifyDefaultScope(ss.readBytes().newCodedInput()); + verifyDefaultScope(scopeSpans.readBytes().newCodedInput()); break; case 2: - spanBlobs.add(ss.readBytes().toByteArray()); + spanBlobs.add(scopeSpans.readBytes().toByteArray()); break; default: - ss.skipField(ssTag); + scopeSpans.skipField(tag); } } assertEquals(spans.size(), spanBlobs.size(), "span count mismatch in case: " + caseName); @@ -660,63 +550,55 @@ void testCollectMultipleTraces() throws IOException { // Collect all span IDs we expect to find across all three traces. Set expectedSpanIds = new HashSet<>(); Set expectedTraceIds = new HashSet<>(); - for (DDSpan s : trace1) { - expectedSpanIds.add(s.getSpanId()); - expectedTraceIds.add(s.getTraceId().toLong()); - } - for (DDSpan s : trace2) { - expectedSpanIds.add(s.getSpanId()); - expectedTraceIds.add(s.getTraceId().toLong()); - } - for (DDSpan s : trace3) { - expectedSpanIds.add(s.getSpanId()); - expectedTraceIds.add(s.getTraceId().toLong()); + for (List trace : asList(trace1, trace2, trace3)) { + for (DDSpan span : trace) { + expectedSpanIds.add(span.getSpanId()); + expectedTraceIds.add(span.getTraceId().toLong()); + } } int totalSpans = trace1.size() + trace2.size() + trace3.size(); // 6 - ByteArrayOutputStream baos = new ByteArrayOutputStream(payload.getContentLength()); - payload.drain(baos::write); - byte[] bytes = baos.toByteArray(); - assertTrue(bytes.length > 0, "multi-trace payload must be non-empty"); + assertTrue(payload.getContentLength() > 0, "multi-trace payload must be non-empty"); // Parse TracesData → ResourceSpans → ScopeSpans → extract span_id and trace_id per span. 
- CodedInputStream td = CodedInputStream.newInstance(bytes); - int tdTag = td.readTag(); - assertEquals(1, WireFormat.getTagFieldNumber(tdTag), "TracesData.resource_spans is field 1"); - assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tdTag)); - CodedInputStream rs = td.readBytes().newCodedInput(); - assertTrue(td.isAtEnd(), "expected exactly one ResourceSpans"); - - CodedInputStream ss = null; - while (!rs.isAtEnd()) { - int rsTag = rs.readTag(); - if (WireFormat.getTagFieldNumber(rsTag) == 2) { - ss = rs.readBytes().newCodedInput(); + CodedInputStream tracesData = CodedInputStream.newInstance(payload.getContent()); + int tracesTag = tracesData.readTag(); + assertEquals( + 1, WireFormat.getTagFieldNumber(tracesTag), "TracesData.resource_spans is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tracesTag)); + CodedInputStream resourceSpans = tracesData.readBytes().newCodedInput(); + assertTrue(tracesData.isAtEnd(), "expected exactly one ResourceSpans"); + + CodedInputStream scopeSpans = null; + while (!resourceSpans.isAtEnd()) { + int tag = resourceSpans.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 2) { + scopeSpans = resourceSpans.readBytes().newCodedInput(); } else { - rs.skipField(rsTag); + resourceSpans.skipField(tag); } } - assertNotNull(ss, "ScopeSpans must be present in ResourceSpans"); + assertNotNull(scopeSpans, "ScopeSpans must be present in ResourceSpans"); Set parsedSpanIds = new HashSet<>(); Set parsedTraceIds = new HashSet<>(); - while (!ss.isAtEnd()) { - int ssTag = ss.readTag(); - if (WireFormat.getTagFieldNumber(ssTag) == 2) { - CodedInputStream sp = ss.readBytes().newCodedInput(); + while (!scopeSpans.isAtEnd()) { + int tag = scopeSpans.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 2) { + CodedInputStream spanData = scopeSpans.readBytes().newCodedInput(); byte[] parsedTraceId = null; byte[] parsedSpanId = null; - while (!sp.isAtEnd()) { - int spTag = 
sp.readTag(); - switch (WireFormat.getTagFieldNumber(spTag)) { + while (!spanData.isAtEnd()) { + int spanTag = spanData.readTag(); + switch (WireFormat.getTagFieldNumber(spanTag)) { case 1: - parsedTraceId = sp.readBytes().toByteArray(); + parsedTraceId = spanData.readBytes().toByteArray(); break; case 2: - parsedSpanId = sp.readBytes().toByteArray(); + parsedSpanId = spanData.readBytes().toByteArray(); break; default: - sp.skipField(spTag); + spanData.skipField(spanTag); } } assertNotNull(parsedSpanId, "span_id must be present in every span"); @@ -727,7 +609,7 @@ void testCollectMultipleTraces() throws IOException { // low-order part of traceId occupies parsedTraceId[8..15] (big-endian) parsedTraceIds.add(readBigEndianLong(copyOfRange(parsedTraceId, 8, 16))); } else { - ss.skipField(ssTag); + scopeSpans.skipField(tag); } } @@ -740,6 +622,65 @@ void testCollectMultipleTraces() throws IOException { "payload must contain spans with all three distinct trace IDs"); } + @Test + void testSpanOrderInTracePreserved() throws IOException { + // Verifies that spans appear in the payload with the same order as the original trace + List spans = + buildSpans( + asList( + span("first.span", "op.first", "web"), + span("second.span", "op.second", "web"), + span("third.span", "op.third", "web"))); + + OtlpTraceProtoCollector collector = new OtlpTraceProtoCollector(); + collector.addTrace(spans); + OtlpPayload payload = collector.collectTraces(); + + assertEquals( + asList("first.span", "second.span", "third.span"), + parseSpanNamesFromPayload(payload), + "spans must appear in trace order in the payload"); + } + + private static List parseSpanNamesFromPayload(OtlpPayload payload) throws IOException { + CodedInputStream tracesData = CodedInputStream.newInstance(payload.getContent()); + tracesData.readTag(); // field 1: TracesData.resource_spans + CodedInputStream resourceSpans = tracesData.readBytes().newCodedInput(); + + CodedInputStream scopeSpans = null; + while 
(!resourceSpans.isAtEnd()) { + int tag = resourceSpans.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 2) { + scopeSpans = resourceSpans.readBytes().newCodedInput(); + } else { + resourceSpans.skipField(tag); + } + } + assertNotNull(scopeSpans, "ScopeSpans must be present in ResourceSpans"); + + List names = new ArrayList<>(); + while (!scopeSpans.isAtEnd()) { + int tag = scopeSpans.readTag(); + if (WireFormat.getTagFieldNumber(tag) == 2) { + CodedInputStream spanData = scopeSpans.readBytes().newCodedInput(); + String name = null; + while (!spanData.isAtEnd()) { + int spanTag = spanData.readTag(); + if (WireFormat.getTagFieldNumber(spanTag) == 5) { + name = spanData.readString(); + break; + } else { + spanData.skipField(spanTag); + } + } + names.add(name); + } else { + scopeSpans.skipField(tag); + } + } + return names; + } + // ── span construction ───────────────────────────────────────────────────── /** Builds {@link DDSpan} instances from the given specs, collecting them in order. 
*/ @@ -837,17 +778,17 @@ private static List buildSpans(List specs) { * Resource { repeated KeyValue attributes = 1; } * */ - private static void verifyResource(CodedInputStream res) throws IOException { + private static void verifyResource(CodedInputStream resource) throws IOException { boolean foundServiceName = false; - while (!res.isAtEnd()) { - int tag = res.readTag(); + while (!resource.isAtEnd()) { + int tag = resource.readTag(); if (WireFormat.getTagFieldNumber(tag) == 1) { - String key = readKeyValueKey(res.readBytes().newCodedInput()); + String key = readKeyValueKey(resource.readBytes().newCodedInput()); if ("service.name".equals(key)) { foundServiceName = true; } } else { - res.skipField(tag); + resource.skipField(tag); } } assertTrue(foundServiceName, "Resource must contain a 'service.name' attribute"); @@ -883,7 +824,8 @@ private static void verifyDefaultScope(CodedInputStream scope) throws IOExceptio * links=13 (repeated), status=15, flags=16 } * */ - private static void verifySpan(CodedInputStream sp, DDSpan span, SpanSpec spec, String caseName) + private static void verifySpan( + CodedInputStream spanData, DDSpan originalSpan, SpanSpec spec, String caseName) throws IOException { byte[] parsedTraceId = null; byte[] parsedSpanId = null; @@ -899,44 +841,44 @@ private static void verifySpan(CodedInputStream sp, DDSpan span, SpanSpec spec, Set attrKeys = new HashSet<>(); int linkCount = 0; - while (!sp.isAtEnd()) { - int tag = sp.readTag(); + while (!spanData.isAtEnd()) { + int tag = spanData.readTag(); switch (WireFormat.getTagFieldNumber(tag)) { case 1: - parsedTraceId = sp.readBytes().toByteArray(); + parsedTraceId = spanData.readBytes().toByteArray(); break; case 2: - parsedSpanId = sp.readBytes().toByteArray(); + parsedSpanId = spanData.readBytes().toByteArray(); break; case 3: - sp.skipField( + spanData.skipField( tag); // trace_state: absent for locally-started spans, present when propagated break; case 4: - parsedParentSpanId = 
sp.readBytes().toByteArray(); + parsedParentSpanId = spanData.readBytes().toByteArray(); break; case 5: - parsedName = sp.readString(); + parsedName = spanData.readString(); break; case 6: - parsedKind = sp.readEnum(); + parsedKind = spanData.readEnum(); break; case 7: - parsedStartNano = sp.readFixed64(); + parsedStartNano = spanData.readFixed64(); break; case 8: - parsedEndNano = sp.readFixed64(); + parsedEndNano = spanData.readFixed64(); break; case 9: - attrKeys.add(readKeyValueKey(sp.readBytes().newCodedInput())); + attrKeys.add(readKeyValueKey(spanData.readBytes().newCodedInput())); break; case 13: - verifyLink(sp.readBytes().newCodedInput(), spec.links[linkCount], caseName); + verifyLink(spanData.readBytes().newCodedInput(), spec.links[linkCount], caseName); linkCount++; break; case 15: { - CodedInputStream status = sp.readBytes().newCodedInput(); + CodedInputStream status = spanData.readBytes().newCodedInput(); statusFound = true; while (!status.isAtEnd()) { int st = status.readTag(); @@ -954,10 +896,10 @@ private static void verifySpan(CodedInputStream sp, DDSpan span, SpanSpec spec, break; } case 16: - parsedFlags = sp.readFixed32(); + parsedFlags = spanData.readFixed32(); break; default: - sp.skipField(tag); + spanData.skipField(tag); } } @@ -977,7 +919,9 @@ private static void verifySpan(CodedInputStream sp, DDSpan span, SpanSpec spec, assertNotNull(parsedSpanId, "span_id must be present [" + caseName + "]"); assertEquals(8, parsedSpanId.length, "span_id must be 8 bytes [" + caseName + "]"); assertEquals( - span.getSpanId(), readBigEndianLong(parsedSpanId), "span_id mismatch [" + caseName + "]"); + originalSpan.getSpanId(), + readBigEndianLong(parsedSpanId), + "span_id mismatch [" + caseName + "]"); // ── parent_span_id (field 4) ────────────────────────────────────────────── if (spec.parentIndex >= 0) { @@ -986,7 +930,7 @@ private static void verifySpan(CodedInputStream sp, DDSpan span, SpanSpec spec, assertEquals( 8, parsedParentSpanId.length, 
"parent_span_id must be 8 bytes [" + caseName + "]"); assertEquals( - span.getParentId(), + originalSpan.getParentId(), readBigEndianLong(parsedParentSpanId), "parent_span_id mismatch [" + caseName + "]"); } else { @@ -1177,17 +1121,15 @@ private static int expectedKind(String spanKind) { * Reads a {@code KeyValue} body and returns the key (field 1). The value is skipped; its encoding * is covered by {@code OtlpCommonProtoTest}. */ - private static String readKeyValueKey(CodedInputStream kv) throws IOException { - String key = null; - while (!kv.isAtEnd()) { - int tag = kv.readTag(); + private static String readKeyValueKey(CodedInputStream keyValue) throws IOException { + while (!keyValue.isAtEnd()) { + int tag = keyValue.readTag(); if (WireFormat.getTagFieldNumber(tag) == 1) { - key = kv.readString(); - } else { - kv.skipField(tag); + return keyValue.readString(); } + keyValue.skipField(tag); } - return key; + return null; } /** Reads a big-endian 64-bit value from the first 8 bytes of the given array. */