Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions crates/adaptive/src/acg/request_surfaces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) mod openai_responses;
use std::collections::HashSet;

use nemo_relay::api::llm::LlmRequest;
use nemo_relay::codec::resolve::{ProviderSurface, detect_request_surface};
use serde_json::Value;

use crate::acg::prompt_ir::PromptIR;
Expand All @@ -40,6 +41,14 @@ pub(crate) trait RequestSurfaceApplier: Send + Sync {
}

impl RequestSurface {
fn from_provider_surface(surface: ProviderSurface) -> Self {
match surface {
ProviderSurface::OpenAIChat => Self::OpenAIChat,
ProviderSurface::OpenAIResponses => Self::OpenAIResponses,
ProviderSurface::AnthropicMessages => Self::AnthropicMessages,
}
}

pub(crate) fn supports_provider(self, provider: &str) -> bool {
match provider {
"anthropic" => matches!(self, Self::AnthropicMessages),
Expand Down Expand Up @@ -71,17 +80,13 @@ impl RequestSurface {
pub(crate) fn resolve_request_surface_from_request(
request: &LlmRequest,
) -> crate::acg::Result<RequestSurface> {
if request.content.get("input").is_some() || request.content.get("instructions").is_some() {
Ok(RequestSurface::OpenAIResponses)
} else if request.content.get("system").is_some() {
Ok(RequestSurface::AnthropicMessages)
} else if request.content.get("messages").is_some() {
Ok(RequestSurface::OpenAIChat)
} else {
Err(crate::acg::AcgError::Internal(
"unable to resolve request surface from request shape".to_string(),
))
}
detect_request_surface(&request.content)
.map(RequestSurface::from_provider_surface)
.ok_or_else(|| {
crate::acg::AcgError::Internal(
"unable to resolve request surface from request shape".to_string(),
)
})
Comment thread
zhongxuanwang-nv marked this conversation as resolved.
}

#[cfg_attr(not(test), allow(dead_code))]
Expand Down
30 changes: 29 additions & 1 deletion crates/core/src/api/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

//! Event types for Agent Trajectory Observability Format (ATOF) runtime events.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Arc;

Expand All @@ -23,10 +24,11 @@ use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::api::llm::LlmAttributes;
use crate::api::llm::{LlmAttributes, LlmRequest};
use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType};
use crate::api::tool::ToolAttributes;
use crate::codec::request::AnnotatedLlmRequest;
use crate::codec::resolve;
use crate::codec::response::AnnotatedLlmResponse;
use crate::json::Json;

Expand Down Expand Up @@ -609,6 +611,32 @@ impl Event {
.and_then(|profile| profile.annotated_response.as_ref())
}

/// Normalized LLM request: the codec annotation when present, otherwise a
/// best-effort decode of the start-event input payload.
///
/// The fallback decode requires the start-event input to be the serialized
/// [`LlmRequest`] wire shape (`{headers, content}`) emitted by the managed
/// LLM pipeline; events whose input is a bare payload or a non-LLM shape
/// yield `None`.
#[must_use]
pub fn normalized_llm_request(&self) -> Option<Cow<'_, AnnotatedLlmRequest>> {
if let Some(annotated) = self.annotated_request() {
return Some(Cow::Borrowed(annotated.as_ref()));
}
let request: LlmRequest = serde_json::from_value(self.input()?.clone()).ok()?;
resolve::normalize_request(&request).map(Cow::Owned)
}

/// Normalized LLM response: the codec annotation when present, otherwise a
/// best-effort decode of the end-event output payload.
#[must_use]
pub fn normalized_llm_response(&self) -> Option<Cow<'_, AnnotatedLlmResponse>> {
if let Some(annotated) = self.annotated_response() {
return Some(Cow::Borrowed(annotated.as_ref()));
}
resolve::normalize_response(self.output()?).map(Cow::Owned)
}

/// Return true for scope-start events.
///
/// # Returns
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@
//! the streaming response codec
//! ([`streaming::StreamingCodec`]) used with the managed
//! streaming LLM execution pipeline.
//!
//! [`resolve`] is the detect-then-decode entry point for selecting a built-in
//! provider codec from a raw payload when no codec annotation is present.

pub mod anthropic;
pub mod openai_chat;
pub mod openai_responses;
pub mod pricing;
pub mod request;
pub mod resolve;
pub mod response;
pub mod streaming;
pub mod traits;
16 changes: 15 additions & 1 deletion crates/core/src/codec/openai_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ fn collect_output_item(
.unwrap_or("")
{
"message" => collect_message_text_parts(item, text_parts),
"output_text" => {
if let Some(text) = output_text_block(item) {
text_parts.push(text);
}
}
"function_call" => tool_calls.push(parse_function_call(item)),
_ => {}
}
Expand Down Expand Up @@ -244,6 +249,14 @@ fn message_from_text_parts(text_parts: Vec<String>) -> Option<MessageContent> {
}
}

fn top_level_output_text(response: &Json) -> Option<MessageContent> {
response
.get("output_text")
.and_then(|value| value.as_str())
.filter(|text| !text.is_empty())
.map(|text| MessageContent::Text(text.to_string()))
}

fn optional_vec<T>(items: Vec<T>) -> Option<Vec<T>> {
(!items.is_empty()).then_some(items)
}
Expand Down Expand Up @@ -456,7 +469,8 @@ impl LlmResponseCodec for OpenAIResponsesCodec {

let all_output_items = raw.output.clone();
let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref());
let message = message_from_text_parts(text_parts);
let message =
message_from_text_parts(text_parts).or_else(|| top_level_output_text(response));
let tool_calls = optional_vec(tool_calls);

// Map finish reason from status + incomplete_details.
Expand Down
94 changes: 94 additions & 0 deletions crates/core/src/codec/resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Provider-surface detection and best-effort normalization: the preferred path
//! for turning raw provider JSON into normalized types when no codec annotation
//! is present.

use crate::api::llm::LlmRequest;
use crate::json::Json;

use super::anthropic::AnthropicMessagesCodec;
use super::openai_chat::OpenAIChatCodec;
use super::openai_responses::OpenAIResponsesCodec;
use super::request::AnnotatedLlmRequest;
use super::response::AnnotatedLlmResponse;
use super::traits::{LlmCodec, LlmResponseCodec};

/// A built-in provider request/response surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderSurface {
Comment thread
zhongxuanwang-nv marked this conversation as resolved.
/// OpenAI Chat Completions.
OpenAIChat,
/// OpenAI Responses.
OpenAIResponses,
/// Anthropic Messages.
AnthropicMessages,
}

