diff --git a/Cargo.lock b/Cargo.lock index af2180e5df3..4bd370dfde4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -729,8 +729,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" dependencies = [ "brotli 8.0.2", + "bzip2", "compression-core", "flate2", + "liblzma", "memchr", "zstd", "zstd-safe", diff --git a/Cargo.toml b/Cargo.toml index 5932888d8d8..21857231a0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] } # Keep it pinned until it's possible to disable backtrace. anyhow = "=1.0.69" arc-swap = "1" -async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd"] } +async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd", "bzip2", "xz"] } async-trait = "0.1" axum = "0.8" axum-extra = "0.12" diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 00de250cb54..a2a6f3d5309 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -24,7 +24,7 @@ use crate::service::ServiceState; use crate::services::buffer::{ProjectKeyPair, PushError}; use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome}; use crate::services::processor::{BucketSource, MetricData, ProcessMetrics}; -use crate::services::upload::{Create, ProjectContext, Stream, Upload}; +use crate::services::upload::{ContentEncoding, Create, ProjectContext, Stream, Upload}; use crate::statsd::{RelayCounters, RelayDistributions}; use crate::utils::{ self, ApiErrorResponse, BoundedStream, FormDataIter, MeteredStream, find_error_source, @@ -548,8 +548,7 @@ fn emit_envelope_metrics(envelope: &Envelope) { /// Uploads the content of `field` to the objectstore and returns an [Item] with an /// [AttachmentPlaceholder] as payload. 
pub async fn upload_to_objectstore( - stream: S, - content_type: Option, + stream: StreamWithHeaders, mut item: Managed, config: &Config, project: ProjectContext, @@ -560,25 +559,26 @@ where S: futures::Stream> + Send + 'static, E: Into> + 'static, { - let res = upload_to_objectstore_inner( - stream, - content_type, - &mut item, - config, - project, - upload, - referrer, - ) - .await; + let res = + upload_to_objectstore_inner(stream, &mut item, config, project, upload, referrer).await; match res { Some(()) => Ok(item), None => Err(Outcome::Invalid(DiscardReason::Internal)).reject(&item), } } +/// A stream with metadata for request submission. +pub struct StreamWithHeaders { + /// The stream of data. + pub stream: S, + /// What the data is currently compressed as. + pub content_encoding: Option, + /// Content-type header to forward. + pub content_type: Option, +} + async fn upload_to_objectstore_inner( - stream: S, - content_type: Option, + stream: StreamWithHeaders, item: &mut Managed, config: &Config, project: ProjectContext, @@ -589,6 +589,12 @@ where S: futures::Stream> + Send + 'static, E: Into> + 'static, { + let StreamWithHeaders { + stream, + content_encoding, + content_type, + } = stream; + let stream: BoxStream<'static, io::Result> = Box::pin(stream.map_err(io::Error::other)); let stream = MeteredStream::new(stream, referrer); let stream = BoundedStream::new(stream, 1, config.max_upload_size()); @@ -602,6 +608,7 @@ where }) .await .ok()? 
+ .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let scoping = project.scoping; @@ -612,8 +619,10 @@ where project, location, stream, + content_encoding, }) .await + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let location = result @@ -627,6 +636,7 @@ where "multipart item upload failed", ); }) + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; let location = location.into_header_value().ok()?; let location = location.to_str().ok()?; @@ -634,6 +644,7 @@ where location, content_type, }) + .inspect_err(|e| relay_log::debug!(error = ?e)) .ok()?; item.modify(|inner, records| { diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index f8bc8c290cc..777c5714cbc 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -5,7 +5,7 @@ use axum::routing::{MethodRouter, post}; use bytes::Bytes; use bzip2::read::BzDecoder; use flate2::read::GzDecoder; -use futures::{self, Stream}; +use futures::{self, Stream, StreamExt, TryStreamExt}; use liblzma::read::XzDecoder; use multer::{Field, Multipart}; use relay_config::Config; @@ -18,11 +18,15 @@ use std::convert::Infallible; use std::error::Error; use std::io::Cursor; use std::io::Read; +use tokio::io::BufReader; +use tokio_util::io::{ReaderStream, StreamReader}; use tower_http::limit::RequestBodyLimitLayer; use zstd::stream::Decoder as ZstdDecoder; use crate::constants::{ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT}; -use crate::endpoints::common::{self, BadStoreRequest, TextResponse, upload_to_objectstore}; +use crate::endpoints::common::{ + self, BadStoreRequest, StreamWithHeaders, TextResponse, upload_to_objectstore, +}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, Items}; use crate::extractors::{RawContentType, RequestMeta}; use crate::managed::{Managed, ManagedResult}; @@ -30,7 +34,7 @@ use crate::middlewares; use crate::service::ServiceState; use 
crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome}; use crate::services::projects::project::ProjectState; -use crate::services::upload::{ProjectContext, Upload}; +use crate::services::upload::{ByteStream, ContentEncoding, ProjectContext, Upload}; use crate::statsd::RelayCounters; use crate::utils::{self, AttachmentStrategy, read_bytes_into_item}; @@ -69,34 +73,31 @@ const MAGIC_PEEK: usize = 6; /// Content types by which standalone uploads can be recognized. const MINIDUMP_RAW_CONTENT_TYPES: &[&str] = &["application/octet-stream", "application/x-dmp"]; -#[derive(Debug, thiserror::Error)] -enum PeekError { - #[error("compressed minidump payloads are not supported for streaming upload")] - Compressed, - #[error(transparent)] - Source(#[from] E), +macro_rules! wrap_decode { + ($stream:expr, $decoder:ident) => {{ ReaderStream::new($decoder::new(BufReader::new(StreamReader::new($stream)))).boxed() }}; } -/// Peek the first bytes of `stream` and reject if they look compressed (gzip/xz/bzip2/zstd). -/// Returns the original stream contents if not. -async fn reject_if_compressed( - stream: S, -) -> Result> + Send, PeekError> +/// Peeks the first bytes of `stream` and returns a decoding wrapper if necessary. +/// +/// Uncompressed and zstd streams pass through as-is (zstd is flagged via the returned +/// [`ContentEncoding`]); gzip, xz and bzip2 streams are wrapped in a streaming decoder. 
+async fn decode_stream(stream: S) -> std::io::Result<(ByteStream, Option)> where - S: Stream> + Send, - E: Send, + S: Stream> + Send + 'static, + E: Into> + Send + 'static, { - let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; + use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder}; - if head.starts_with(GZIP_MAGIC_HEADER) - || head.starts_with(XZ_MAGIC_HEADER) - || head.starts_with(BZIP2_MAGIC_HEADER) - || head.starts_with(ZSTD_MAGIC_HEADER) - { - Err(PeekError::Compressed) - } else { - Ok(stream) - } + let stream = stream.map_err(std::io::Error::other); + let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; + let decoded = match Compression::from(&head) { + Compression::None => (stream.boxed(), None), + Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)), + Compression::Gzip => (wrap_decode!(stream, GzipDecoder), None), + Compression::Xz => (wrap_decode!(stream, XzDecoder), None), + Compression::Bzip2 => (wrap_decode!(stream, BzDecoder), None), + }; + Ok(decoded) } fn validate_minidump(data: &[u8]) -> Result<(), BadStoreRequest> { @@ -118,25 +119,46 @@ fn run_decoder(mut decoder: impl Read) -> std::io::Result> { Ok(buffer) } -/// Creates a decoder based on the magic bytes the minidump payload +/// Types of compression we support for minidump payloads. +enum Compression { + None, + Gzip, + Xz, + Bzip2, + Zstd, +} + +impl Compression { + fn from(header: &[u8]) -> Self { + if header.starts_with(GZIP_MAGIC_HEADER) { + Self::Gzip + } else if header.starts_with(XZ_MAGIC_HEADER) { + Self::Xz + } else if header.starts_with(BZIP2_MAGIC_HEADER) { + Self::Bzip2 + } else if header.starts_with(ZSTD_MAGIC_HEADER) { + Self::Zstd + } else { + Self::None + } + } +} + +/// Creates a decoder based on the magic bytes in the minidump payload. 
fn decoder_from(minidump_data: Bytes) -> Option> { - if minidump_data.starts_with(GZIP_MAGIC_HEADER) { - return Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(XZ_MAGIC_HEADER) { - return Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(BZIP2_MAGIC_HEADER) { - return Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))); - } else if minidump_data.starts_with(ZSTD_MAGIC_HEADER) { - return match ZstdDecoder::new(Cursor::new(minidump_data)) { + match Compression::from(&minidump_data) { + Compression::None => None, + Compression::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))), + Compression::Xz => Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))), + Compression::Bzip2 => Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))), + Compression::Zstd => match ZstdDecoder::new(Cursor::new(minidump_data)) { Ok(decoder) => Some(Box::new(decoder)), Err(ref err) => { relay_log::error!(error = err as &dyn Error, "failed to create ZstdDecoder"); None } - }; + }, } - - None } /// Tries to decode a minidump using any of the supported compression formats @@ -330,38 +352,32 @@ where E: Into> + Send + 'static, { if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) { - return upload_to_objectstore( + let stream = StreamWithHeaders { stream, + content_encoding: None, content_type, - item, - config, - project, - upload, - referrer, - ) - .await - .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); + }; + return upload_to_objectstore(stream, item, config, project, upload, referrer) + .await + .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed); } - let stream = match reject_if_compressed(stream).await { - Ok(stream) => stream, + let (stream, content_encoding) = match decode_stream(stream).await { + Ok(decoded) => decoded, Err(_) => { let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump)); return 
Err(BadStoreRequest::InvalidMinidump); } }; - upload_to_objectstore( + let stream = StreamWithHeaders { stream, + content_encoding, content_type, - item, - config, - project, - upload, - referrer, - ) - .await - .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed) + }; + upload_to_objectstore(stream, item, config, project, upload, referrer) + .await + .map_err(|_| BadStoreRequest::ObjectstoreUploadFailed) } async fn multipart_to_items( @@ -511,13 +527,17 @@ async fn raw_minidump_to_item( if let Some(upload_context) = upload_context && matches!(upload_context.upload_minidumps, UploadDecision::Upload) { - let stream = reject_if_compressed(request.into_body().into_data_stream()) + let (stream, content_encoding) = decode_stream(request.into_body().into_data_stream()) .await .map_err(|_| BadStoreRequest::InvalidMinidump)?; + let stream = StreamWithHeaders { + stream, + content_encoding, + content_type: Some(content_type.to_string()).filter(|s| !s.is_empty()), + }; item = upload_to_objectstore( stream, - Some(content_type.to_string()).filter(|s| !s.is_empty()), item, state.config(), upload_context.project, diff --git a/relay-server/src/endpoints/playstation.rs b/relay-server/src/endpoints/playstation.rs index d888b6d8da2..cfc2b378637 100644 --- a/relay-server/src/endpoints/playstation.rs +++ b/relay-server/src/endpoints/playstation.rs @@ -13,7 +13,7 @@ use relay_system::Addr; use serde::Serialize; use tower_http::limit::RequestBodyLimitLayer; -use crate::endpoints::common::{self, BadStoreRequest, TextResponse}; +use crate::endpoints::common::{self, BadStoreRequest, StreamWithHeaders, TextResponse}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, Items}; use crate::extractors::{RawContentType, RequestMeta}; use crate::managed::{Managed, ManagedResult}; @@ -147,9 +147,13 @@ impl<'a> AttachmentStrategy for PlaystationAttachmentStrategy<'a> { match &self.upload_context { Some(upload_context) if self.infer_type(&field) != AttachmentType::Prosperodump => { 
let content_type = field.content_type().map(ToString::to_string); - Ok(common::upload_to_objectstore( - field, + let stream = StreamWithHeaders { + stream: field, + content_encoding: None, content_type, + }; + Ok(common::upload_to_objectstore( + stream, item, config, upload_context.project.clone(), diff --git a/relay-server/src/endpoints/upload.rs b/relay-server/src/endpoints/upload.rs index 021a7bf51c9..16e7252b788 100644 --- a/relay-server/src/endpoints/upload.rs +++ b/relay-server/src/endpoints/upload.rs @@ -326,6 +326,7 @@ async fn upload( project, location, stream, + content_encoding: None, // NOTE: we could pass through zstd here if we can skip `RequestDecompressionLayer`. }) .await??; diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs index 79f320b245c..c26bfec41a5 100644 --- a/relay-server/src/services/upload.rs +++ b/relay-server/src/services/upload.rs @@ -6,7 +6,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder, ZstdEncoder}; +use async_compression::tokio::bufread::ZstdEncoder; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; @@ -146,6 +146,18 @@ pub struct Stream { pub location: SignedLocation, /// The body to be uploaded to objectstore, with length validation. pub stream: BoundedStream>, + /// The content encoding of the stream. + /// + /// If `Some`, the service treats the stream as already encoded. + /// If `None`, the service will apply zstd compression to the stream while uploading. + pub content_encoding: Option, +} + +/// Type of compression that both Relay and Objectstore support. +/// +/// This can be used to pass compressed streams through to Objectstore without decoding. 
+pub enum ContentEncoding { + Zstd, } impl FromMessage for Upload { @@ -293,10 +305,16 @@ impl Service { project, location, stream, + content_encoding, } = stream; match &self.backend { Backend::Upstream { addr } => { - let (request, rx) = UploadRequest::upload(project, location.try_to_uri()?, stream); + let (request, rx) = UploadRequest::upload( + project, + location.try_to_uri()?, + stream, + content_encoding, + ); addr.send(SendRequest(request)); let response = rx.await??; SignedLocation::try_from_response(response) @@ -676,7 +694,7 @@ enum RequestKind { Upload { uri: String, stream: TakeOnce>>, - encoding: HttpEncoding, + content_encoding: Option, }, } @@ -715,6 +733,7 @@ impl UploadRequest { project: ProjectContext, uri: String, stream: BoundedStream>, + content_encoding: Option, ) -> ( Self, oneshot::Receiver>, @@ -726,7 +745,7 @@ impl UploadRequest { kind: RequestKind::Upload { uri, stream: TakeOnce::new(stream), - encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure() + content_encoding, }, sender, }, @@ -808,7 +827,7 @@ impl UpstreamRequest for UploadRequest { RequestKind::Upload { uri: _, stream, - encoding, + content_encoding, } => { let Some(body) = RetryableStream::new(stream.clone()) else { relay_log::error!("upload request stream was already consumed"); @@ -816,10 +835,13 @@ impl UpstreamRequest for UploadRequest { }; tus::add_upload_headers(builder); - let body = encode_body(body, *encoding); - builder.content_encoding(*encoding); + let zstd_body = match content_encoding { + Some(ContentEncoding::Zstd) => reqwest::Body::wrap_stream(body), + None => reqwest::Body::wrap_stream(encode_body(body)), + }; - builder.body(reqwest::Body::wrap_stream(body)); + builder.content_encoding(HttpEncoding::Zstd); + builder.body(zstd_body); } }; @@ -829,26 +851,14 @@ impl UpstreamRequest for UploadRequest { Ok(()) } - - fn configure(&mut self, config: &Config) { - if let RequestKind::Upload { encoding, .. 
} = &mut self.kind { - *encoding = config.http_encoding(); - } - } } -fn encode_body(stream: S, encoding: HttpEncoding) -> ByteStream +fn encode_body(stream: S) -> ByteStream where S: futures::Stream> + Send + 'static, { let reader = BufReader::new(StreamReader::new(stream)); - match encoding { - HttpEncoding::Identity => ReaderStream::new(reader).boxed(), - HttpEncoding::Deflate => ReaderStream::new(DeflateEncoder::new(reader)).boxed(), - HttpEncoding::Gzip => ReaderStream::new(GzipEncoder::new(reader)).boxed(), - HttpEncoding::Br => ReaderStream::new(BrotliEncoder::new(reader)).boxed(), - HttpEncoding::Zstd => ReaderStream::new(ZstdEncoder::new(reader)).boxed(), - } + ReaderStream::new(ZstdEncoder::new(reader)).boxed() } #[cfg(test)] diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index 332cb918532..be6447db33e 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -491,17 +491,20 @@ def test_minidump_invalid_nested_formdata(mini_sentry, relay): @pytest.mark.parametrize( - "rate_limit,minidump_filename,use_objectstore", + "rate_limit,minidump_filename,use_objectstore,stream_upload", [ - (None, "minidump.dmp", True), - (None, "minidump.dmp", False), - ("attachment", "minidump.dmp", True), - ("attachment", "minidump.dmp", False), - ("transaction", "minidump.dmp", False), - (None, "minidump.dmp.gz", False), - (None, "minidump.dmp.xz", False), - (None, "minidump.dmp.bz2", False), - (None, "minidump.dmp.zst", False), + (None, "minidump.dmp", True, False), + (None, "minidump.dmp", False, False), + ("attachment", "minidump.dmp", True, False), + ("attachment", "minidump.dmp", False, False), + ("transaction", "minidump.dmp", False, False), + (None, "minidump.dmp.gz", False, False), + (None, "minidump.dmp.xz", False, False), + (None, "minidump.dmp.bz2", False, False), + (None, "minidump.dmp.bz2", True, False), + (None, "minidump.dmp.zst", False, False), + (None, "minidump.dmp.zst", True, False), + 
(None, "minidump.dmp.zst", True, True), ], ) def test_minidump_with_processing( @@ -513,6 +516,7 @@ def test_minidump_with_processing( minidump_filename, use_objectstore, objectstore, + stream_upload, ): dmp_path = os.path.join(os.path.dirname(__file__), "fixtures/native/minidump.dmp") with open(dmp_path, "rb") as f: @@ -533,6 +537,10 @@ def test_minidump_with_processing( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) project_config["config"]["eventRetention"] = 50000 + if stream_upload: + project_config["config"].setdefault("features", []).append( + "projects:relay-minidump-uploads" + ) options = { "processing": { @@ -595,10 +603,15 @@ def test_minidump_with_processing( assert event["exception"]["values"][0]["mechanism"]["type"] == "minidump" # Check information extracted from the minidump - assert event["timestamp"] == 1574692481.0 # 11/25/2019 @ 2:34pm (UTC) + if not stream_upload: + assert event["timestamp"] == 1574692481.0 # 11/25/2019 @ 2:34pm (UTC) # Check that the SDK name is correctly detected - assert event["sdk"]["name"] == "minidump.unknown" + assert event["sdk"]["name"] == ( + "minidump.upload" + if stream_upload + else "minidump.unknown" + ) if not use_objectstore: assert list(message["attachments"]) == [ @@ -1620,72 +1633,6 @@ def test_minidump_large_attachment_skipped_when_no_project_fetching(mini_sentry, assert envelope.items[0].payload.bytes == minidump_content -@pytest.mark.parametrize( - "magic,filename", - [ - pytest.param(b"\x1f\x8b", "minidump.dmp.gz", id="gzip"), - pytest.param(b"\xfd7zXZ\x00", "minidump.dmp.xz", id="xz"), - pytest.param(b"BZh", "minidump.dmp.bz2", id="bzip2"), - pytest.param(b"\x28\xb5\x2f\xfd", "minidump.dmp.zst", id="zstd"), - ], -) -def test_minidump_objectstore_uploads_rejects_compressed( - mini_sentry, - relay, - magic, - filename, -): - """ - When streaming a minidump to objectstore, a compressed payload should be reject - (untill objectstore or minidump can handle them). 
- """ - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"].setdefault("features", []).append( - "projects:relay-minidump-uploads" - ) - - relay = relay( - mini_sentry, - options={ - "outcomes": { - "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - }, - }, - ) - - with pytest.raises(HTTPError) as exc_info: - relay.send_minidump( - project_id=project_id, - files=[(MINIDUMP_ATTACHMENT_NAME, filename, magic + b"\x00" * 32)], - ) - - assert exc_info.value.response.status_code == 400 - - assert mini_sentry.get_aggregated_outcomes() == [ - { - "category": 1, - "outcome": 3, - "quantity": 1, - "reason": "invalid_minidump", - }, - { - "category": 4, - "outcome": 3, - "reason": "invalid_minidump", - "quantity": 1, - }, - { - "category": 22, - "outcome": 3, - "reason": "invalid_minidump", - "quantity": 1, - }, - ] - - def test_minidump_upload_failure_bubbles_up(mini_sentry, relay): project_id = 42 minidump_content = b"MDMP content"