diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 4e22845..1493357 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1053,7 +1053,6 @@ dependencies = [ "bytes", "http", "opentelemetry", - "reqwest 0.13.4", ] [[package]] @@ -1068,7 +1067,6 @@ dependencies = [ "opentelemetry-proto", "opentelemetry_sdk", "prost", - "reqwest 0.13.4", "serde_json", "thiserror", ] diff --git a/rust/observability/Cargo.toml b/rust/observability/Cargo.toml index 9f2ad3a..83fa1f4 100644 --- a/rust/observability/Cargo.toml +++ b/rust/observability/Cargo.toml @@ -10,20 +10,20 @@ categories = ["development-tools::debugging", "api-bindings"] readme = "README.md" [dependencies] -# `smooai-fetch` wraps reqwest with timeouts + retries + circuit-breaking; the -# SDK's own app-style outbound HTTP (M2M token exchange in `auth.rs`, the batched -# webhook POST in `transport.rs`) goes through it so those calls inherit the -# resilience policy every other SmooAI client uses (SMOODEV-2026). +# `smooai-fetch` wraps reqwest with timeouts + retries + circuit-breaking. ALL of +# this SDK's outbound HTTP goes through it: the M2M token exchange (`auth.rs`), the +# batched webhook POST (`transport.rs`), AND the OTLP trace + metric export +# (`otel/auth_client.rs`, via a smooai-fetch-backed `opentelemetry_http::HttpClient` +# adapter) — so every call inherits the resilience policy every other SmooAI client +# uses (SMOODEV-2026, SMOODEV-2029). smooai-fetch pulls reqwest transitively. smooai-fetch = "3" -# reqwest 0.13 stays for two reasons: (1) the OTLP exporter's auth-injecting -# client in `otel.rs` is coupled to `opentelemetry-http`'s `HttpClient` trait, -# which is impl'd for `reqwest::Client` — smooai-fetch doesn't impl that trait, and -# that path is the OTLP protocol transport, not an app call; (2) the -# `reqwest-middleware` feature IS the reqwest integration for downstream reqwest -# users. It also remains a transitive dep of smooai-fetch. 0.13 unifies with -# opentelemetry-http 0.32 (a 0.12 pin would be a different `Client` type and the -# trait impl wouldn't apply). -reqwest = { version = "0.13", features = ["json", "form", "rustls"], default-features = false } +# reqwest is now needed ONLY by the optional `reqwest-middleware` integration — +# the client-span middleware in `reqwest_mw.rs` IS the reqwest integration for +# downstream reqwest users. No code path in the default build references reqwest +# directly (the OTLP exporter transport moved to smooai-fetch in SMOODEV-2029), so +# the dep is feature-gated to keep the default build from pulling reqwest as a +# DIRECT dependency. (It still arrives transitively through smooai-fetch.) +reqwest = { version = "0.13", features = ["json", "form", "rustls"], default-features = false, optional = true } serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "sync", "time"] } @@ -38,10 +38,19 @@ thiserror = "2" # OpenTelemetry — traces + metrics over OTLP/HTTP with JSON payloads so the # wire format matches the api.smoo.ai ingest the TS SDK already speaks. +# The exporter's HTTP transport is supplied explicitly via `.with_http_client(...)` +# (a smooai-fetch-backed `opentelemetry_http::HttpClient`), so we do NOT enable any +# of opentelemetry-otlp's built-in HTTP clients (`reqwest-client` etc.) — the +# explicit client always wins and the default-client code is never compiled in. +# `experimental-http-retry` is deliberately OFF: smooai-fetch owns transport retry, +# so leaving the exporter's retry feature off prevents double-retry (SMOODEV-2029). +# opentelemetry-http is pulled with no features — we only need the `HttpClient` +# trait to implement it ourselves (the `reqwest` feature, which impls it for +# reqwest::Client, is no longer used). opentelemetry = { version = "0.32", features = ["trace", "metrics"] } opentelemetry_sdk = { version = "0.32", features = ["trace", "metrics", "rt-tokio"] } -opentelemetry-otlp = { version = "0.32", features = ["trace", "metrics", "http-json", "reqwest-client"], default-features = false } -opentelemetry-http = { version = "0.32", features = ["reqwest"], default-features = false } +opentelemetry-otlp = { version = "0.32", features = ["trace", "metrics", "http-json"], default-features = false } +opentelemetry-http = { version = "0.32", default-features = false } opentelemetry-semantic-conventions = "0.32" # --- Optional HTTP framework instrumentation (feature-gated, default off) ----- @@ -58,8 +67,10 @@ reqwest-middleware = { version = "0.5", optional = true } default = [] # OTel server-span layer for Tower/Axum services. tower = ["dep:tower-layer", "dep:tower-service", "dep:pin-project-lite"] -# OTel client-span middleware for reqwest. -reqwest-middleware = ["dep:reqwest-middleware"] +# OTel client-span middleware for reqwest. Pulls reqwest as a DIRECT dep (the +# only thing in this crate that needs it directly — the OTLP transport is on +# smooai-fetch as of SMOODEV-2029). +reqwest-middleware = ["dep:reqwest-middleware", "dep:reqwest"] [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "test-util"] } diff --git a/rust/observability/src/otel.rs b/rust/observability/src/otel.rs index fc15880..b8e75b6 100644 --- a/rust/observability/src/otel.rs +++ b/rust/observability/src/otel.rs @@ -7,12 +7,18 @@ //! so a refreshed token starts being used on the next export with no //! exporter restart. This sidesteps the header-snapshot staleness that bit //! the TS SDK (SMOODEV-1206) and applies here too: the OTLP exporter holds -//! its `reqwest::Client` + headers for the life of the process. +//! its HTTP client + headers for the life of the process. //! //! The per-request mode is implemented with a custom [`opentelemetry_http::HttpClient`] //! wrapper ([`AuthInjectingHttpClient`]) that asks the `TokenProvider` for a //! fresh token, sets the `Authorization` header, and on a 401 invalidates + -//! retries once before delegating to an inner `reqwest` client. +//! retries once. The OTLP protocol transport itself runs over `smooai-fetch` +//! (timeouts + retries + circuit breaking) rather than raw `reqwest`, so the +//! export path inherits the same resilience policy every other SmooAI outbound +//! call uses (SMOODEV-2029). smooai-fetch owns transport retry; the exporter +//! layer doesn't retry because this crate leaves opentelemetry-otlp's +//! `experimental-http-retry` feature off (see `auth_client.rs` for the no-double- +//! retry rationale). //! //! Wire format is OTLP/HTTP/JSON (`http-json` feature) so the bytes match what //! the TS `AuthInjectingTraceExporter` POSTs. @@ -177,13 +183,13 @@ fn build_and_install(options: SetupOtelOptions) -> OtelSdkHandle { } } -/// Build the auth-injecting reqwest client wrapper shared by both exporters. +/// Per-export HTTP timeout baked into the smooai-fetch client backing the OTLP +/// exporter. Matches the 10s the previous raw-reqwest client used. +const OTLP_EXPORT_TIMEOUT_MS: u64 = 10_000; + +/// Build the auth-injecting smooai-fetch HTTP client shared by both exporters. fn build_http_client(options: &SetupOtelOptions) -> AuthInjectingHttpClient { - let inner = reqwest::Client::builder() - .timeout(Duration::from_secs(10)) - .build() - .unwrap_or_default(); - AuthInjectingHttpClient::new(inner, options.token_provider.clone()) + AuthInjectingHttpClient::new(OTLP_EXPORT_TIMEOUT_MS, options.token_provider.clone()) } fn build_span_exporter(options: &SetupOtelOptions) -> Option { diff --git a/rust/observability/src/otel/auth_client.rs b/rust/observability/src/otel/auth_client.rs index dbb6230..7ea0388 100644 --- a/rust/observability/src/otel/auth_client.rs +++ b/rust/observability/src/otel/auth_client.rs @@ -1,5 +1,10 @@ -//! Custom [`opentelemetry_http::HttpClient`] that injects a fresh Bearer token -//! on every export, mirroring the TS `auth-injecting-exporter.ts`. +//! [`opentelemetry_http::HttpClient`] backed by `smooai-fetch`, injecting a +//! fresh Bearer token on every export — mirroring the TS +//! `auth-injecting-exporter.ts` and routing the OTLP protocol transport through +//! the same resilient client (`timeouts + retries + circuit breaking`) every +//! other SmooAI outbound call uses (SMOODEV-2029). +//! +//! ## Why this exists //! //! The upstream OTLP exporter holds its HTTP client + headers for the life of //! the process, so a token minted once at setup goes stale after ~1h and every @@ -9,49 +14,189 @@ //! 2. sets `authorization: Bearer ` on the outgoing request, //! 3. on a 401 response, invalidates the cached token + retries ONCE. //! -//! When no `TokenProvider` is configured the wrapper is a transparent -//! pass-through to the inner `reqwest::Client` (static-header / no-auth mode). +//! When no `TokenProvider` is configured the wrapper sends the request as-is +//! (static-header / no-auth mode — the static `authorization` header, if any, +//! is merged onto the request by the OTLP exporter before it reaches us). +//! +//! ## Retry ownership — avoiding double-retry (SMOODEV-2029) +//! +//! `smooai-fetch` OWNS transport retry: it retries 429 / 5xx / timeouts / +//! connection errors with exponential backoff + `Retry-After` honoring + circuit +//! breaking. The OTLP exporter layer must NOT also retry the same failure. That +//! is guaranteed here because this crate does NOT enable opentelemetry-otlp's +//! `experimental-http-retry` feature — with that feature off, the exporter calls +//! `send_bytes` exactly once per export and never retries (see +//! `opentelemetry-otlp`'s `export_http_once` no-retry path). So smooai-fetch is +//! the single retry layer for 429/5xx/transport, and the only retry WE add is the +//! 401→invalidate→re-mint→retry-once below — which smooai-fetch deliberately does +//! NOT do (401 is not in its retryable set), so the two never stack. +//! +//! ## Response translation +//! +//! `opentelemetry_http::HttpClient::send_bytes` returns `Response` for any +//! completed HTTP exchange — including non-2xx — because the OTLP exporter reads +//! `response.status()` itself to decide success/failure. `smooai-fetch`, by +//! contrast, returns `Err(FetchError::HttpResponse { .. })` for non-2xx. We +//! therefore reconstruct an `http::Response` (status + headers + body) from that +//! error variant so the exporter sees the real status, and surface only genuine +//! transport failures (timeout, exhausted retries, connection error) as `Err`. use crate::auth::TokenProvider; use async_trait::async_trait; use bytes::Bytes; use http::{Request, Response}; use opentelemetry_http::{HttpClient, HttpError}; +use smooai_fetch::defaults::default_retry_options; +use smooai_fetch::error::FetchError; +use smooai_fetch::response::FetchResponse; +use smooai_fetch::types::{Method, RequestInit}; +use smooai_fetch::{FetchBuilder, FetchClient}; +use std::collections::HashMap; +use std::sync::Arc; -/// HTTP client wrapper that injects M2M auth per request. Cloneable; clones -/// share the inner client + token cache. -#[derive(Debug, Clone)] +/// HTTP client wrapper that sends OTLP exports through `smooai-fetch` and injects +/// M2M auth per request. Cheap to clone (`Arc`-shared); clones share the inner +/// fetch client + token cache. +#[derive(Clone)] pub struct AuthInjectingHttpClient { - inner: reqwest::Client, + // Typed to `serde_json::Value` because OTLP/HTTP/JSON responses are JSON; the + // parsed body is unused (we reconstruct the response from the raw `body` + // string), but the type makes a 2xx JSON body parse cleanly rather than trip + // smooai-fetch's schema-validation guard. + http: Arc>, token_provider: Option, } +// `opentelemetry_http::HttpClient` requires `Debug`. `FetchClient` doesn't +// implement it, and the `TokenProvider` Debug is hand-rolled to redact the +// client secret — so derive nothing and print only the auth-mode flag. +impl std::fmt::Debug for AuthInjectingHttpClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AuthInjectingHttpClient") + .field("auth", &self.token_provider.is_some()) + .finish_non_exhaustive() + } +} + impl AuthInjectingHttpClient { - pub fn new(inner: reqwest::Client, token_provider: Option) -> Self { + /// Build the adapter. `timeout_ms` and the retry policy are baked into the + /// underlying `smooai-fetch` client (this is the SINGLE retry layer — the + /// OTLP exporter does not retry; see the module docs). + pub fn new(timeout_ms: u64, token_provider: Option) -> Self { + let http = FetchBuilder::::new() + .with_timeout(timeout_ms) + .with_retry(default_retry_options()) + .build(); AuthInjectingHttpClient { - inner, + http: Arc::new(http), token_provider, } } - /// Clone a `Request` (parts + cheap `Bytes` body clone) so the - /// 401-retry path can resend after re-minting the token. - fn clone_request(req: &Request) -> Request { - let mut builder = Request::builder() - .method(req.method().clone()) - .uri(req.uri().clone()); - if let Some(headers) = builder.headers_mut() { - *headers = req.headers().clone(); + /// Translate an `http::Request` (what `opentelemetry-http` hands us) + /// into a `smooai-fetch` `(url, RequestInit)`. The OTLP exporter only ever + /// POSTs, but we map the method faithfully. The body for OTLP/HTTP/JSON is + /// UTF-8 JSON, so `Bytes` → `String` is lossless; a non-UTF-8 body would be a + /// protocol bug, and `from_utf8_lossy` keeps us from panicking on it. + fn to_fetch(req: &Request, bearer: Option<&str>) -> (String, RequestInit) { + let url = req.uri().to_string(); + let method = match *req.method() { + http::Method::POST => Method::POST, + http::Method::PUT => Method::PUT, + http::Method::PATCH => Method::PATCH, + http::Method::DELETE => Method::DELETE, + http::Method::HEAD => Method::HEAD, + http::Method::OPTIONS => Method::OPTIONS, + _ => Method::GET, + }; + let mut headers = HashMap::new(); + for (name, value) in req.headers().iter() { + if let Ok(v) = value.to_str() { + headers.insert(name.as_str().to_string(), v.to_string()); + } + } + // Inject / override the Authorization header with the fresh token. + if let Some(token) = bearer { + headers.insert("authorization".to_string(), format!("Bearer {token}")); + } + let body = String::from_utf8_lossy(req.body()).into_owned(); + ( + url, + RequestInit { + method, + headers, + body: Some(body), + }, + ) + } + + /// Rebuild an `http::Response` from a (status, headers, body) triple — + /// used for BOTH the success path (`FetchResponse`) and the non-2xx path + /// (`FetchError::HttpResponse`), so the OTLP exporter always sees the real + /// status it inspects. + fn build_response( + status: u16, + headers: &HashMap, + body: String, + ) -> Result, HttpError> { + let mut builder = Response::builder().status(status); + for (k, v) in headers.iter() { + if let (Ok(name), Ok(value)) = ( + http::header::HeaderName::from_bytes(k.as_bytes()), + http::header::HeaderValue::from_str(v), + ) { + builder = builder.header(name, value); + } } - // Unwrap is safe: we rebuilt from a valid request's parts. builder - .body(req.body().clone()) - .expect("rebuilding a valid request must not fail") + .body(Bytes::from(body.into_bytes())) + .map_err(Into::into) + } + + /// Map a `smooai-fetch` result into the `Response` the OTLP exporter + /// expects. A non-2xx surfaces from `smooai-fetch` as + /// `Err(FetchError::HttpResponse { .. })` — we turn that back into a real + /// `Response` (preserving status/headers/body). The retry loop wraps the + /// final failure in `FetchError::Retry`, so unwrap one level to recover the + /// underlying response. Everything else (timeout, transport, exhausted + /// non-HTTP retries) is a genuine transport failure → `Err(HttpError)`. + fn into_http_response( + result: Result, FetchError>, + ) -> Result, HttpError> { + match result { + Ok(resp) => Self::build_response(resp.status, &resp.headers, resp.body), + Err(err) => { + let unwrapped = match &err { + FetchError::Retry { source, .. } => source.as_ref(), + other => other, + }; + match unwrapped { + FetchError::HttpResponse { + status, + headers, + body, + .. + } => Self::build_response(*status, headers, body.clone()), + _ => Err(Box::new(err)), + } + } + } } - fn set_bearer(req: &mut Request, token: &str) { - if let Ok(value) = http::HeaderValue::from_str(&format!("Bearer {token}")) { - req.headers_mut().insert(http::header::AUTHORIZATION, value); + /// Did this fetch result resolve to an HTTP 401? smooai-fetch reports non-2xx + /// as `Err(HttpResponse { status: 401, .. })`; defensively also catch an `Ok` + /// whose status is 401 (it shouldn't be — 401 isn't 2xx — but a future + /// smooai-fetch could change that). + fn is_unauthorized(result: &Result, FetchError>) -> bool { + match result { + Ok(resp) => resp.status == 401, + Err(err) => { + let unwrapped = match err { + FetchError::Retry { source, .. } => source.as_ref(), + other => other, + }; + matches!(unwrapped, FetchError::HttpResponse { status: 401, .. }) + } } } } @@ -59,56 +204,38 @@ impl AuthInjectingHttpClient { #[async_trait] impl HttpClient for AuthInjectingHttpClient { async fn send_bytes(&self, request: Request) -> Result, HttpError> { - // No token provider → transparent pass-through (static-header mode). + // No token provider → send as-is (static-header / no-auth mode). let Some(provider) = &self.token_provider else { - return self.inner.send_bytes(request).await; + let (url, init) = Self::to_fetch(&request, None); + return Self::into_http_response(self.http.fetch(&url, init).await); }; - // Keep a copy for the potential 401 retry before we consume `request`. - let retry_template = Self::clone_request(&request); - - let mut req = request; - match provider.get_access_token().await { - Ok(token) => Self::set_bearer(&mut req, &token), + // Mint a token for this export. A mint failure is logged, not fatal — we + // still send (unauthenticated) so the server's 401 surfaces in logs + // rather than silently dropping the batch. Never panic. + let token = match provider.get_access_token().await { + Ok(token) => Some(token), Err(e) => { - // Couldn't mint — send unauthenticated; the server 401 is more - // useful in logs than silently dropping. Never panic. crate::otel::warn(&format!("token mint failed before export: {e}")); + None } - } - - let result = self.inner.send_bytes(req).await; - - // 401 retry: token may have been revoked / rotated server-side. Drop the - // cached value and re-mint once. Don't loop. - // - // NOTE: the upstream `HttpClient for reqwest::Client` impl calls - // `.error_for_status()`, so a 401 surfaces as `Err(reqwest::Error)` — - // NOT `Ok(response_with_status_401)`. We therefore detect the 401 in - // BOTH shapes: the error path (downcast to reqwest::Error + check - // `.status()`) and, defensively, an `Ok` whose status is 401 (in case a - // future inner client doesn't auto-error). - let is_401 = match &result { - Ok(resp) => resp.status() == http::StatusCode::UNAUTHORIZED, - Err(e) => e - .downcast_ref::() - .and_then(|re| re.status()) - .map(|s| s == reqwest::StatusCode::UNAUTHORIZED) - .unwrap_or(false), }; - if is_401 { + let (url, init) = Self::to_fetch(&request, token.as_deref()); + let result = self.http.fetch(&url, init).await; + + // 401 retry: the token may have been revoked / rotated server-side. Drop + // the cached value, re-mint, and retry ONCE. smooai-fetch does not retry + // 401 itself (not in its retryable set), so this is the only 401 retry — + // no double-retry. Don't loop. + if Self::is_unauthorized(&result) { provider.invalidate().await; - let mut retry = retry_template; - if let Ok(token) = provider.get_access_token().await { - Self::set_bearer(&mut retry, &token); - } - return self.inner.send_bytes(retry).await; + let retry_token = provider.get_access_token().await.ok(); + let (retry_url, retry_init) = Self::to_fetch(&request, retry_token.as_deref()); + return Self::into_http_response(self.http.fetch(&retry_url, retry_init).await); } - let response = result?; - - Ok(response) + Self::into_http_response(result) } } @@ -121,6 +248,8 @@ mod tests { use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, Request as WmRequest, Respond, ResponseTemplate}; + const TEST_TIMEOUT_MS: u64 = 10_000; + fn build_req(uri: &str) -> Request { Request::builder() .method("POST") @@ -138,7 +267,7 @@ mod tests { .respond_with(ResponseTemplate::new(200)) .mount(&server) .await; - let client = AuthInjectingHttpClient::new(reqwest::Client::new(), None); + let client = AuthInjectingHttpClient::new(TEST_TIMEOUT_MS, None); let resp = client .send_bytes(build_req(&format!("{}/v1/traces", server.uri()))) .await @@ -181,7 +310,7 @@ mod tests { .await; let tp = TokenProvider::new(TokenProviderOptions::new(auth.uri(), "cid", "sk")).unwrap(); - let client = AuthInjectingHttpClient::new(reqwest::Client::new(), Some(tp)); + let client = AuthInjectingHttpClient::new(TEST_TIMEOUT_MS, Some(tp)); let resp = client .send_bytes(build_req(&format!("{}/v1/traces", ingest.uri()))) .await @@ -226,7 +355,7 @@ mod tests { .await; let tp = TokenProvider::new(TokenProviderOptions::new(auth.uri(), "cid", "sk")).unwrap(); - let client = AuthInjectingHttpClient::new(reqwest::Client::new(), Some(tp)); + let client = AuthInjectingHttpClient::new(TEST_TIMEOUT_MS, Some(tp)); let resp = client .send_bytes(build_req(&format!("{}/v1/traces", ingest.uri()))) .await @@ -238,4 +367,23 @@ mod tests { "should have retried exactly once" ); } + + #[tokio::test] + async fn non_2xx_surfaces_as_response_not_error() { + // A 4xx that is NOT 401 must come back as a `Response` carrying the real + // status (so the OTLP exporter can read it), not a transport `Err`. + let ingest = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/traces")) + .respond_with(ResponseTemplate::new(422).set_body_string("bad payload")) + .mount(&ingest) + .await; + + let client = AuthInjectingHttpClient::new(TEST_TIMEOUT_MS, None); + let resp = client + .send_bytes(build_req(&format!("{}/v1/traces", ingest.uri()))) + .await + .expect("non-2xx should be a Response, not an Err"); + assert_eq!(resp.status(), 422); + } }