Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 48 additions & 60 deletions crates/core/src/observability/atif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ use crate::api::subscriber::flush_subscribers;
use crate::codec::response::{Usage, estimate_cost_for_provider};
use crate::error::Result;
use crate::json::Json;
use crate::observability::manual_llm_fallback::{
manual_cost_estimate_from_usage, manual_model_name_from_llm_output,
manual_reasoning_content_from_llm_output, manual_reasoning_effort_from_llm_input,
manual_tool_call_arguments, manual_tool_call_id, manual_tool_call_name,
manual_usage_fields_from_preferred_token_usage,
};

/// The ATIF schema version string embedded in all exported trajectories.
///
Expand Down Expand Up @@ -690,21 +696,33 @@ fn extract_metrics(
model_name: Option<&str>,
) -> Option<AtifMetrics> {
let usage = token_usage_object(output)?;
let prompt = usage_u64(usage, &["prompt_tokens", "input_tokens"]);
let completion = usage_u64(usage, &["completion_tokens", "output_tokens"]);
let cache_read = usage_u64(usage, &["cached_tokens"])
let fallback_fields = manual_usage_fields_from_preferred_token_usage(Some(output));
let prompt = fallback_fields
.as_ref()
.and_then(|fields| fields.prompt_tokens)
.or_else(|| usage_u64(usage, &["prompt_tokens", "input_tokens"]));
let completion = fallback_fields
.as_ref()
.and_then(|fields| fields.completion_tokens)
.or_else(|| usage_u64(usage, &["completion_tokens", "output_tokens"]));
let cache_read = fallback_fields
.as_ref()
.and_then(|fields| fields.cache_read_tokens)
.or_else(|| usage_u64(usage, &["cached_tokens"]))
.or_else(|| prompt_tokens_detail_u64(usage, "cached_tokens"))
.or_else(|| input_tokens_detail_u64(usage, "cached_tokens"))
.or_else(|| usage_u64(usage, &["cache_read_input_tokens"]));
let cache_write = usage_u64(
usage,
&["cache_creation_input_tokens", "cache_write_tokens"],
);
let cache_write = fallback_fields
.as_ref()
.and_then(|fields| fields.cache_write_tokens)
.or_else(|| {
usage_u64(
usage,
&["cache_creation_input_tokens", "cache_write_tokens"],
)
});
let cached = sum_options(cache_read, cache_write);
let explicit_cost = usage
.get("cost_usd")
.and_then(Json::as_f64)
.or_else(|| usage.get("cost").and_then(cost_usd_from_cost_object));
let explicit_cost = atif_cost_usd_from_usage(usage);
let has_reported_cost = usage.get("cost").is_some();
let cost = if has_reported_cost {
explicit_cost
Expand All @@ -717,7 +735,10 @@ fn extract_metrics(
&Usage {
prompt_tokens: prompt,
completion_tokens: completion,
total_tokens: usage_u64(usage, &["total_tokens"]),
total_tokens: fallback_fields
.as_ref()
.and_then(|fields| fields.reported_total_tokens)
.or_else(|| usage_u64(usage, &["total_tokens"])),
cache_read_tokens: cache_read,
cache_write_tokens: cache_write,
cost: None,
Expand Down Expand Up @@ -764,8 +785,12 @@ fn extract_metrics(
})
}

fn cost_usd_from_cost_object(cost: &Json) -> Option<f64> {
let cost = cost.as_object()?;
fn atif_cost_usd_from_usage(usage: &serde_json::Map<String, Json>) -> Option<f64> {
if let Some(total) = usage.get("cost_usd").and_then(Json::as_f64) {
return Some(total);
}

let cost = usage.get("cost")?.as_object()?;
let currency = cost.get("currency").and_then(Json::as_str);
let is_relay_normalized_cost = cost
.get("source")
Expand All @@ -779,13 +804,8 @@ fn cost_usd_from_cost_object(cost: &Json) -> Option<f64> {
return None;
}

cost.get("total").and_then(Json::as_f64).or_else(|| {
let (has_component, component_total) = ["input", "output", "cache_read", "cache_write"]
.iter()
.filter_map(|field| cost.get(*field).and_then(Json::as_f64))
.fold((false, 0.0), |(_, total), value| (true, total + value));
has_component.then_some(component_total)
})
manual_cost_estimate_from_usage(usage)
.and_then(|cost| cost.total_or_component_sum_for_currency("USD"))
}

fn merge_metrics(
Expand Down Expand Up @@ -859,9 +879,7 @@ fn usage_u64(usage: &serde_json::Map<String, Json>, keys: &[&str]) -> Option<u64
}

fn response_model_name(output: &Json) -> Option<&str> {
output
.as_object()
.and_then(|object| object.get("model").and_then(Json::as_str))
manual_model_name_from_llm_output(Some(output))
}

fn sum_options(left: Option<u64>, right: Option<u64>) -> Option<u64> {
Expand Down Expand Up @@ -893,26 +911,15 @@ fn input_tokens_detail_u64(usage: &serde_json::Map<String, Json>, key: &str) ->
/// The request content may have `reasoning_effort` (e.g. `"high"`, `"medium"`,
/// or a numeric value). Returns the value as Json for flexibility.
fn extract_reasoning_effort(input: &Json) -> Option<Json> {
if let Some(obj) = input.as_object()
&& let Some(v) = obj.get("reasoning_effort")
&& !v.is_null()
{
return Some(v.clone());
}
None
manual_reasoning_effort_from_llm_input(input)
}

/// Extract `reasoning` (reasoning_content) from an LLM response output.
///
/// The agent's explicit internal reasoning may appear in the response under the
/// `"reasoning"` key. Returns `None` if absent or not a string.
fn extract_reasoning_content(output: &Json) -> Option<String> {
if let Some(obj) = output.as_object()
&& let Some(r) = obj.get("reasoning")
{
return r.as_str().map(String::from);
}
None
manual_reasoning_content_from_llm_output(output)
}

/// Extract the latest user-facing message from an LLM request payload.
Expand Down Expand Up @@ -1013,32 +1020,13 @@ fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
.or_else(|| anthropic_messages_tool_use_items(output))?;
let mut calls = Vec::with_capacity(arr.len());
for (index, tc) in arr.iter().enumerate() {
let tc_obj = tc.as_object()?;
let mut id = tc_obj
.get("id")
.or_else(|| tc_obj.get("tool_call_id"))
.or_else(|| tc_obj.get("call_id"))
.and_then(Json::as_str)
.unwrap_or("")
.to_string();
// The function details live under "function".
let func = tc_obj.get("function").and_then(Json::as_object);
let name = func
.and_then(|f| f.get("name"))
.or_else(|| tc_obj.get("name"))
.or_else(|| tc_obj.get("tool_name"))
.or_else(|| tc_obj.get("function_name"))
.and_then(Json::as_str)
.unwrap_or("")
.to_string();
tc.as_object()?;
let mut id = manual_tool_call_id(tc).unwrap_or("").to_string();
let name = manual_tool_call_name(tc).unwrap_or("").to_string();
if id.is_empty() && !name.is_empty() {
id = format!("{name}:{}", index + 1);
}
let raw_arguments = func
.and_then(|f| f.get("arguments"))
.or_else(|| tc_obj.get("arguments"))
.or_else(|| tc_obj.get("args"))
.or_else(|| tc_obj.get("input"));
let raw_arguments = manual_tool_call_arguments(tc);
let arguments = normalize_tool_arguments(raw_arguments);
// Skip entries with no id and no name — they are not meaningful.
if id.is_empty() && name.is_empty() {
Expand Down
Loading
Loading