Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datadog-ipc/src/shm_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ impl ShmSpanConcentrator {
.unwrap_or_default(),
service_source: read_str!(f.service_source),
span_derived_primary_tags: vec![],
additional_metric_tags: vec![],
}
}
}
Expand Down
1 change: 1 addition & 0 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ impl TraceExporterBuilder {
std::time::SystemTime::now(),
span_kinds,
self.peer_tags.clone(),
vec![],
#[cfg(feature = "stats-obfuscation")]
None,
)));
Expand Down
1 change: 1 addition & 0 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub(crate) fn start_stats_computation<
std::time::SystemTime::now(),
span_kinds,
peer_tags,
vec![],
#[cfg(feature = "stats-obfuscation")]
Some(client_side_stats.obfuscation_config.clone()),
)));
Expand Down
10 changes: 9 additions & 1 deletion libdd-trace-protobuf/src/pb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,18 +654,26 @@ pub struct ClientGroupedStats {
#[serde(rename = "HTTPEndpoint")]
pub http_endpoint: ::prost::alloc::string::String,
/// @inject_tag: msg:"srv_src"
/// used to identify service override origin
#[prost(string, tag = "21")]
#[serde(default)]
#[serde(rename = "srv_src")]
pub service_source: ::prost::alloc::string::String,
/// used to identify service override origin
/// span_derived_primary_tags are user-configured tags that are extracted from spans and used for stats aggregation
/// E.g., `aws.s3.bucket`, `http.url`, or any custom tag
/// Deprecated: use additional_metric_tags (field 23) instead.
#[prost(string, repeated, tag = "22")]
#[serde(default)]
pub span_derived_primary_tags: ::prost::alloc::vec::Vec<
::prost::alloc::string::String,
>,
/// additional_metric_tags are tags to be used as additional dimensions for stats aggregation
/// E.g., `aws.s3.bucket`, `http.url`, or any custom tag
#[prost(string, repeated, tag = "23")]
#[serde(default)]
pub additional_metric_tags: ::prost::alloc::vec::Vec<
::prost::alloc::string::String,
>,
}
/// Trilean is an expanded boolean type that is meant to differentiate between being unset and false.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
Expand Down
1 change: 1 addition & 0 deletions libdd-trace-stats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ let mut concentrator = SpanConcentrator::new(
SystemTime::now(),
vec!["client".to_string(), "server".to_string()], // eligible span kinds
vec!["peer.service".to_string()], // peer tag keys
vec!["example.key".to_string()], // additional metric tag keys
);

// Add spans
Expand Down
1 change: 1 addition & 0 deletions libdd-trace-stats/benches/span_concentrator_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
now,
vec![],
vec!["db_name".into(), "bucket_s3".into()],
vec![],
#[cfg(feature = "stats-obfuscation")]
None,
);
Expand Down
162 changes: 144 additions & 18 deletions libdd-trace-stats/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ use libdd_trace_obfuscation::ip_address::quantize_peer_ip_addresses;
use libdd_trace_protobuf::pb;
use libdd_trace_utils::span::SpanText;
use std::borrow::{Borrow, Cow};
use tracing::warn;

use crate::span_concentrator::StatSpan;

const TAG_STATUS_CODE: &str = "http.status_code";
const ADDITIONAL_METRIC_TAG_VALUE_MAX_LEN: usize = 200;
const TRACER_BLOCKED_VALUE: &str = "tracer_blocked_value";
const TAG_SYNTHETICS: &str = "synthetics";
const TAG_SPANKIND: &str = "span.kind";
const TAG_ORIGIN: &str = "_dd.origin";
Expand Down Expand Up @@ -82,6 +85,7 @@ impl<T> FixedAggregationKey<T> {
pub(super) struct BorrowedAggregationKey<'a> {
fixed: FixedAggregationKey<&'a str>,
peer_tags: Vec<(&'a str, Cow<'a, str>)>,
additional_metric_tags: Vec<(&'a str, &'a str)>,
}

impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
Expand All @@ -94,6 +98,12 @@ impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
.iter()
.zip(other.peer_tags.iter())
.all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2)
&& self.additional_metric_tags.len() == other.additional_metric_tags.len()
&& self
.additional_metric_tags
.iter()
.zip(other.additional_metric_tags.iter())
.all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2)
}
}

Expand All @@ -108,6 +118,7 @@ impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
pub(super) struct OwnedAggregationKey {
fixed: FixedAggregationKey<String>,
peer_tags: Vec<(String, String)>,
additional_metric_tags: Vec<(String, String)>,
}

impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey {
Expand All @@ -119,6 +130,11 @@ impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey {
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
additional_metric_tags: value
.additional_metric_tags
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
}
}
}
Expand Down Expand Up @@ -212,16 +228,27 @@ fn grpc_status_str_to_int_value(v: &str) -> Option<u8> {
impl<'a> BorrowedAggregationKey<'a> {
/// Return an AggregationKey matching the given span.
///
/// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the
/// If `peer_tag_keys` is not empty then the peer tags of the span will be included in the
/// key.
pub(super) fn from_span<T: StatSpan<'a>>(span: &'a T, peer_tag_keys: &'a [String]) -> Self {
Self::from_obfuscated_span(span.resource(), span, peer_tag_keys)
/// If `additional_metric_tags` is not empty then matching span tags keys are included in the key.
pub(super) fn from_span<T: StatSpan<'a>>(
span: &'a T,
peer_tag_keys: &'a [String],
additional_metric_tag_keys: &'a [String],
) -> Self {
Self::from_obfuscated_span(
span.resource(),
span,
peer_tag_keys,
additional_metric_tag_keys,
)
}

pub(crate) fn from_obfuscated_span<'b, T>(
resource_name: &'a str,
span: &'b T,
peer_tag_keys: &'b [String],
additional_metric_tag_keys: &'b [String],
) -> BorrowedAggregationKey<'a>
where
T: StatSpan<'b>,
Expand Down Expand Up @@ -265,6 +292,24 @@ impl<'a> BorrowedAggregationKey<'a> {

let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default();

let additional_metric_tags: Vec<(&'a str, &'a str)> = additional_metric_tag_keys
.iter()
.filter_map(|key| match span.get_meta(key.as_str()) {
Some(v) if !v.is_empty() => {
if v.len() > ADDITIONAL_METRIC_TAG_VALUE_MAX_LEN {
warn!(
"additional_metric_tags: value for key '{}' exceeds {} characters; substituting tracer_blocked_value",
key, ADDITIONAL_METRIC_TAG_VALUE_MAX_LEN,
);
Some((key.as_str(), TRACER_BLOCKED_VALUE))
} else {
Some((key.as_str(), v))
}
}
_ => None,
})
.collect();

Self {
fixed: FixedAggregationKey {
resource_name,
Expand All @@ -283,6 +328,25 @@ impl<'a> BorrowedAggregationKey<'a> {
is_trace_root: span.is_trace_root(),
},
peer_tags,
additional_metric_tags,
}
}

/// Return an owned copy of this key with all additional metric tag values replaced by
/// `TRACER_BLOCKED_VALUE`. Used when the per-bucket stat-entry limit is exceeded.
pub(super) fn into_masked_owned(self) -> OwnedAggregationKey {
OwnedAggregationKey {
fixed: self.fixed.convert(str::to_owned),
peer_tags: self
.peer_tags
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
additional_metric_tags: self
.additional_metric_tags
.iter()
.map(|(k, _)| (k.to_string(), TRACER_BLOCKED_VALUE.to_string()))
.collect(),
}
}
}
Expand Down Expand Up @@ -312,6 +376,14 @@ impl From<pb::ClientGroupedStats> for OwnedAggregationKey {
Some((key.to_string(), value.to_string()))
})
.collect(),
additional_metric_tags: value
.additional_metric_tags
.into_iter()
.filter_map(|t| {
let (key, value) = t.split_once(':')?;
Some((key.to_string(), value.to_string()))
})
.collect(),
}
}
}
Expand Down Expand Up @@ -414,19 +486,31 @@ pub struct OtlpStatsBucket {
pub(super) struct StatsBucket {
data: HashMap<OwnedAggregationKey, GroupedStats>,
start: u64,
/// Number of distinct entries with additional metric tags admitted this bucket.
additional_metric_tags_entry_count: usize,
/// Maximum distinct entries with additional metric tags per bucket.
additional_metric_tags_cardinality_limit: usize,
}

impl StatsBucket {
/// Return a new StatsBucket starting at the given timestamp
pub(super) fn new(start_timestamp: u64) -> Self {
/// Return a new StatsBucket starting at the given timestamp.
/// `additional_metric_tags_cardinality_limit` limits the number of distinct aggregation keys that include
/// additional metric tags; overflow entries have their tag values masked to TRACER_BLOCKED_VALUE.
pub(super) fn new(start_timestamp: u64, additional_metric_tags_cardinality_limit: usize) -> Self {
Self {
data: HashMap::new(),
start: start_timestamp,
additional_metric_tags_entry_count: 0,
additional_metric_tags_cardinality_limit,
}
}

/// Insert a value as stats in the group corresponding to the aggregation key, if it does
/// not exist it creates it.
///
/// If the key has additional metric tags and would create a new entry beyond the limit, all
/// additional tag values are replaced with `TRACER_BLOCKED_VALUE` before insertion. Keys that
/// already exist in this bucket always merge normally regardless of the limit.
pub(super) fn insert(
&mut self,
key: BorrowedAggregationKey<'_>,
Expand All @@ -435,13 +519,47 @@ impl StatsBucket {
is_top_level: bool,
grpc_method: &str,
) {
self.data
.entry_ref(&key)
.or_insert_with(|| GroupedStats {
grpc_method: grpc_method.to_owned(),
..Default::default()
})
.insert(duration, is_error, is_top_level);
if key.additional_metric_tags.is_empty() {
self.data
.entry_ref(&key)
.or_insert_with(|| GroupedStats {
grpc_method: grpc_method.to_owned(),
..Default::default()
})
.insert(duration, is_error, is_top_level);
return;
}

// Key has additional metric tags: check existence before applying the limit.
match self.data.entry_ref(&key) {
hashbrown::hash_map::EntryRef::Occupied(mut e) => {
// Already exists — merge normally, no limit check needed.
e.get_mut().insert(duration, is_error, is_top_level);
}
hashbrown::hash_map::EntryRef::Vacant(e) => {
if self.additional_metric_tags_entry_count < self.additional_metric_tags_cardinality_limit {
// Under limit — admit new entry.
self.additional_metric_tags_entry_count += 1;
e.insert(GroupedStats {
grpc_method: grpc_method.to_owned(),
..Default::default()
})
.insert(duration, is_error, is_top_level);
} else {
// Cap exceeded — mask tag values and merge into the overflow entry.
// Drop the vacant entry to release the mutable borrow before re-entering.
drop(e);
let masked = key.into_masked_owned();
self.data
.entry(masked)
.or_insert_with(|| GroupedStats {
grpc_method: grpc_method.to_owned(),
..Default::default()
})
.insert(duration, is_error, is_top_level);
}
}
}
}

/// Consume the bucket and return a ClientStatsBucket containing the bucket stats.
Expand Down Expand Up @@ -524,7 +642,12 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl
.map(|c| c.to_string())
.unwrap_or_default(),
service_source: f.service_source,
span_derived_primary_tags: vec![], // Todo
span_derived_primary_tags: vec![], // Deprecated
additional_metric_tags: key
.additional_metric_tags
.into_iter()
.map(|(k, v)| format!("{k}:{v}"))
.collect(),
}
}

Expand All @@ -547,12 +670,14 @@ mod tests {
OwnedAggregationKey {
fixed: self,
peer_tags: vec![],
additional_metric_tags: vec![],
}
}
fn into_key_with_peers(self, peer_tags: Vec<(String, String)>) -> OwnedAggregationKey {
OwnedAggregationKey {
fixed: self,
peer_tags,
additional_metric_tags: vec![],
}
}
}
Expand Down Expand Up @@ -1065,7 +1190,7 @@ mod tests {
];

for (span, expected_key) in test_cases {
let borrowed_key = BorrowedAggregationKey::from_span(&span, &[]);
let borrowed_key = BorrowedAggregationKey::from_span(&span, &[], &[]);
assert_eq!(
OwnedAggregationKey::from(&borrowed_key),
expected_key,
Expand All @@ -1078,7 +1203,8 @@ mod tests {
}

for (span, expected_key) in test_cases_with_peer_tags {
let borrowed_key = BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice());
let borrowed_key =
BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice(), &[]);
assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key);
assert_eq!(
get_hash(&borrowed_key),
Expand Down Expand Up @@ -1106,7 +1232,7 @@ mod tests {
.into(),
..Default::default()
};
let key = BorrowedAggregationKey::from_span(&span_ipv4, &peer_tag_keys);
let key = BorrowedAggregationKey::from_span(&span_ipv4, &peer_tag_keys, &[]);
let owned = OwnedAggregationKey::from(&key);
assert_eq!(
owned.peer_tags,
Expand Down Expand Up @@ -1134,7 +1260,7 @@ mod tests {
..Default::default()
};
let ipv6_keys = vec!["peer.hostname".to_string()];
let key = BorrowedAggregationKey::from_span(&span_ipv6, &ipv6_keys);
let key = BorrowedAggregationKey::from_span(&span_ipv6, &ipv6_keys, &[]);
let owned = OwnedAggregationKey::from(&key);
assert_eq!(
owned.peer_tags,
Expand All @@ -1155,7 +1281,7 @@ mod tests {
..Default::default()
};
let non_ip_keys = vec!["db.instance".to_string()];
let key = BorrowedAggregationKey::from_span(&span_non_ip, &non_ip_keys);
let key = BorrowedAggregationKey::from_span(&span_non_ip, &non_ip_keys, &[]);
let owned = OwnedAggregationKey::from(&key);
assert_eq!(
owned.peer_tags,
Expand Down
Loading
Loading