From 594c82e9df8bb57ff158da5d598470425eaf8022 Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Tue, 23 Jun 2026 13:44:08 -0700 Subject: [PATCH 1/2] refactor: share exporter fallback extraction Signed-off-by: mnajafian-nv --- crates/core/src/observability/atif.rs | 116 ++-- .../src/observability/manual_llm_fallback.rs | 570 ++++++++++++++++++ crates/core/src/observability/mod.rs | 1 + .../core/src/observability/openinference.rs | 204 +------ crates/core/src/observability/otel.rs | 157 +---- 5 files changed, 633 insertions(+), 415 deletions(-) create mode 100644 crates/core/src/observability/manual_llm_fallback.rs diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index 4d4d1f31..d0219065 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -41,6 +41,12 @@ use crate::api::subscriber::flush_subscribers; use crate::codec::response::{Usage, estimate_cost_for_provider}; use crate::error::Result; use crate::json::Json; +use crate::observability::manual_llm_fallback::{ + manual_cost_estimate_from_usage, manual_model_name_from_llm_output, + manual_reasoning_content_from_llm_output, manual_reasoning_effort_from_llm_input, + manual_tool_call_arguments, manual_tool_call_id, manual_tool_call_name, + manual_usage_fields_from_preferred_token_usage, +}; /// The ATIF schema version string embedded in all exported trajectories. /// @@ -690,21 +696,34 @@ fn extract_metrics( model_name: Option<&str>, ) -> Option { let usage = token_usage_object(output)?; - let prompt = usage_u64(usage, &["prompt_tokens", "input_tokens"]); - let completion = usage_u64(usage, &["completion_tokens", "output_tokens"]); - let cache_read = usage_u64(usage, &["cached_tokens"]) + let fallback_fields = manual_usage_fields_from_preferred_token_usage(Some(output)); + let prompt = fallback_fields + .as_ref() + .and_then(|fields| fields.prompt_tokens) + .or_else(|| usage_u64(usage, &["prompt_tokens", "input_tokens"])); + let completion = fallback_fields + .as_ref() + .and_then(|fields| fields.completion_tokens) + .or_else(|| usage_u64(usage, &["completion_tokens", "output_tokens"])); + let cache_read = fallback_fields + .as_ref() + .and_then(|fields| fields.cache_read_tokens) + .or_else(|| usage_u64(usage, &["cached_tokens"])) .or_else(|| prompt_tokens_detail_u64(usage, "cached_tokens")) .or_else(|| input_tokens_detail_u64(usage, "cached_tokens")) .or_else(|| usage_u64(usage, &["cache_read_input_tokens"])); - let cache_write = usage_u64( - usage, - &["cache_creation_input_tokens", "cache_write_tokens"], - ); + let cache_write = fallback_fields + .as_ref() + .and_then(|fields| fields.cache_write_tokens) + .or_else(|| { + usage_u64( + usage, + &["cache_creation_input_tokens", "cache_write_tokens"], + ) + }); let cached = sum_options(cache_read, cache_write); - let explicit_cost = usage - .get("cost_usd") - .and_then(Json::as_f64) - .or_else(|| usage.get("cost").and_then(cost_usd_from_cost_object)); + let explicit_cost = manual_cost_estimate_from_usage(usage) + .and_then(|cost| cost.total_or_component_sum_for_currency("USD")); let has_reported_cost = usage.get("cost").is_some(); let cost = if has_reported_cost { explicit_cost @@ -717,7 +736,10 @@ fn extract_metrics( &Usage { prompt_tokens: prompt, completion_tokens: completion, - total_tokens: usage_u64(usage, &["total_tokens"]), + total_tokens: fallback_fields + .as_ref() + .and_then(|fields| fields.reported_total_tokens) + .or_else(|| usage_u64(usage, &["total_tokens"])), cache_read_tokens: cache_read, cache_write_tokens: cache_write, cost: None, @@ -764,30 +786,6 @@ fn extract_metrics( }) } -fn cost_usd_from_cost_object(cost: &Json) -> Option { - let cost = cost.as_object()?; - let currency = cost.get("currency").and_then(Json::as_str); - let is_relay_normalized_cost = cost - .get("source") - .and_then(Json::as_str) - .is_some_and(|source| matches!(source, "provider_reported" | "model_pricing")); - let has_legacy_provider_total = - currency.is_none() && cost.get("total").and_then(Json::as_f64).is_some(); - let is_usd_cost = currency.is_some_and(|currency| currency.eq_ignore_ascii_case("USD")) - || currency.is_none() && (is_relay_normalized_cost || has_legacy_provider_total); - if !is_usd_cost { - return None; - } - - cost.get("total").and_then(Json::as_f64).or_else(|| { - let (has_component, component_total) = ["input", "output", "cache_read", "cache_write"] - .iter() - .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) - .fold((false, 0.0), |(_, total), value| (true, total + value)); - has_component.then_some(component_total) - }) -} - fn merge_metrics( primary: Option, supplemental: Option<&AtifMetrics>, @@ -859,9 +857,7 @@ fn usage_u64(usage: &serde_json::Map, keys: &[&str]) -> Option Option<&str> { - output - .as_object() - .and_then(|object| object.get("model").and_then(Json::as_str)) + manual_model_name_from_llm_output(Some(output)) } fn sum_options(left: Option, right: Option) -> Option { @@ -893,13 +889,7 @@ fn input_tokens_detail_u64(usage: &serde_json::Map, key: &str) -> /// The request content may have `reasoning_effort` (e.g. `"high"`, `"medium"`, /// or a numeric value). Returns the value as Json for flexibility. fn extract_reasoning_effort(input: &Json) -> Option { - if let Some(obj) = input.as_object() - && let Some(v) = obj.get("reasoning_effort") - && !v.is_null() - { - return Some(v.clone()); - } - None + manual_reasoning_effort_from_llm_input(input) } /// Extract `reasoning` (reasoning_content) from an LLM response output. @@ -907,12 +897,7 @@ fn extract_reasoning_effort(input: &Json) -> Option { /// The agent's explicit internal reasoning may appear in the response under the /// `"reasoning"` key. Returns `None` if absent or not a string. fn extract_reasoning_content(output: &Json) -> Option { - if let Some(obj) = output.as_object() - && let Some(r) = obj.get("reasoning") - { - return r.as_str().map(String::from); - } - None + manual_reasoning_content_from_llm_output(output) } /// Extract the latest user-facing message from an LLM request payload. @@ -1013,32 +998,13 @@ fn extract_tool_calls(output: &Json) -> Option> { .or_else(|| anthropic_messages_tool_use_items(output))?; let mut calls = Vec::with_capacity(arr.len()); for (index, tc) in arr.iter().enumerate() { - let tc_obj = tc.as_object()?; - let mut id = tc_obj - .get("id") - .or_else(|| tc_obj.get("tool_call_id")) - .or_else(|| tc_obj.get("call_id")) - .and_then(Json::as_str) - .unwrap_or("") - .to_string(); - // The function details live under "function". - let func = tc_obj.get("function").and_then(Json::as_object); - let name = func - .and_then(|f| f.get("name")) - .or_else(|| tc_obj.get("name")) - .or_else(|| tc_obj.get("tool_name")) - .or_else(|| tc_obj.get("function_name")) - .and_then(Json::as_str) - .unwrap_or("") - .to_string(); + tc.as_object()?; + let mut id = manual_tool_call_id(tc).unwrap_or("").to_string(); + let name = manual_tool_call_name(tc).unwrap_or("").to_string(); if id.is_empty() && !name.is_empty() { id = format!("{name}:{}", index + 1); } - let raw_arguments = func - .and_then(|f| f.get("arguments")) - .or_else(|| tc_obj.get("arguments")) - .or_else(|| tc_obj.get("args")) - .or_else(|| tc_obj.get("input")); + let raw_arguments = manual_tool_call_arguments(tc); let arguments = normalize_tool_arguments(raw_arguments); // Skip entries with no id and no name — they are not meaningful. if id.is_empty() && name.is_empty() { diff --git a/crates/core/src/observability/manual_llm_fallback.rs b/crates/core/src/observability/manual_llm_fallback.rs new file mode 100644 index 00000000..98070199 --- /dev/null +++ b/crates/core/src/observability/manual_llm_fallback.rs @@ -0,0 +1,570 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Shared manual fallback readers for exporter-local LLM observability projection. + +#[cfg(any(feature = "otel", feature = "openinference", test))] +use crate::codec::response::Usage; +use crate::codec::response::{CostEstimate, CostSource}; +use crate::json::Json; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct ManualUsageFields { + pub prompt_tokens: Option, + pub completion_tokens: Option, + pub reported_total_tokens: Option, + pub cache_read_tokens: Option, + pub cache_write_tokens: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ManualUsagePrecedence { + /// Preserve OTel/OpenInference's existing behavior: try each preferred key + /// across both maps before moving to weaker aliases. + #[cfg(any(feature = "otel", feature = "openinference", test))] + KeyOrder, + /// Preserve ATIF's selected-map preference: search all aliases in + /// `token_usage` before using provider-native `usage` for the same field. + PrimaryMap, +} + +#[cfg(any(feature = "otel", feature = "openinference", test))] +pub(crate) fn manual_usage_from_llm_output_with( + output: Option<&Json>, + normalize_total_tokens: impl FnOnce(Option, Option, Option) -> Option, +) -> Option { + let fields = manual_usage_fields_from_llm_output(output)?; + Some(Usage { + prompt_tokens: fields.prompt_tokens, + completion_tokens: fields.completion_tokens, + total_tokens: normalize_total_tokens( + fields.reported_total_tokens, + fields.prompt_tokens, + fields.completion_tokens, + ), + cache_read_tokens: fields.cache_read_tokens, + cache_write_tokens: fields.cache_write_tokens, + cost: None, + }) +} + +#[cfg(any(feature = "otel", feature = "openinference", test))] +pub(crate) fn manual_usage_fields_from_llm_output( + output: Option<&Json>, +) -> Option { + let object = output?.as_object()?; + let usage = object.get("usage").and_then(Json::as_object); + let token_usage = object.get("token_usage").and_then(Json::as_object); + manual_usage_fields_from_maps(usage, token_usage, ManualUsagePrecedence::KeyOrder) +} + +pub(crate) fn manual_usage_fields_from_preferred_token_usage( + output: Option<&Json>, +) -> Option { + let object = output?.as_object()?; + let usage = object.get("usage").and_then(Json::as_object); + let token_usage = object.get("token_usage").and_then(Json::as_object); + manual_usage_fields_from_maps(token_usage, usage, ManualUsagePrecedence::PrimaryMap) +} + +fn manual_usage_fields_from_maps( + primary_usage: Option<&serde_json::Map>, + secondary_usage: Option<&serde_json::Map>, + precedence: ManualUsagePrecedence, +) -> Option { + if primary_usage.is_none() && secondary_usage.is_none() { + return None; + } + + let prompt_tokens = first_u64_from_manual_usage( + primary_usage, + secondary_usage, + &["prompt_tokens", "input_tokens", "inputTokens", "input"], + precedence, + ); + let completion_tokens = first_u64_from_manual_usage( + primary_usage, + secondary_usage, + &[ + "completion_tokens", + "output_tokens", + "completionTokens", + "outputTokens", + "output", + ], + precedence, + ); + let reported_total_tokens = first_u64_from_manual_usage( + primary_usage, + secondary_usage, + &["total_tokens", "totalTokens", "total"], + precedence, + ); + let cache_read_tokens = first_u64_from_manual_usage( + primary_usage, + secondary_usage, + &[ + "cache_read_tokens", + "cached_tokens", + "cache_read_input_tokens", + "cacheReadTokens", + "cachedTokens", + "cacheReadInputTokens", + "cacheRead", + ], + precedence, + ) + .or_else(|| { + first_nested_u64_from_manual_usage( + primary_usage, + secondary_usage, + "input_tokens_details", + "cached_tokens", + ) + }) + .or_else(|| { + first_nested_u64_from_manual_usage( + primary_usage, + secondary_usage, + "prompt_tokens_details", + "cached_tokens", + ) + }); + let cache_write_tokens = first_u64_from_manual_usage( + primary_usage, + secondary_usage, + &[ + "cache_write_tokens", + "cache_creation_input_tokens", + "cacheWriteTokens", + "cacheCreationInputTokens", + "cacheWrite", + ], + precedence, + ); + + if prompt_tokens.is_none() + && completion_tokens.is_none() + && reported_total_tokens.is_none() + && cache_read_tokens.is_none() + && cache_write_tokens.is_none() + { + return None; + } + + Some(ManualUsageFields { + prompt_tokens, + completion_tokens, + reported_total_tokens, + cache_read_tokens, + cache_write_tokens, + }) +} + +pub(crate) fn manual_model_name_from_llm_output(output: Option<&Json>) -> Option<&str> { + output?.as_object()?.get("model").and_then(Json::as_str) +} + +#[cfg(any(feature = "otel", feature = "openinference", test))] +pub(crate) fn manual_cost_estimate_from_llm_output(output: Option<&Json>) -> Option { + let object = output?.as_object()?; + let usage = object.get("usage").and_then(Json::as_object); + let token_usage = object.get("token_usage").and_then(Json::as_object); + manual_cost_estimate_from_maps(usage, token_usage) +} + +pub(crate) fn manual_cost_estimate_from_usage( + usage: &serde_json::Map, +) -> Option { + cost_estimate_from_manual_usage(usage) +} + +#[cfg(any(feature = "openinference", test))] +pub(crate) fn manual_cost_total_for_currency_from_llm_output( + output: Option<&Json>, + currency: &str, +) -> Option { + let object = output?.as_object()?; + let usage = object.get("usage").and_then(Json::as_object); + let token_usage = object.get("token_usage").and_then(Json::as_object); + manual_cost_total_for_currency_from_maps(usage, token_usage, currency) +} + +#[cfg(any(feature = "otel", feature = "openinference", test))] +fn manual_cost_estimate_from_maps( + primary_usage: Option<&serde_json::Map>, + secondary_usage: Option<&serde_json::Map>, +) -> Option { + primary_usage + .and_then(cost_estimate_from_manual_usage) + .or_else(|| secondary_usage.and_then(cost_estimate_from_manual_usage)) +} + +#[cfg(any(feature = "openinference", test))] +fn manual_cost_total_for_currency_from_maps( + primary_usage: Option<&serde_json::Map>, + secondary_usage: Option<&serde_json::Map>, + currency: &str, +) -> Option { + primary_usage + .and_then(cost_estimate_from_manual_usage) + .and_then(|cost| cost.total_or_component_sum_for_currency(currency)) + .or_else(|| { + secondary_usage + .and_then(cost_estimate_from_manual_usage) + .and_then(|cost| cost.total_or_component_sum_for_currency(currency)) + }) +} + +fn cost_estimate_from_manual_usage(usage: &serde_json::Map) -> Option { + if let Some(total) = usage.get("cost_usd").and_then(Json::as_f64) { + return Some(CostEstimate { + total: Some(total), + currency: "USD".to_string(), + input: None, + output: None, + cache_read: None, + cache_write: None, + source: CostSource::ProviderReported, + pricing_provider: None, + pricing_model: None, + pricing_as_of: None, + pricing_source: None, + }); + } + + let cost = usage.get("cost")?.as_object()?; + let estimate = CostEstimate { + total: cost.get("total").and_then(Json::as_f64), + currency: cost + .get("currency") + .and_then(Json::as_str) + .unwrap_or("USD") + .to_string(), + input: cost.get("input").and_then(Json::as_f64), + output: cost.get("output").and_then(Json::as_f64), + cache_read: cost.get("cache_read").and_then(Json::as_f64), + cache_write: cost.get("cache_write").and_then(Json::as_f64), + source: CostSource::ProviderReported, + pricing_provider: None, + pricing_model: None, + pricing_as_of: None, + pricing_source: None, + }; + estimate.total_or_component_sum().map(|_| estimate) +} + +fn first_u64_from_manual_usage( + usage: Option<&serde_json::Map>, + token_usage: Option<&serde_json::Map>, + keys: &[&str], + precedence: ManualUsagePrecedence, +) -> Option { + match precedence { + #[cfg(any(feature = "otel", feature = "openinference", test))] + ManualUsagePrecedence::KeyOrder => keys.iter().find_map(|key| { + usage + .and_then(|value| value.get(*key).and_then(Json::as_u64)) + .or_else(|| token_usage.and_then(|value| value.get(*key).and_then(Json::as_u64))) + }), + ManualUsagePrecedence::PrimaryMap => usage + .and_then(|value| first_u64(value, keys)) + .or_else(|| token_usage.and_then(|value| first_u64(value, keys))), + } +} + +fn first_nested_u64_from_manual_usage( + usage: Option<&serde_json::Map>, + token_usage: Option<&serde_json::Map>, + parent_key: &str, + child_key: &str, +) -> Option { + usage + .and_then(|value| nested_u64(value, parent_key, child_key)) + .or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key))) +} + +fn nested_u64( + usage: &serde_json::Map, + parent_key: &str, + child_key: &str, +) -> Option { + usage + .get(parent_key) + .and_then(Json::as_object) + .and_then(|details| details.get(child_key)) + .and_then(Json::as_u64) +} + +fn first_u64(usage: &serde_json::Map, keys: &[&str]) -> Option { + keys.iter() + .find_map(|key| usage.get(*key).and_then(Json::as_u64)) +} + +pub(crate) fn manual_reasoning_effort_from_llm_input(input: &Json) -> Option { + input + .as_object()? + .get("reasoning_effort") + .filter(|value| !value.is_null()) + .cloned() +} + +pub(crate) fn manual_reasoning_content_from_llm_output(output: &Json) -> Option { + output + .as_object()? + .get("reasoning") + .and_then(Json::as_str) + .map(String::from) +} + +pub(crate) fn manual_tool_call_id(tool_call: &Json) -> Option<&str> { + tool_call + .get("id") + .or_else(|| tool_call.get("tool_call_id")) + .or_else(|| tool_call.get("call_id")) + .and_then(Json::as_str) +} + +pub(crate) fn manual_tool_call_name(tool_call: &Json) -> Option<&str> { + tool_call + .get("function") + .and_then(|function| function.get("name")) + .and_then(Json::as_str) + .or_else(|| tool_call.get("name").and_then(Json::as_str)) + .or_else(|| tool_call.get("tool_name").and_then(Json::as_str)) + .or_else(|| tool_call.get("toolName").and_then(Json::as_str)) + .or_else(|| tool_call.get("function_name").and_then(Json::as_str)) +} + +pub(crate) fn manual_tool_call_arguments(tool_call: &Json) -> Option<&Json> { + tool_call + .get("function") + .and_then(|function| function.get("arguments")) + .or_else(|| tool_call.get("arguments")) + .or_else(|| tool_call.get("args")) + .or_else(|| tool_call.get("input")) +} + +#[cfg(any(feature = "openinference", test))] +pub(crate) fn manual_replay_llm_payload(input: &Json) -> Option<&Json> { + let content = input.as_object().and_then(|object| object.get("content"))?; + let content_object = content.as_object()?; + is_openclaw_replay_payload(content_object).then_some(content) +} + +#[cfg(any(feature = "openinference", test))] +pub(crate) fn manual_replay_llm_response(output: &Json) -> Option<&Json> { + output + .as_object() + .and_then(|object| object.get("openclaw")) + .and_then(Json::as_object) + .map(|_| output) +} + +#[cfg(any(feature = "openinference", test))] +fn is_openclaw_replay_payload(content: &serde_json::Map) -> bool { + content + .get("source") + .and_then(Json::as_str) + .is_some_and(|source| source.starts_with("openclaw.")) + || content.contains_key("placeholderRequest") +} + +#[cfg(test)] +mod tests { + use super::{ + manual_cost_estimate_from_llm_output, manual_cost_estimate_from_usage, + manual_cost_total_for_currency_from_llm_output, manual_model_name_from_llm_output, + manual_reasoning_content_from_llm_output, manual_reasoning_effort_from_llm_input, + manual_replay_llm_payload, manual_replay_llm_response, manual_tool_call_arguments, + manual_tool_call_id, manual_tool_call_name, manual_usage_fields_from_llm_output, + manual_usage_fields_from_preferred_token_usage, + }; + use serde_json::json; + + #[test] + fn manual_usage_fields_support_aliases_and_nested_cache_details() { + let output = json!({ + "usage": { + "inputTokens": 11, + "outputTokens": 7, + "totalTokens": 18, + "prompt_tokens_details": {"cached_tokens": 5}, + "cacheWriteTokens": 2 + } + }); + + let fields = manual_usage_fields_from_llm_output(Some(&output)).unwrap(); + assert_eq!(fields.prompt_tokens, Some(11)); + assert_eq!(fields.completion_tokens, Some(7)); + assert_eq!(fields.reported_total_tokens, Some(18)); + assert_eq!(fields.cache_read_tokens, Some(5)); + assert_eq!(fields.cache_write_tokens, Some(2)); + } + + #[test] + fn manual_usage_fields_can_preserve_token_usage_precedence() { + let output = json!({ + "usage": {"prompt_tokens": 1}, + "token_usage": {"inputTokens": 2} + }); + + let fields = manual_usage_fields_from_preferred_token_usage(Some(&output)).unwrap(); + assert_eq!(fields.prompt_tokens, Some(2)); + } + + #[test] + fn manual_usage_fields_preserve_key_order_for_manual_output() { + let output = json!({ + "usage": {"input": 1}, + "token_usage": {"prompt_tokens": 2} + }); + + let fields = manual_usage_fields_from_llm_output(Some(&output)).unwrap(); + assert_eq!(fields.prompt_tokens, Some(2)); + } + + #[test] + fn manual_cost_estimate_preserves_currency_and_component_costs() { + let output = json!({ + "model": "demo-model", + "usage": { + "cost": { + "currency": "EUR", + "input": 0.25, + "output": 0.5, + "cache_read": 0.125 + } + } + }); + + let cost = manual_cost_estimate_from_llm_output(Some(&output)).unwrap(); + assert_eq!( + manual_model_name_from_llm_output(Some(&output)), + Some("demo-model") + ); + assert_eq!(cost.currency, "EUR"); + assert_eq!(cost.total, None); + assert_eq!(cost.input, Some(0.25)); + assert_eq!(cost.output, Some(0.5)); + assert_eq!(cost.cache_read, Some(0.125)); + assert_eq!(cost.total_or_component_sum(), Some(0.875)); + } + + #[test] + fn manual_cost_estimate_prefers_explicit_cost_usd() { + let output = json!({ + "usage": { + "cost_usd": 0.005, + "cost": { + "currency": "EUR", + "total": 9.0 + } + } + }); + + let cost = manual_cost_estimate_from_llm_output(Some(&output)).unwrap(); + assert_eq!(cost.currency, "USD"); + assert_eq!(cost.total, Some(0.005)); + assert_eq!(cost.total_for_currency("USD"), Some(0.005)); + } + + #[test] + fn manual_cost_estimate_falls_through_amountless_primary_cost() { + let output = json!({ + "usage": { + "cost": { + "currency": "USD" + } + }, + "token_usage": { + "cost_usd": 0.005 + } + }); + + let cost = manual_cost_estimate_from_llm_output(Some(&output)).unwrap(); + assert_eq!(cost.currency, "USD"); + assert_eq!(cost.total, Some(0.005)); + } + + #[test] + fn manual_cost_estimate_from_usage_returns_none_without_amounts() { + let usage = json!({ + "cost": { + "currency": "USD" + } + }); + + let usage = usage.as_object().unwrap(); + assert!(manual_cost_estimate_from_usage(usage).is_none()); + } + + #[test] + fn manual_cost_total_for_currency_falls_through_non_matching_currency() { + let output = json!({ + "usage": { + "cost": { + "currency": "EUR", + "total": 9.0 + } + }, + "token_usage": { + "cost_usd": 0.005 + } + }); + + assert_eq!( + manual_cost_total_for_currency_from_llm_output(Some(&output), "USD"), + Some(0.005) + ); + } + + #[test] + fn manual_reasoning_and_tool_call_readers_cover_provider_aliases() { + let input = json!({"reasoning_effort": "high"}); + let output = json!({"reasoning": "chain summary"}); + let tool_call = json!({ + "call_id": "call_1", + "tool_name": "read_file", + "args": {"path": "README.md"} + }); + + assert_eq!( + manual_reasoning_effort_from_llm_input(&input), + Some(json!("high")) + ); + assert_eq!( + manual_reasoning_content_from_llm_output(&output), + Some("chain summary".to_string()) + ); + assert_eq!(manual_tool_call_id(&tool_call), Some("call_1")); + assert_eq!(manual_tool_call_name(&tool_call), Some("read_file")); + assert_eq!( + manual_tool_call_arguments(&tool_call), + Some(&json!({"path": "README.md"})) + ); + } + + #[test] + fn manual_replay_payload_readers_detect_openclaw_shapes() { + let input = json!({ + "content": { + "source": "openclaw.llm_input", + "messages": [{"role": "user", "content": "hi"}] + } + }); + let output = json!({ + "openclaw": {"model": "demo"}, + "content": "hello" + }); + + assert_eq!( + manual_replay_llm_payload(&input).and_then(|payload| payload.get("source")), + Some(&json!("openclaw.llm_input")) + ); + assert_eq!( + manual_replay_llm_response(&output).and_then(|payload| payload.get("content")), + Some(&json!("hello")) + ); + } +} diff --git a/crates/core/src/observability/mod.rs b/crates/core/src/observability/mod.rs index 08aa2de3..967e4fb7 100644 --- a/crates/core/src/observability/mod.rs +++ b/crates/core/src/observability/mod.rs @@ -13,6 +13,7 @@ pub(crate) fn test_mutex() -> &'static Mutex<()> { pub mod atif; pub mod atof; +pub(crate) mod manual_llm_fallback; #[cfg(feature = "openinference")] pub mod openinference; #[cfg(feature = "otel")] diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 54be164e..329b8a3a 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -32,6 +32,11 @@ use crate::codec::response::{ }; use crate::error::FlowError; use crate::json::Json; +use crate::observability::manual_llm_fallback::{ + manual_cost_total_for_currency_from_llm_output, manual_model_name_from_llm_output, + manual_replay_llm_payload, manual_replay_llm_response, manual_tool_call_arguments, + manual_tool_call_id, manual_tool_call_name, manual_usage_from_llm_output_with, +}; use chrono::{DateTime, Utc}; use openinference_semantic_conventions::SpanKind as OpenInferenceSpanKind; use openinference_semantic_conventions::attributes as oi; @@ -937,25 +942,11 @@ fn message_content_text(content: &MessageContent) -> Option { } fn replay_llm_payload(input: &Json) -> Option<&Json> { - let content = input.as_object().and_then(|object| object.get("content"))?; - let content_object = content.as_object()?; - is_openclaw_replay_payload(content_object).then_some(content) + manual_replay_llm_payload(input) } fn replay_llm_response(output: &Json) -> Option<&Json> { - output - .as_object() - .and_then(|object| object.get("openclaw")) - .and_then(Json::as_object) - .map(|_| output) -} - -fn is_openclaw_replay_payload(content: &serde_json::Map) -> bool { - content - .get("source") - .and_then(Json::as_str) - .is_some_and(|source| source.starts_with("openclaw.")) - || content.contains_key("placeholderRequest") + manual_replay_llm_response(output) } fn push_replay_input_messages(attributes: &mut Vec, input: &Json) { @@ -1024,7 +1015,7 @@ fn push_raw_output_tool_calls( attributes, message_index, call_index, - tool_call.get("id").and_then(Json::as_str), + manual_tool_call_id(tool_call), raw_tool_call_name(tool_call), raw_tool_call_arguments(tool_call).and_then(|value| { value @@ -1037,20 +1028,11 @@ fn push_raw_output_tool_calls( } fn raw_tool_call_name(tool_call: &Json) -> Option<&str> { - tool_call - .get("function") - .and_then(|function| function.get("name")) - .and_then(Json::as_str) - .or_else(|| tool_call.get("name").and_then(Json::as_str)) - .or_else(|| tool_call.get("toolName").and_then(Json::as_str)) + manual_tool_call_name(tool_call) } fn raw_tool_call_arguments(tool_call: &Json) -> Option<&Json> { - tool_call - .get("function") - .and_then(|function| function.get("arguments")) - .or_else(|| tool_call.get("arguments")) - .or_else(|| tool_call.get("input")) + manual_tool_call_arguments(tool_call) } fn push_output_tool_call( @@ -1098,12 +1080,7 @@ fn finish_reason_value(reason: &FinishReason) -> String { } fn cost_total_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - usage - .and_then(cost_total_from_usage) - .or_else(|| token_usage.and_then(cost_total_from_usage)) + manual_cost_total_for_currency_from_llm_output(output, "USD") } fn cost_total_from_llm_event(event: &Event, fallback_usage: Option<&Usage>) -> Option { @@ -1132,116 +1109,11 @@ fn cost_total_from_llm_event(event: &Event, fallback_usage: Option<&Usage>) -> O } fn model_name_from_manual_llm_output(output: Option<&Json>) -> Option<&str> { - output?.as_object()?.get("model").and_then(Json::as_str) -} - -fn cost_total_from_usage(usage: &serde_json::Map) -> Option { - usage.get("cost_usd").and_then(Json::as_f64).or_else(|| { - let cost = usage.get("cost")?.as_object()?; - let currency = cost.get("currency").and_then(Json::as_str); - let is_usd_cost = currency.is_none_or(|currency| currency.eq_ignore_ascii_case("USD")); - if !is_usd_cost { - return None; - } - cost.get("total").and_then(Json::as_f64).or_else(|| { - let (has_component, component_total) = ["input", "output", "cache_read", "cache_write"] - .iter() - .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) - .fold((false, 0.0), |(_, total), value| (true, total + value)); - has_component.then_some(component_total) - }) - }) + manual_model_name_from_llm_output(output) } fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - if usage.is_none() && token_usage.is_none() { - return None; - } - - let prompt_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["prompt_tokens", "input_tokens", "inputTokens", "input"], - ); - let completion_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "completion_tokens", - "output_tokens", - "completionTokens", - "outputTokens", - "output", - ], - ); - let reported_total_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["total_tokens", "totalTokens", "total"], - ); - let cache_read_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_read_tokens", - "cached_tokens", - "cache_read_input_tokens", - "cacheReadTokens", - "cachedTokens", - "cacheReadInputTokens", - "cacheRead", - ], - ) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "input_tokens_details", - "cached_tokens", - ) - }) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "prompt_tokens_details", - "cached_tokens", - ) - }); - let cache_write_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_write_tokens", - "cache_creation_input_tokens", - "cacheWriteTokens", - "cacheCreationInputTokens", - "cacheWrite", - ], - ); - - if prompt_tokens.is_none() - && completion_tokens.is_none() - && reported_total_tokens.is_none() - && cache_read_tokens.is_none() - && cache_write_tokens.is_none() - { - return None; - } - let total_tokens = - normalize_total_tokens(reported_total_tokens, prompt_tokens, completion_tokens); - - Some(Usage { - prompt_tokens, - completion_tokens, - total_tokens, - cache_read_tokens, - cache_write_tokens, - cost: None, - }) + manual_usage_from_llm_output_with(output, normalize_total_tokens) } fn normalize_total_tokens( @@ -1260,44 +1132,6 @@ fn normalize_total_tokens( } } -fn first_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - keys: &[&str], -) -> Option { - usage - .and_then(|value| first_u64(value, keys)) - .or_else(|| token_usage.and_then(|value| first_u64(value, keys))) -} - -fn first_nested_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - parent_key: &str, - child_key: &str, -) -> Option { - usage - .and_then(|value| nested_u64(value, parent_key, child_key)) - .or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key))) -} - -fn nested_u64( - usage: &serde_json::Map, - parent_key: &str, - child_key: &str, -) -> Option { - usage - .get(parent_key) - .and_then(Json::as_object) - .and_then(|details| details.get(child_key)) - .and_then(Json::as_u64) -} - -fn first_u64(usage: &serde_json::Map, keys: &[&str]) -> Option { - keys.iter() - .find_map(|key| usage.get(*key).and_then(Json::as_u64)) -} - fn mark_attributes(event: &Event) -> Vec { let handle_attributes = event.attributes(); let mut attributes = vec![ @@ -1617,17 +1451,7 @@ fn display_text_from_tool_calls(value: &Json) -> Option { } fn tool_call_name(value: &Json) -> Option { - value - .get("name") - .and_then(Json::as_str) - .or_else(|| value.get("toolName").and_then(Json::as_str)) - .or_else(|| { - value - .get("function") - .and_then(|function| function.get("name")) - .and_then(Json::as_str) - }) - .map(str::to_string) + manual_tool_call_name(value).map(str::to_string) } fn to_json_string(value: &T) -> Option { diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index 74fefe59..46c904a3 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -28,6 +28,10 @@ use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_ use crate::codec::response::{CostEstimate, Usage, estimate_cost_for_provider}; use crate::error::FlowError; use crate::json::Json; +use crate::observability::manual_llm_fallback::{ + manual_cost_estimate_from_llm_output, manual_model_name_from_llm_output, + manual_usage_from_llm_output_with, +}; use chrono::{DateTime, Utc}; use opentelemetry::trace::{ Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _, @@ -708,162 +712,15 @@ fn cost_total_and_currency(cost: &CostEstimate) -> Option<(f64, String)> { } fn cost_from_manual_llm_output(output: Option<&Json>) -> Option<(f64, String)> { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - usage - .and_then(cost_from_manual_usage) - .or_else(|| token_usage.and_then(cost_from_manual_usage)) -} - -fn cost_from_manual_usage(usage: &serde_json::Map) -> Option<(f64, String)> { - usage - .get("cost_usd") - .and_then(Json::as_f64) - .map(|total| (total, "USD".to_string())) - .or_else(|| { - let cost = usage.get("cost")?.as_object()?; - let total = cost.get("total").and_then(Json::as_f64).or_else(|| { - let (has_component, component_total) = - ["input", "output", "cache_read", "cache_write"] - .iter() - .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) - .fold((false, 0.0), |(_, total), value| (true, total + value)); - has_component.then_some(component_total) - })?; - Some(( - total, - cost.get("currency") - .and_then(Json::as_str) - .unwrap_or("USD") - .to_string(), - )) - }) + manual_cost_estimate_from_llm_output(output).and_then(|cost| cost_total_and_currency(&cost)) } fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - if usage.is_none() && token_usage.is_none() { - return None; - } - - let prompt_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["prompt_tokens", "input_tokens", "inputTokens", "input"], - ); - let completion_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "completion_tokens", - "output_tokens", - "completionTokens", - "outputTokens", - "output", - ], - ); - let reported_total_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["total_tokens", "totalTokens", "total"], - ); - let cache_read_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_read_tokens", - "cached_tokens", - "cache_read_input_tokens", - "cacheReadTokens", - "cachedTokens", - "cacheReadInputTokens", - "cacheRead", - ], - ) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "input_tokens_details", - "cached_tokens", - ) - }) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "prompt_tokens_details", - "cached_tokens", - ) - }); - let cache_write_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_write_tokens", - "cache_creation_input_tokens", - "cacheWriteTokens", - "cacheCreationInputTokens", - "cacheWrite", - ], - ); - - if prompt_tokens.is_none() - && completion_tokens.is_none() - && reported_total_tokens.is_none() - && cache_read_tokens.is_none() - && cache_write_tokens.is_none() - { - return None; - } - - Some(Usage { - prompt_tokens, - completion_tokens, - total_tokens: normalize_total_tokens( - reported_total_tokens, - prompt_tokens, - completion_tokens, - ), - cache_read_tokens, - cache_write_tokens, - cost: None, - }) + manual_usage_from_llm_output_with(output, normalize_total_tokens) } fn model_name_from_manual_llm_output(output: Option<&Json>) -> Option<&str> { - output?.as_object()?.get("model").and_then(Json::as_str) -} - -fn first_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - keys: &[&str], -) -> Option { - keys.iter().find_map(|key| { - usage - .and_then(|usage| usage.get(*key).and_then(Json::as_u64)) - .or_else(|| token_usage.and_then(|usage| usage.get(*key).and_then(Json::as_u64))) - }) -} - -fn first_nested_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - parent: &str, - key: &str, -) -> Option { - usage - .and_then(|usage| usage.get(parent).and_then(Json::as_object)) - .and_then(|details| details.get(key).and_then(Json::as_u64)) - .or_else(|| { - token_usage - .and_then(|usage| usage.get(parent).and_then(Json::as_object)) - .and_then(|details| details.get(key).and_then(Json::as_u64)) - }) + manual_model_name_from_llm_output(output) } fn normalize_total_tokens( From d29f0b1c1b36ba9f383ac7201a13e6c4f286323a Mon Sep 17 00:00:00 2001 From: mnajafian-nv Date: Tue, 23 Jun 2026 14:22:41 -0700 Subject: [PATCH 2/2] fix: preserve exporter fallback behavior Signed-off-by: mnajafian-nv --- crates/core/src/observability/atif.rs | 26 +- .../src/observability/manual_llm_fallback.rs | 228 +++++++++++++----- .../core/src/observability/openinference.rs | 12 +- crates/core/tests/unit/atif_tests.rs | 61 +++++ .../unit/observability/openinference_tests.rs | 15 ++ 5 files changed, 282 insertions(+), 60 deletions(-) diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index d0219065..3e7a0339 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -722,8 +722,7 @@ fn extract_metrics( ) }); let cached = sum_options(cache_read, cache_write); - let explicit_cost = manual_cost_estimate_from_usage(usage) - .and_then(|cost| cost.total_or_component_sum_for_currency("USD")); + let explicit_cost = atif_cost_usd_from_usage(usage); let has_reported_cost = usage.get("cost").is_some(); let cost = if has_reported_cost { explicit_cost @@ -786,6 +785,29 @@ fn extract_metrics( }) } +fn atif_cost_usd_from_usage(usage: &serde_json::Map) -> Option { + if let Some(total) = usage.get("cost_usd").and_then(Json::as_f64) { + return Some(total); + } + + let cost = usage.get("cost")?.as_object()?; + let currency = cost.get("currency").and_then(Json::as_str); + let is_relay_normalized_cost = cost + .get("source") + .and_then(Json::as_str) + .is_some_and(|source| matches!(source, "provider_reported" | "model_pricing")); + let has_legacy_provider_total = + currency.is_none() && cost.get("total").and_then(Json::as_f64).is_some(); + let is_usd_cost = currency.is_some_and(|currency| currency.eq_ignore_ascii_case("USD")) + || currency.is_none() && (is_relay_normalized_cost || has_legacy_provider_total); + if !is_usd_cost { + return None; + } + + manual_cost_estimate_from_usage(usage) + .and_then(|cost| cost.total_or_component_sum_for_currency("USD")) +} + fn merge_metrics( primary: Option, supplemental: Option<&AtifMetrics>, diff --git a/crates/core/src/observability/manual_llm_fallback.rs b/crates/core/src/observability/manual_llm_fallback.rs index 98070199..e0195981 100644 --- a/crates/core/src/observability/manual_llm_fallback.rs +++ b/crates/core/src/observability/manual_llm_fallback.rs @@ -23,8 +23,8 @@ enum ManualUsagePrecedence { /// across both maps before moving to weaker aliases. #[cfg(any(feature = "otel", feature = "openinference", test))] KeyOrder, - /// Preserve ATIF's selected-map preference: search all aliases in - /// `token_usage` before using provider-native `usage` for the same field. + /// Preserve ATIF's selected-map preference: choose `token_usage` when + /// present, otherwise provider-native `usage`, and only search that map. PrimaryMap, } @@ -64,7 +64,8 @@ pub(crate) fn manual_usage_fields_from_preferred_token_usage( let object = output?.as_object()?; let usage = object.get("usage").and_then(Json::as_object); let token_usage = object.get("token_usage").and_then(Json::as_object); - manual_usage_fields_from_maps(token_usage, usage, ManualUsagePrecedence::PrimaryMap) + let preferred_usage = token_usage.or(usage); + manual_usage_fields_from_maps(preferred_usage, None, ManualUsagePrecedence::PrimaryMap) } fn manual_usage_fields_from_maps( @@ -100,48 +101,10 @@ fn manual_usage_fields_from_maps( &["total_tokens", "totalTokens", "total"], precedence, ); - let cache_read_tokens = first_u64_from_manual_usage( - primary_usage, - secondary_usage, - &[ - "cache_read_tokens", - "cached_tokens", - "cache_read_input_tokens", - "cacheReadTokens", - "cachedTokens", - "cacheReadInputTokens", - "cacheRead", - ], - precedence, - ) - .or_else(|| { - first_nested_u64_from_manual_usage( - primary_usage, - secondary_usage, - "input_tokens_details", - "cached_tokens", - ) - }) - .or_else(|| { - first_nested_u64_from_manual_usage( - primary_usage, - secondary_usage, - "prompt_tokens_details", - "cached_tokens", - ) - }); - let cache_write_tokens = first_u64_from_manual_usage( - primary_usage, - secondary_usage, - &[ - "cache_write_tokens", - "cache_creation_input_tokens", - "cacheWriteTokens", - "cacheCreationInputTokens", - "cacheWrite", - ], - precedence, - ); + let cache_read_tokens = + cache_read_tokens_from_manual_usage(primary_usage, secondary_usage, precedence); + let cache_write_tokens = + cache_write_tokens_from_manual_usage(primary_usage, secondary_usage, precedence); if prompt_tokens.is_none() && completion_tokens.is_none() @@ -165,7 +128,7 @@ pub(crate) fn manual_model_name_from_llm_output(output: Option<&Json>) -> Option output?.as_object()?.get("model").and_then(Json::as_str) } -#[cfg(any(feature = "otel", feature = "openinference", test))] +#[cfg(any(feature = "otel", test))] pub(crate) fn manual_cost_estimate_from_llm_output(output: Option<&Json>) -> Option { let object = output?.as_object()?; let usage = object.get("usage").and_then(Json::as_object); @@ -190,7 +153,7 @@ pub(crate) fn manual_cost_total_for_currency_from_llm_output( manual_cost_total_for_currency_from_maps(usage, token_usage, currency) } -#[cfg(any(feature = "otel", feature = "openinference", test))] +#[cfg(any(feature = "otel", test))] fn manual_cost_estimate_from_maps( primary_usage: Option<&serde_json::Map>, secondary_usage: Option<&serde_json::Map>, @@ -254,34 +217,141 @@ fn cost_estimate_from_manual_usage(usage: &serde_json::Map) -> Opt estimate.total_or_component_sum().map(|_| estimate) } -fn first_u64_from_manual_usage( +fn cache_read_tokens_from_manual_usage( + primary_usage: Option<&serde_json::Map>, + secondary_usage: Option<&serde_json::Map>, + precedence: ManualUsagePrecedence, +) -> Option { + match precedence { + #[cfg(any(feature = "otel", feature = "openinference", test))] + ManualUsagePrecedence::KeyOrder => first_u64_from_manual_usage( + primary_usage, + secondary_usage, + &[ + "cache_read_tokens", + "cached_tokens", + "cache_read_input_tokens", + "cacheReadTokens", + "cachedTokens", + "cacheReadInputTokens", + "cacheRead", + ], + precedence, + ) + .or_else(|| { + first_nested_u64_from_manual_usage( + primary_usage, + secondary_usage, + "input_tokens_details", + "cached_tokens", + ) + }) + .or_else(|| { + first_nested_u64_from_manual_usage( + primary_usage, + secondary_usage, + "prompt_tokens_details", + "cached_tokens", + ) + }), + ManualUsagePrecedence::PrimaryMap => cache_read_tokens_from_preferred_usage(primary_usage) + .or_else(|| cache_read_tokens_from_preferred_usage(secondary_usage)), + } +} + +fn cache_read_tokens_from_preferred_usage( usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, +) -> Option { + let usage = usage?; + first_u64(usage, &["cached_tokens"]) + .or_else(|| nested_u64(usage, "prompt_tokens_details", "cached_tokens")) + .or_else(|| nested_u64(usage, "input_tokens_details", "cached_tokens")) + .or_else(|| { + first_u64( + usage, + &[ + "cache_read_input_tokens", + "cache_read_tokens", + "cacheReadTokens", + "cachedTokens", + "cacheReadInputTokens", + "cacheRead", + ], + ) + }) +} + +fn cache_write_tokens_from_manual_usage( + primary_usage: Option<&serde_json::Map>, + secondary_usage: Option<&serde_json::Map>, + precedence: ManualUsagePrecedence, +) -> Option { + match precedence { + #[cfg(any(feature = "otel", feature = "openinference", test))] + ManualUsagePrecedence::KeyOrder => first_u64_from_manual_usage( + primary_usage, + secondary_usage, + &[ + "cache_write_tokens", + "cache_creation_input_tokens", + "cacheWriteTokens", + "cacheCreationInputTokens", + "cacheWrite", + ], + precedence, + ), + ManualUsagePrecedence::PrimaryMap => cache_write_tokens_from_preferred_usage(primary_usage) + .or_else(|| cache_write_tokens_from_preferred_usage(secondary_usage)), + } +} + +fn cache_write_tokens_from_preferred_usage( + usage: Option<&serde_json::Map>, +) -> Option { + let usage = usage?; + first_u64( + usage, + &[ + "cache_creation_input_tokens", + "cache_write_tokens", + "cacheCreationInputTokens", + "cacheWriteTokens", + "cacheWrite", + ], + ) +} + +fn first_u64_from_manual_usage( + primary_usage: Option<&serde_json::Map>, + secondary_usage: Option<&serde_json::Map>, keys: &[&str], precedence: ManualUsagePrecedence, ) -> Option { match precedence { #[cfg(any(feature = "otel", feature = "openinference", test))] ManualUsagePrecedence::KeyOrder => keys.iter().find_map(|key| { - usage + primary_usage .and_then(|value| value.get(*key).and_then(Json::as_u64)) - .or_else(|| token_usage.and_then(|value| value.get(*key).and_then(Json::as_u64))) + .or_else(|| { + secondary_usage.and_then(|value| value.get(*key).and_then(Json::as_u64)) + }) }), - ManualUsagePrecedence::PrimaryMap => usage + ManualUsagePrecedence::PrimaryMap => primary_usage .and_then(|value| first_u64(value, keys)) - .or_else(|| token_usage.and_then(|value| first_u64(value, keys))), + .or_else(|| secondary_usage.and_then(|value| first_u64(value, keys))), } } +#[cfg(any(feature = "otel", feature = "openinference", test))] fn first_nested_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, + primary_usage: Option<&serde_json::Map>, + secondary_usage: Option<&serde_json::Map>, parent_key: &str, child_key: &str, ) -> Option { - usage + primary_usage .and_then(|value| nested_u64(value, parent_key, child_key)) - .or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key))) + .or_else(|| secondary_usage.and_then(|value| nested_u64(value, parent_key, child_key))) } fn nested_u64( @@ -413,6 +483,50 @@ mod tests { assert_eq!(fields.prompt_tokens, Some(2)); } + #[test] + fn manual_usage_fields_do_not_merge_atif_usage_maps() { + let output = json!({ + "usage": {"prompt_tokens": 1}, + "token_usage": {"completion_tokens": 2} + }); + + let fields = manual_usage_fields_from_preferred_token_usage(Some(&output)).unwrap(); + assert_eq!(fields.prompt_tokens, None); + assert_eq!(fields.completion_tokens, Some(2)); + } + + #[test] + fn manual_usage_fields_preserve_atif_cache_read_precedence() { + let output = json!({ + "token_usage": { + "prompt_tokens": 1, + "cached_tokens": 2, + "prompt_tokens_details": {"cached_tokens": 3}, + "input_tokens_details": {"cached_tokens": 4}, + "cache_read_input_tokens": 5, + "cache_read_tokens": 6, + "cache_creation_input_tokens": 7, + "cache_write_tokens": 8 + } + }); + + let fields = manual_usage_fields_from_preferred_token_usage(Some(&output)).unwrap(); + assert_eq!(fields.cache_read_tokens, Some(2)); + assert_eq!(fields.cache_write_tokens, Some(7)); + + let output = json!({ + "token_usage": { + "prompt_tokens": 1, + "prompt_tokens_details": {"cached_tokens": 3}, + "input_tokens_details": {"cached_tokens": 4}, + "cache_read_input_tokens": 5 + } + }); + + let fields = manual_usage_fields_from_preferred_token_usage(Some(&output)).unwrap(); + assert_eq!(fields.cache_read_tokens, Some(3)); + } + #[test] fn manual_usage_fields_preserve_key_order_for_manual_output() { let output = json!({ diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 329b8a3a..0b7d54e4 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -1451,7 +1451,17 @@ fn display_text_from_tool_calls(value: &Json) -> Option { } fn tool_call_name(value: &Json) -> Option { - manual_tool_call_name(value).map(str::to_string) + value + .get("name") + .and_then(Json::as_str) + .or_else(|| value.get("toolName").and_then(Json::as_str)) + .or_else(|| { + value + .get("function") + .and_then(|function| function.get("name")) + .and_then(Json::as_str) + }) + .map(str::to_string) } fn to_json_string(value: &T) -> Option { diff --git a/crates/core/tests/unit/atif_tests.rs b/crates/core/tests/unit/atif_tests.rs index 1bfe2cdd..2a401f18 100644 --- a/crates/core/tests/unit/atif_tests.rs +++ b/crates/core/tests/unit/atif_tests.rs @@ -801,6 +801,47 @@ fn test_extract_metrics_supports_provider_usage_payloads() { assert_eq!(non_usd_metrics.cost_usd, None); } +#[test] +fn test_extract_metrics_preserves_cache_token_precedence() { + let metrics = extract_metrics( + &json!({ + "usage": { + "prompt_tokens": 10, + "completion_tokens": 20, + "cached_tokens": 2, + "prompt_tokens_details": {"cached_tokens": 3}, + "input_tokens_details": {"cached_tokens": 4}, + "cache_read_input_tokens": 5, + "cache_read_tokens": 6, + "cache_creation_input_tokens": 7, + "cache_write_tokens": 8 + } + }), + None, + None, + ) + .unwrap(); + + assert_eq!(metrics.cached_tokens, Some(9)); + + let metrics = extract_metrics( + &json!({ + "usage": { + "prompt_tokens": 10, + "completion_tokens": 20, + "prompt_tokens_details": {"cached_tokens": 3}, + "input_tokens_details": {"cached_tokens": 4}, + "cache_read_input_tokens": 5 + } + }), + None, + None, + ) + .unwrap(); + + assert_eq!(metrics.cached_tokens, Some(3)); +} + #[test] fn test_reported_cost_object_blocks_model_pricing_estimation() { let _pricing_guard = pricing_test_mutex().lock().unwrap(); @@ -882,6 +923,26 @@ fn test_reported_cost_object_blocks_model_pricing_estimation() { .unwrap(); assert_eq!(component_metrics.cost_usd, Some(0.875)); + + let component_missing_currency_metrics = extract_metrics( + &json!({ + "usage": { + "prompt_tokens": 1000, + "completion_tokens": 500, + "total_tokens": 1500, + "cost": { + "input": 0.25, + "output": 0.5, + "cache_read": 0.125 + } + } + }), + Some("test"), + Some("priced-model"), + ) + .unwrap(); + + assert_eq!(component_missing_currency_metrics.cost_usd, None); } #[test] diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index 22d0698f..0e811284 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -2110,6 +2110,21 @@ fn helper_functions_cover_additional_openinference_branches() { ); } +#[test] +fn display_text_from_tool_calls_prefers_top_level_name() { + let text = display_text_from_chat_choices(&json!([{ + "message": { + "tool_calls": [{ + "name": "display_name", + "toolName": "camel_name", + "function": {"name": "nested_name"} + }] + } + }])); + + assert_eq!(text, Some("Requested tools: display_name".to_string())); +} + #[test] fn provider_builders_cover_success_paths() { let http_provider = build_tracer_provider(