/// Detect the request surface from a raw request body by top-level key.
///
/// Priority: OpenAI Responses (`input`/`instructions`) > Anthropic Messages
/// (`system`) > OpenAI Chat (`messages`). `None` when no key matches or `body`
/// is not an object. This is a best-effort heuristic: an Anthropic request that
/// omits the optional top-level `system` is indistinguishable from OpenAI Chat
/// and classifies as `OpenAIChat`.
#[must_use]
pub fn detect_request_surface(body: &Json) -> Option<ProviderSurface> {
let obj = body.as_object()?;
if obj.contains_key("input") || obj.contains_key("instructions") {
Comment thread
zhongxuanwang-nv marked this conversation as resolved.
Some(ProviderSurface::OpenAIResponses)
} else if obj.contains_key("system") {
Some(ProviderSurface::AnthropicMessages)
} else if obj.contains_key("messages") {
Some(ProviderSurface::OpenAIChat)
} else {
None
}
}

/// Detect the response surface from a raw provider response, classifying only
/// when exactly one built-in shape matches (the built-in codecs accept minimal
/// objects, so decode success alone is not a reliable classifier).
#[must_use]
pub fn detect_response_surface(raw: &Json) -> Option<ProviderSurface> {
let obj = raw.as_object()?;
let is_chat = obj.get("choices").is_some_and(Json::is_array);
let is_responses = obj.get("output").is_some_and(Json::is_array)
|| obj.get("output_text").is_some_and(Json::is_string);
let is_anthropic = obj.get("type").and_then(Json::as_str) == Some("message")
&& obj.get("content").is_some_and(Json::is_array);

match (is_chat, is_responses, is_anthropic) {
(true, false, false) => Some(ProviderSurface::OpenAIChat),
(false, true, false) => Some(ProviderSurface::OpenAIResponses),
(false, false, true) => Some(ProviderSurface::AnthropicMessages),
_ => None,
}
}

/// Best-effort decode of a raw request into [`AnnotatedLlmRequest`] (fail-open).
#[must_use]
pub fn normalize_request(request: &LlmRequest) -> Option<AnnotatedLlmRequest> {
match detect_request_surface(&request.content)? {
ProviderSurface::OpenAIChat => OpenAIChatCodec.decode(request),
ProviderSurface::OpenAIResponses => OpenAIResponsesCodec.decode(request),
ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode(request),
}
.ok()
}

