From d466139045ae3b420c9613ee6c3c6b8d527fe2a7 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Mon, 29 Jun 2026 20:33:52 +0200 Subject: [PATCH 1/5] ref: compression enum --- relay-server/src/endpoints/minidump.rs | 58 +++++++++++++++++--------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index f8bc8c290cc..9ad15e8df9f 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -88,14 +88,11 @@ where { let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; - if head.starts_with(GZIP_MAGIC_HEADER) - || head.starts_with(XZ_MAGIC_HEADER) - || head.starts_with(BZIP2_MAGIC_HEADER) - || head.starts_with(ZSTD_MAGIC_HEADER) - { - Err(PeekError::Compressed) - } else { - Ok(stream) + match Compression::from(&head) { + Compression::NoCompression => Ok(stream), + Compression::Gzip | Compression::Xz | Compression::Bzip2 | Compression::Zstd => { + Err(PeekError::Compressed) + } } } @@ -118,25 +115,46 @@ fn run_decoder(mut decoder: impl Read) -> std::io::Result> { Ok(buffer) } -/// Creates a decoder based on the magic bytes the minidump payload +/// Types of compression we support for minidump payloads. +enum Compression { + NoCompression, + Gzip, + Xz, + Bzip2, + Zstd, +} + +impl Compression { + fn from(header: &[u8]) -> Self { + if header.starts_with(GZIP_MAGIC_HEADER) { + Self::Gzip + } else if header.starts_with(XZ_MAGIC_HEADER) { + Self::Xz + } else if header.starts_with(BZIP2_MAGIC_HEADER) { + Self::Bzip2 + } else if header.starts_with(ZSTD_MAGIC_HEADER) { + Self::Zstd + } else { + Self::NoCompression + } + } +} + +/// Creates a decoder based on the magic bytes in the minidump payload. fn decoder_from(minidump_data: Bytes) -> Option> { - if minidump_data.starts_with(GZIP_MAGIC_HEADER) { - return Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(XZ_MAGIC_HEADER) { - return Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(BZIP2_MAGIC_HEADER) { - return Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(ZSTD_MAGIC_HEADER) { - return match ZstdDecoder::new(Cursor::new(minidump_data)) { + match Compression::from(&minidump_data) { + Compression::NoCompression => None, + Compression::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))), + Compression::Xz => Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))), + Compression::Bzip2 => Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))), + Compression::Zstd => match ZstdDecoder::new(Cursor::new(minidump_data)) { Ok(decoder) => Some(Box::new(decoder)), Err(ref err) => { relay_log::error!(error = err as &dyn Error, "failed to create ZstdDecoder"); None } - }; + }, } - - None } /// Tries to decode a minidump using any of the supported compression formats From ec1d33950d66e7255cab2110689145eeb1980287 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 30 Jun 2026 11:00:37 +0200 Subject: [PATCH 2/5] wip --- Cargo.lock | 2 + Cargo.toml | 2 +- relay-server/src/endpoints/common.rs | 6 ++- relay-server/src/endpoints/minidump.rs | 56 +++++++++++++---------- relay-server/src/endpoints/playstation.rs | 1 + relay-server/src/endpoints/upload.rs | 1 + relay-server/src/services/upload.rs | 31 +++++++++++-- 7 files changed, 69 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af2180e5df3..4bd370dfde4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -729,8 +729,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" dependencies = [ "brotli 8.0.2", + "bzip2", "compression-core", "flate2", + "liblzma", "memchr", "zstd", "zstd-safe", diff --git a/Cargo.toml b/Cargo.toml index 5932888d8d8..21857231a0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] } # Keep it pinned until it's possible to disable backtrace. anyhow = "=1.0.69" arc-swap = "1" -async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd"] } +async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd", "bzip2", "xz"] } async-trait = "0.1" axum = "0.8" axum-extra = "0.12" diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 00de250cb54..024fff00d9a 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -24,7 +24,7 @@ use crate::service::ServiceState; use crate::services::buffer::{ProjectKeyPair, PushError}; use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome}; use crate::services::processor::{BucketSource, MetricData, ProcessMetrics}; -use crate::services::upload::{Create, ProjectContext, Stream, Upload}; +use crate::services::upload::{ContentEncoding, Create, ProjectContext, Stream, Upload}; use crate::statsd::{RelayCounters, RelayDistributions}; use crate::utils::{ self, ApiErrorResponse, BoundedStream, FormDataIter, MeteredStream, find_error_source, @@ -549,6 +549,7 @@ fn emit_envelope_metrics(envelope: &Envelope) { /// [AttachmentPlaceholder] as payload. pub async fn upload_to_objectstore( stream: S, + content_encoding: Option, content_type: Option, mut item: Managed, config: &Config, @@ -562,6 +563,7 @@ where { let res = upload_to_objectstore_inner( stream, + content_encoding, content_type, &mut item, config, @@ -578,6 +580,7 @@ where async fn upload_to_objectstore_inner( stream: S, + content_encoding: Option, content_type: Option, item: &mut Managed, config: &Config, @@ -612,6 +615,7 @@ where project, location, stream, + content_encoding, }) .await .ok()?; diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index 9ad15e8df9f..b98f46f10a4 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -1,3 +1,6 @@ +use async_compression::tokio::bufread::{ + BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzipDecoder, XzDecoder as AsyncXzDecoder, +}; use axum::RequestExt; use axum::extract::{DefaultBodyLimit, Request}; use axum::response::IntoResponse; @@ -5,7 +8,7 @@ use axum::routing::{MethodRouter, post}; use bytes::Bytes; use bzip2::read::BzDecoder; use flate2::read::GzDecoder; -use futures::{self, Stream}; +use futures::{self, Stream, StreamExt, TryStreamExt}; use liblzma::read::XzDecoder; use multer::{Field, Multipart}; use relay_config::Config; @@ -18,6 +21,8 @@ use std::convert::Infallible; use std::error::Error; use std::io::Cursor; use std::io::Read; +use tokio::io::BufReader; +use tokio_util::io::{ReaderStream, StreamReader}; use tower_http::limit::RequestBodyLimitLayer; use zstd::stream::Decoder as ZstdDecoder; @@ -30,7 +35,7 @@ use crate::middlewares; use crate::service::ServiceState; use crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome}; use crate::services::projects::project::ProjectState; -use crate::services::upload::{ProjectContext, Upload}; +use crate::services::upload::{ByteStream, ContentEncoding, ProjectContext, Upload}; use crate::statsd::RelayCounters; use crate::utils::{self, AttachmentStrategy, read_bytes_into_item}; @@ -69,31 +74,29 @@ const MAGIC_PEEK: usize = 6; /// Content types by which standalone uploads can be recognized. const MINIDUMP_RAW_CONTENT_TYPES: &[&str] = &["application/octet-stream", "application/x-dmp"]; -#[derive(Debug, thiserror::Error)] -enum PeekError { - #[error("compressed minidump payloads are not supported for streaming upload")] - Compressed, - #[error(transparent)] - Source(#[from] E), +macro_rules! wrap_decode { + ($stream:expr, $decoder:ident) => {{ ReaderStream::new($decoder::new(BufReader::new(StreamReader::new($stream)))).boxed() }}; } -/// Peek the first bytes of `stream` and reject if they look compressed (gzip/xz/bzip2/zstd). -/// Returns the original stream contents if not. -async fn reject_if_compressed( - stream: S, -) -> Result> + Send, PeekError> +/// Peek the first bytes of `stream` and returns a decoding wrapper if necessary. +/// +/// Returns raw minidump bytes if the stream is uncompressed, otherwise decompresses +/// one of the minidump container formats we support for inline uploads. +async fn decode_stream(stream: S) -> std::io::Result<(ByteStream, Option)> where - S: Stream> + Send, - E: Send, + S: Stream> + Send + 'static, + E: Into> + Send + 'static, { + let stream = stream.map_err(std::io::Error::other); let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; - - match Compression::from(&head) { - Compression::NoCompression => Ok(stream), - Compression::Gzip | Compression::Xz | Compression::Bzip2 | Compression::Zstd => { - Err(PeekError::Compressed) - } - } + let decoded = match Compression::from(&head) { + Compression::NoCompression => (stream.boxed(), None), + Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)), + Compression::Gzip => (wrap_decode!(stream, AsyncGzipDecoder), None), + Compression::Xz => (wrap_decode!(stream, AsyncXzDecoder), None), + Compression::Bzip2 => (wrap_decode!(stream, AsyncBzDecoder), None), + }; + Ok(decoded) } fn validate_minidump(data: &[u8]) -> Result<(), BadStoreRequest> { @@ -350,6 +353,7 @@ where if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) { return upload_to_objectstore( stream, + None, content_type, item, config, @@ -361,8 +365,8 @@ where .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); } - let stream = match reject_if_compressed(stream).await { - Ok(stream) => stream, + let (stream, encoding) = match decode_stream(stream).await { + Ok(decoded) => decoded, Err(_) => { let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump)); return Err(BadStoreRequest::InvalidMinidump); @@ -371,6 +375,7 @@ where upload_to_objectstore( stream, + encoding, content_type, item, config, @@ -529,12 +534,13 @@ async fn raw_minidump_to_item( if let Some(upload_context) = upload_context && matches!(upload_context.upload_minidumps, UploadDecision::Upload) { - let stream = reject_if_compressed(request.into_body().into_data_stream()) + let (stream, encoding) = decode_stream(request.into_body().into_data_stream()) .await .map_err(|_| BadStoreRequest::InvalidMinidump)?; item = upload_to_objectstore( stream, + encoding, Some(content_type.to_string()).filter(|s| !s.is_empty()), item, state.config(), diff --git a/relay-server/src/endpoints/playstation.rs b/relay-server/src/endpoints/playstation.rs index d888b6d8da2..d44ae397bfe 100644 --- a/relay-server/src/endpoints/playstation.rs +++ b/relay-server/src/endpoints/playstation.rs @@ -149,6 +149,7 @@ impl<'a> AttachmentStrategy for PlaystationAttachmentStrategy<'a> { let content_type = field.content_type().map(ToString::to_string); Ok(common::upload_to_objectstore( field, + None, content_type, item, config, diff --git a/relay-server/src/endpoints/upload.rs b/relay-server/src/endpoints/upload.rs index 021a7bf51c9..16e7252b788 100644 --- a/relay-server/src/endpoints/upload.rs +++ b/relay-server/src/endpoints/upload.rs @@ -326,6 +326,7 @@ async fn upload( project, location, stream, + content_encoding: None, // NOTE: we could pass through zstd here if we can skip `RequestDecompressionLayer`. }) .await??; diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 79f320b245c..5fdaded2dab 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -146,6 +146,18 @@ pub struct Stream { pub location: SignedLocation, /// The body to be uploaded to objectstore, with length validation. pub stream: BoundedStream>, + /// The content encoding of the stream. + /// + /// If `Some`, the service treats the stream as already encoded. + /// If `None`, the service will apply zstd compression to the stream while uploading. + pub content_encoding: Option, +} + +/// Type of compression that both Relay and Objectstore support. +/// +/// This can be used to pass compressed streams through to Objectstore without decoding. +pub enum ContentEncoding { + Zstd, } impl FromMessage for Upload { @@ -293,10 +305,16 @@ impl Service { project, location, stream, + content_encoding, } = stream; match &self.backend { Backend::Upstream { addr } => { - let (request, rx) = UploadRequest::upload(project, location.try_to_uri()?, stream); + let (request, rx) = UploadRequest::upload( + project, + location.try_to_uri()?, + stream, + content_encoding, + ); addr.send(SendRequest(request)); let response = rx.await??; SignedLocation::try_from_response(response) @@ -677,6 +695,7 @@ enum RequestKind { uri: String, stream: TakeOnce>>, encoding: HttpEncoding, + content_encoding: Option, }, } @@ -715,6 +734,7 @@ impl UploadRequest { project: ProjectContext, uri: String, stream: BoundedStream>, + content_encoding: Option, ) -> ( Self, oneshot::Receiver>, @@ -727,6 +747,7 @@ impl UploadRequest { uri, stream: TakeOnce::new(stream), encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure() + content_encoding, }, sender, }, @@ -809,6 +830,7 @@ impl UpstreamRequest for UploadRequest { uri: _, stream, encoding, + content_encoding, } => { let Some(body) = RetryableStream::new(stream.clone()) else { relay_log::error!("upload request stream was already consumed"); @@ -816,8 +838,11 @@ impl UpstreamRequest for UploadRequest { }; tus::add_upload_headers(builder); - let body = encode_body(body, *encoding); - builder.content_encoding(*encoding); + let (body, encoding) = match content_encoding { + Some(ContentEncoding::Zstd) => (body.boxed(), HttpEncoding::Zstd), + None => (encode_body(body, *encoding), *encoding), + }; + builder.content_encoding(encoding); builder.body(reqwest::Body::wrap_stream(body)); } From 29fa09273b98f3addfa44c67ee03ec17a5177ed4 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 30 Jun 2026 11:10:50 +0200 Subject: [PATCH 3/5] simplify --- relay-server/src/endpoints/minidump.rs | 11 +++++---- relay-server/src/services/upload.rs | 31 +++++++------------------- 2 files changed, 13 insertions(+), 29 deletions(-) diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index b98f46f10a4..a65e5bd544a 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -1,6 +1,3 @@ -use async_compression::tokio::bufread::{ - BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzipDecoder, XzDecoder as AsyncXzDecoder, -}; use axum::RequestExt; use axum::extract::{DefaultBodyLimit, Request}; use axum::response::IntoResponse; @@ -87,14 +84,16 @@ where S: Stream> + Send + 'static, E: Into> + Send + 'static, { + use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder}; + let stream = stream.map_err(std::io::Error::other); let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; let decoded = match Compression::from(&head) { Compression::NoCompression => (stream.boxed(), None), Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)), - Compression::Gzip => (wrap_decode!(stream, AsyncGzipDecoder), None), - Compression::Xz => (wrap_decode!(stream, AsyncXzDecoder), None), - Compression::Bzip2 => (wrap_decode!(stream, AsyncBzDecoder), None), + Compression::Gzip => (wrap_decode!(stream, GzipDecoder), None), + Compression::Xz => (wrap_decode!(stream, XzDecoder), None), + Compression::Bzip2 => (wrap_decode!(stream, BzDecoder), None), }; Ok(decoded) } diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 5fdaded2dab..c26bfec41a5 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -6,7 +6,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder, ZstdEncoder}; +use async_compression::tokio::bufread::ZstdEncoder; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; @@ -694,7 +694,6 @@ enum RequestKind { Upload { uri: String, stream: TakeOnce>>, - encoding: HttpEncoding, content_encoding: Option, }, } @@ -746,7 +745,6 @@ impl UploadRequest { kind: RequestKind::Upload { uri, stream: TakeOnce::new(stream), - encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure() content_encoding, }, sender, @@ -829,7 +827,6 @@ impl UpstreamRequest for UploadRequest { RequestKind::Upload { uri: _, stream, - encoding, content_encoding, } => { let Some(body) = RetryableStream::new(stream.clone()) else { @@ -838,13 +835,13 @@ impl UpstreamRequest for UploadRequest { }; tus::add_upload_headers(builder); - let (body, encoding) = match content_encoding { - Some(ContentEncoding::Zstd) => (body.boxed(), HttpEncoding::Zstd), - None => (encode_body(body, *encoding), *encoding), + let zstd_body = match content_encoding { + Some(ContentEncoding::Zstd) => reqwest::Body::wrap_stream(body), + None => reqwest::Body::wrap_stream(encode_body(body)), }; - builder.content_encoding(encoding); - builder.body(reqwest::Body::wrap_stream(body)); + builder.content_encoding(HttpEncoding::Zstd); + builder.body(zstd_body); } }; @@ -854,26 +851,14 @@ impl UpstreamRequest for UploadRequest { Ok(()) } - - fn configure(&mut self, config: &Config) { - if let RequestKind::Upload { encoding, .. } = &mut self.kind { - *encoding = config.http_encoding(); - } - } } -fn encode_body(stream: S, encoding: HttpEncoding) -> ByteStream +fn encode_body(stream: S) -> ByteStream where S: futures::Stream> + Send + 'static, { let reader = BufReader::new(StreamReader::new(stream)); - match encoding { - HttpEncoding::Identity => ReaderStream::new(reader).boxed(), - HttpEncoding::Deflate => ReaderStream::new(DeflateEncoder::new(reader)).boxed(), - HttpEncoding::Gzip => ReaderStream::new(GzipEncoder::new(reader)).boxed(), - HttpEncoding::Br => ReaderStream::new(BrotliEncoder::new(reader)).boxed(), - HttpEncoding::Zstd => ReaderStream::new(ZstdEncoder::new(reader)).boxed(), - } + ReaderStream::new(ZstdEncoder::new(reader)).boxed() } #[cfg(test)] From e2dbf10b07c9f17388017db67b890d3b1d6e6c8e Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 30 Jun 2026 11:24:42 +0200 Subject: [PATCH 4/5] lint --- relay-server/src/endpoints/common.rs | 37 ++++++++------- relay-server/src/endpoints/minidump.rs | 55 +++++++++++------------ relay-server/src/endpoints/playstation.rs | 11 +++-- 3 files changed, 53 insertions(+), 50 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 024fff00d9a..62e2d4485a7 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -548,9 +548,7 @@ fn emit_envelope_metrics(envelope: &Envelope) { /// Uploads the content of `field` to the objectstore and returns an [Item] with an /// [AttachmentPlaceholder] as payload. pub async fn upload_to_objectstore( - stream: S, - content_encoding: Option, - content_type: Option, + stream: StreamWithHeaders, mut item: Managed, config: &Config, project: ProjectContext, @@ -561,27 +559,26 @@ where S: futures::Stream> + Send + 'static, E: Into> + 'static, { - let res = upload_to_objectstore_inner( - stream, - content_encoding, - content_type, - &mut item, - config, - project, - upload, - referrer, - ) - .await; + let res = + upload_to_objectstore_inner(stream, &mut item, config, project, upload, referrer).await; match res { Some(()) => Ok(item), None => Err(Outcome::Invalid(DiscardReason::Internal)).reject(&item), } } +/// A stream with metadata for request submission. +pub struct StreamWithHeaders { + /// The stream of data. + pub stream: S, + /// What the data is currently compressed as. + pub content_encoding: Option, + /// Content-type header to forward. + pub content_type: Option, +} + async fn upload_to_objectstore_inner( - stream: S, - content_encoding: Option, - content_type: Option, + stream: StreamWithHeaders, item: &mut Managed, config: &Config, project: ProjectContext, @@ -592,6 +589,12 @@ where S: futures::Stream> + Send + 'static, E: Into> + 'static, { + let StreamWithHeaders { + stream, + content_encoding, + content_type, + } = stream; + let stream: BoxStream<'static, io::Result> = Box::pin(stream.map_err(io::Error::other)); let stream = MeteredStream::new(stream, referrer); let stream = BoundedStream::new(stream, 1, config.max_upload_size()); diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index a65e5bd544a..777c5714cbc 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -24,7 +24,9 @@ use tower_http::limit::RequestBodyLimitLayer; use zstd::stream::Decoder as ZstdDecoder; use crate::constants::{ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT}; -use crate::endpoints::common::{self, BadStoreRequest, TextResponse, upload_to_objectstore}; +use crate::endpoints::common::{ + self, BadStoreRequest, StreamWithHeaders, TextResponse, upload_to_objectstore, +}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, Items}; use crate::extractors::{RawContentType, RequestMeta}; use crate::managed::{Managed, ManagedResult}; @@ -89,7 +91,7 @@ where let stream = stream.map_err(std::io::Error::other); let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; let decoded = match Compression::from(&head) { - Compression::NoCompression => (stream.boxed(), None), + Compression::None => (stream.boxed(), None), Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)), Compression::Gzip => (wrap_decode!(stream, GzipDecoder), None), Compression::Xz => (wrap_decode!(stream, XzDecoder), None), @@ -119,7 +121,7 @@ fn run_decoder(mut decoder: impl Read) -> std::io::Result> { /// Types of compression we support for minidump payloads. enum Compression { - NoCompression, + None, Gzip, Xz, Bzip2, @@ -137,7 +139,7 @@ impl Compression { } else if header.starts_with(ZSTD_MAGIC_HEADER) { Self::Zstd } else { - Self::NoCompression + Self::None } } } @@ -145,7 +147,7 @@ impl Compression { /// Creates a decoder based on the magic bytes in the minidump payload. fn decoder_from(minidump_data: Bytes) -> Option> { match Compression::from(&minidump_data) { - Compression::NoCompression => None, + Compression::None => None, Compression::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))), Compression::Xz => Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))), Compression::Bzip2 => Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))), @@ -350,21 +352,17 @@ where E: Into> + Send + 'static, { if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) { - return upload_to_objectstore( + let stream = StreamWithHeaders { stream, - None, + content_encoding: None, content_type, - item, - config, - project, - upload, - referrer, - ) - .await - .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); + }; + return upload_to_objectstore(stream, item, config, project, upload, referrer) + .await + .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); } - let (stream, encoding) = match decode_stream(stream).await { + let (stream, content_encoding) = match decode_stream(stream).await { Ok(decoded) => decoded, Err(_) => { let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump)); @@ -372,18 +370,14 @@ where } }; - upload_to_objectstore( + let stream = StreamWithHeaders { stream, - encoding, + content_encoding, content_type, - item, - config, - project, - upload, - referrer, - ) - .await - .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed) + }; + upload_to_objectstore(stream, item, config, project, upload, referrer) + .await + .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed) } async fn multipart_to_items( @@ -533,14 +527,17 @@ async fn raw_minidump_to_item( if let Some(upload_context) = upload_context && matches!(upload_context.upload_minidumps, UploadDecision::Upload) { - let (stream, encoding) = decode_stream(request.into_body().into_data_stream()) + let (stream, content_encoding) = decode_stream(request.into_body().into_data_stream()) .await .map_err(|_| BadStoreRequest::InvalidMinidump)?; + let stream = StreamWithHeaders { + stream, + content_encoding, + content_type: Some(content_type.to_string()).filter(|s| !s.is_empty()), + }; item = upload_to_objectstore( stream, - encoding, - Some(content_type.to_string()).filter(|s| !s.is_empty()), item, state.config(), upload_context.project, diff --git a/relay-server/src/endpoints/playstation.rs b/relay-server/src/endpoints/playstation.rs index d44ae397bfe..cfc2b378637 100644 --- a/relay-server/src/endpoints/playstation.rs +++ b/relay-server/src/endpoints/playstation.rs @@ -13,7 +13,7 @@ use relay_system::Addr; use serde::Serialize; use tower_http::limit::RequestBodyLimitLayer; -use crate::endpoints::common::{self, BadStoreRequest, TextResponse}; +use crate::endpoints::common::{self, BadStoreRequest, StreamWithHeaders, TextResponse}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, Items}; use crate::extractors::{RawContentType, RequestMeta}; use crate::managed::{Managed, ManagedResult}; @@ -147,10 +147,13 @@ impl<'a> AttachmentStrategy for PlaystationAttachmentStrategy<'a> { match &self.upload_context { Some(upload_context) if self.infer_type(&field) != AttachmentType::Prosperodump => { let content_type = field.content_type().map(ToString::to_string); - Ok(common::upload_to_objectstore( - field, - None, + let stream = StreamWithHeaders { + stream: field, + content_encoding: None, content_type, + }; + Ok(common::upload_to_objectstore( + stream, item, config, upload_context.project.clone(), From aa3da0f5a7c77373a641aebeedc736d5ab6c618b Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 30 Jun 2026 15:43:50 +0200 Subject: [PATCH 5/5] update test --- relay-server/src/endpoints/common.rs | 4 ++ tests/integration/test_minidump.py | 103 +++++++-------------------- 2 files changed, 29 insertions(+), 78 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 62e2d4485a7..a2a6f3d5309 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -608,6 +608,7 @@ where }) .await .ok()? + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let scoping = project.scoping; @@ -621,6 +622,7 @@ where content_encoding, }) .await + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let location = result @@ -634,6 +636,7 @@ where "multipart item upload failed", ); }) + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let location = location.into_header_value().ok()?; let location = location.to_str().ok()?; @@ -641,6 +644,7 @@ where location, content_type, }) + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; item.modify(|inner, records| { diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index 332cb918532..be6447db33e 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -491,17 +491,20 @@ def test_minidump_invalid_nested_formdata(mini_sentry, relay): @pytest.mark.parametrize( - "rate_limit,minidump_filename,use_objectstore", + "rate_limit,minidump_filename,use_objectstore,stream_upload", [ - (None, "minidump.dmp", True), - (None, "minidump.dmp", False), - ("attachment", "minidump.dmp", True), - ("attachment", "minidump.dmp", False), - ("transaction", "minidump.dmp", False), - (None, "minidump.dmp.gz", False), - (None, "minidump.dmp.xz", False), - (None, "minidump.dmp.bz2", False), - (None, "minidump.dmp.zst", False), + (None, "minidump.dmp", True, False), + (None, "minidump.dmp", False, False), + ("attachment", "minidump.dmp", True, False), + ("attachment", "minidump.dmp", False, False), + ("transaction", "minidump.dmp", False, False), + (None, "minidump.dmp.gz", False, False), + (None, "minidump.dmp.xz", False, False), + (None, "minidump.dmp.bz2", False, False), + (None, "minidump.dmp.bz2", True, False), + (None, "minidump.dmp.zst", False, False), + (None, "minidump.dmp.zst", True, False), + (None, "minidump.dmp.zst", True, True), ], ) def test_minidump_with_processing( @@ -513,6 +516,7 @@ def test_minidump_with_processing( minidump_filename, use_objectstore, objectstore, + stream_upload, ): dmp_path = os.path.join(os.path.dirname(__file__), "fixtures/native/minidump.dmp") with open(dmp_path, "rb") as f: @@ -533,6 +537,10 @@ def test_minidump_with_processing( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) project_config["config"]["eventRetention"] = 50000 + if stream_upload: + project_config["config"].setdefault("features", []).append( + "projects:relay-minidump-uploads" + ) options = { "processing": { @@ -595,10 +603,15 @@ def test_minidump_with_processing( assert event["exception"]["values"][0]["mechanism"]["type"] == "minidump" # Check information extracted from the minidump - assert event["timestamp"] == 1574692481.0 # 11/25/2019 @ 2:34pm (UTC) + if not stream_upload: + assert event["timestamp"] == 1574692481.0 # 11/25/2019 @ 2:34pm (UTC) # Check that the SDK name is correctly detected - assert event["sdk"]["name"] == "minidump.unknown" + assert ( + event["sdk"]["name"] == "minidump.upload" + if stream_upload + else "minidump.unknown" + ) if not use_objectstore: assert list(message["attachments"]) == [ @@ -1620,72 +1633,6 @@ def test_minidump_large_attachment_skipped_when_no_project_fetching(mini_sentry, assert envelope.items[0].payload.bytes == minidump_content -@pytest.mark.parametrize( - "magic,filename", - [ - pytest.param(b"\x1f\x8b", "minidump.dmp.gz", id="gzip"), - pytest.param(b"\xfd7zXZ\x00", "minidump.dmp.xz", id="xz"), - pytest.param(b"BZh", "minidump.dmp.bz2", id="bzip2"), - pytest.param(b"\x28\xb5\x2f\xfd", "minidump.dmp.zst", id="zstd"), - ], -) -def test_minidump_objectstore_uploads_rejects_compressed( - mini_sentry, - relay, - magic, - filename, -): - """ - When streaming a minidump to objectstore, a compressed payload should be reject - (untill objectstore or minidump can handle them). - """ - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"].setdefault("features", []).append( - "projects:relay-minidump-uploads" - ) - - relay = relay( - mini_sentry, - options={ - "outcomes": { - "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - }, - }, - ) - - with pytest.raises(HTTPError) as exc_info: - relay.send_minidump( - project_id=project_id, - files=[(MINIDUMP_ATTACHMENT_NAME, filename, magic + b"\x00" * 32)], - ) - - assert exc_info.value.response.status_code == 400 - - assert mini_sentry.get_aggregated_outcomes() == [ - { - "category": 1, - "outcome": 3, - "quantity": 1, - "reason": "invalid_minidump", - }, - { - "category": 4, - "outcome": 3, - "reason": "invalid_minidump", - "quantity": 1, - }, - { - "category": 22, - "outcome": 3, - "reason": "invalid_minidump", - "quantity": 1, - }, - ] - - def test_minidump_upload_failure_bubbles_up(mini_sentry, relay): project_id = 42 minidump_content = b"MDMP content"