/// Best-effort decode of a raw response into [`AnnotatedLlmResponse`] (fail-open).
#[must_use]
pub fn normalize_response(raw: &Json) -> Option<AnnotatedLlmResponse> {
match detect_response_surface(raw)? {
ProviderSurface::OpenAIChat => OpenAIChatCodec.decode_response(raw),
ProviderSurface::OpenAIResponses => OpenAIResponsesCodec.decode_response(raw),
ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode_response(raw),
}
.ok()
}

#[cfg(test)]
#[path = "../../tests/unit/codec/resolve_tests.rs"]
mod tests;
5 changes: 3 additions & 2 deletions crates/core/src/codec/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use super::response::AnnotatedLlmResponse;
/// structured [`AnnotatedLlmRequest`].
///
/// Codecs are implemented by integration patches (LangChain, LangChain-NVIDIA,
/// LangGraph, etc.) since each SDK has its own request format. They are
/// registered by name in the global codec registry.
/// LangGraph, etc.) since each SDK has its own request format. A codec is
/// supplied per call by the caller; the built-in provider codecs can also be
/// selected from a raw payload via [`crate::codec::resolve`].
///
/// # Design
///
Expand Down
43 changes: 40 additions & 3 deletions crates/core/src/observability/atif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use uuid::Uuid;
use crate::api::event::Event;
use crate::api::runtime::EventSubscriberFn;
use crate::api::subscriber::flush_subscribers;
use crate::codec::response::{Usage, estimate_cost_for_provider};
use crate::codec::request::{AnnotatedLlmRequest, Message, MessageContent};
use crate::codec::response::{AnnotatedLlmResponse, Usage, estimate_cost_for_provider};
use crate::error::Result;
use crate::json::Json;

Expand Down Expand Up @@ -1054,6 +1055,36 @@ fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
if calls.is_empty() { None } else { Some(calls) }
}

// Annotation adapters: read the normalized message from an annotation, returning
// None for multimodal content so the caller falls back to the raw extractor.

fn atif_message_from_annotated_request(request: &AnnotatedLlmRequest) -> Option<Json> {
let content = request
.messages
.iter()
.rev()
.find_map(|message| match message {
Message::User { content, .. } => Some(content),
_ => None,
})?;
match content {
MessageContent::Text(text) => Some(Json::String(text.clone())),
MessageContent::Parts(_) => None,
}
}

fn atif_message_from_annotated_response(response: &AnnotatedLlmResponse) -> Option<Json> {
match &response.message {
Some(MessageContent::Text(text)) => Some(Json::String(text.clone())),
// Multimodal: defer to the raw extractor (returns None to fall back).
Some(MessageContent::Parts(_)) => None,
// No assistant text (tool-call-only, or reasoning/thinking-only): emit an
// empty message rather than the raw extractor's stringified payload. Tool
// calls and metrics are still recovered from the raw output downstream.
None => Some(empty_message()),
}
}

fn tool_call_array(output: &Json) -> Option<&Vec<Json>> {
output
.as_object()
Expand Down Expand Up @@ -2212,7 +2243,10 @@ impl StepConversionState {
self.steps.push(AtifStep {
step_id: 0,
source: "user".to_string(),
message: extract_user_messages(&content),
message: event
.annotated_request()
.and_then(|request| atif_message_from_annotated_request(request))
.unwrap_or_else(|| extract_user_messages(&content)),
Comment thread
zhongxuanwang-nv marked this conversation as resolved.
timestamp: Some(event.timestamp().to_rfc3339()),
model_name: None,
reasoning_effort: None,
Expand Down Expand Up @@ -2253,7 +2287,10 @@ impl StepConversionState {
self.steps.push(AtifStep {
step_id: 0,
source: "agent".to_string(),
message: extract_llm_response_message(output),
message: event
.annotated_response()
.and_then(|response| atif_message_from_annotated_response(response))
.unwrap_or_else(|| extract_llm_response_message(output)),
Comment thread
zhongxuanwang-nv marked this conversation as resolved.
timestamp: Some(event.timestamp().to_rfc3339()),
model_name: event.model_name().map(ToOwned::to_owned),
reasoning_effort,
Expand Down
Loading
Loading