Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -729,8 +729,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf"
dependencies = [
"brotli 8.0.2",
"bzip2",
"compression-core",
"flate2",
"liblzma",
"memchr",
"zstd",
"zstd-safe",
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ android_trace_log = { version = "0.3", features = ["serde"] }
# Keep it pinned until it's possible to disable backtrace.
anyhow = "=1.0.69"
arc-swap = "1"
async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd"] }
async-compression = { version = "0.4", features = ["tokio", "gzip", "brotli", "deflate", "zstd", "bzip2", "xz"] }
async-trait = "0.1"
axum = "0.8"
axum-extra = "0.12"
Expand Down
41 changes: 26 additions & 15 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::service::ServiceState;
use crate::services::buffer::{ProjectKeyPair, PushError};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome};
use crate::services::processor::{BucketSource, MetricData, ProcessMetrics};
use crate::services::upload::{Create, ProjectContext, Stream, Upload};
use crate::services::upload::{ContentEncoding, Create, ProjectContext, Stream, Upload};
use crate::statsd::{RelayCounters, RelayDistributions};
use crate::utils::{
self, ApiErrorResponse, BoundedStream, FormDataIter, MeteredStream, find_error_source,
Expand Down Expand Up @@ -548,8 +548,7 @@ fn emit_envelope_metrics(envelope: &Envelope) {
/// Uploads the content of `field` to the objectstore and returns an [Item] with an
/// [AttachmentPlaceholder] as payload.
pub async fn upload_to_objectstore<S, E>(
stream: S,
content_type: Option<String>,
stream: StreamWithHeaders<S>,
mut item: Managed<Item>,
config: &Config,
project: ProjectContext,
Expand All @@ -560,25 +559,26 @@ where
S: futures::Stream<Item = Result<Bytes, E>> + Send + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
let res = upload_to_objectstore_inner(
stream,
content_type,
&mut item,
config,
project,
upload,
referrer,
)
.await;
let res =
upload_to_objectstore_inner(stream, &mut item, config, project, upload, referrer).await;
match res {
Some(()) => Ok(item),
None => Err(Outcome::Invalid(DiscardReason::Internal)).reject(&item),
}
}

/// A stream with metadata for request submission.
pub struct StreamWithHeaders<S> {
/// The stream of data.
pub stream: S,
/// What the data is currently compressed as.
pub content_encoding: Option<ContentEncoding>,
/// Content-type header to forward.
pub content_type: Option<String>,
}

async fn upload_to_objectstore_inner<S, E>(
stream: S,
content_type: Option<String>,
stream: StreamWithHeaders<S>,
item: &mut Managed<Item>,
config: &Config,
project: ProjectContext,
Expand All @@ -589,6 +589,12 @@ where
S: futures::Stream<Item = Result<Bytes, E>> + Send + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
let StreamWithHeaders {
stream,
content_encoding,
content_type,
} = stream;

let stream: BoxStream<'static, io::Result<Bytes>> = Box::pin(stream.map_err(io::Error::other));
let stream = MeteredStream::new(stream, referrer);
let stream = BoundedStream::new(stream, 1, config.max_upload_size());
Expand All @@ -602,6 +608,7 @@ where
})
.await
.ok()?
.inspect_err(|e| relay_log::debug!(error = ?e))
.ok()?;

let scoping = project.scoping;
Expand All @@ -612,8 +619,10 @@ where
project,
location,
stream,
content_encoding,
})
.await
.inspect_err(|e| relay_log::debug!(error = ?e))
.ok()?;

let location = result
Expand All @@ -627,13 +636,15 @@ where
"multipart item upload failed",
);
})
.inspect_err(|e| relay_log::debug!(error = ?e))
.ok()?;
let location = location.into_header_value().ok()?;
let location = location.to_str().ok()?;
let placeholder = serde_json::to_vec(&AttachmentPlaceholder {
location,
content_type,
})
.inspect_err(|e| relay_log::debug!(error = ?e))
.ok()?;

item.modify(|inner, records| {
Expand Down
140 changes: 80 additions & 60 deletions relay-server/src/endpoints/minidump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use axum::routing::{MethodRouter, post};
use bytes::Bytes;
use bzip2::read::BzDecoder;
use flate2::read::GzDecoder;
use futures::{self, Stream};
use futures::{self, Stream, StreamExt, TryStreamExt};
use liblzma::read::XzDecoder;
use multer::{Field, Multipart};
use relay_config::Config;
Expand All @@ -18,19 +18,23 @@ use std::convert::Infallible;
use std::error::Error;
use std::io::Cursor;
use std::io::Read;
use tokio::io::BufReader;
use tokio_util::io::{ReaderStream, StreamReader};
use tower_http::limit::RequestBodyLimitLayer;
use zstd::stream::Decoder as ZstdDecoder;

use crate::constants::{ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME_EVENT};
use crate::endpoints::common::{self, BadStoreRequest, TextResponse, upload_to_objectstore};
use crate::endpoints::common::{
self, BadStoreRequest, StreamWithHeaders, TextResponse, upload_to_objectstore,
};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, Items};
use crate::extractors::{RawContentType, RequestMeta};
use crate::managed::{Managed, ManagedResult};
use crate::middlewares;
use crate::service::ServiceState;
use crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome};
use crate::services::projects::project::ProjectState;
use crate::services::upload::{ProjectContext, Upload};
use crate::services::upload::{ByteStream, ContentEncoding, ProjectContext, Upload};
use crate::statsd::RelayCounters;
use crate::utils::{self, AttachmentStrategy, read_bytes_into_item};

Expand Down Expand Up @@ -69,34 +73,31 @@ const MAGIC_PEEK: usize = 6;
/// Content types by which standalone uploads can be recognized.
const MINIDUMP_RAW_CONTENT_TYPES: &[&str] = &["application/octet-stream", "application/x-dmp"];

#[derive(Debug, thiserror::Error)]
enum PeekError<E> {
#[error("compressed minidump payloads are not supported for streaming upload")]
Compressed,
#[error(transparent)]
Source(#[from] E),
macro_rules! wrap_decode {
($stream:expr, $decoder:ident) => {{ ReaderStream::new($decoder::new(BufReader::new(StreamReader::new($stream)))).boxed() }};
}

/// Peek the first bytes of `stream` and reject if they look compressed (gzip/xz/bzip2/zstd).
/// Returns the original stream contents if not.
async fn reject_if_compressed<S, E>(
stream: S,
) -> Result<impl Stream<Item = Result<Bytes, E>> + Send, PeekError<E>>
/// Peek the first bytes of `stream` and returns a decoding wrapper if necessary.
///
/// Returns raw minidump bytes if the stream is uncompressed, otherwise decompresses
/// one of the minidump container formats we support for inline uploads.
async fn decode_stream<S, E>(stream: S) -> std::io::Result<(ByteStream, Option<ContentEncoding>)>
where
S: Stream<Item = Result<Bytes, E>> + Send,
E: Send,
S: Stream<Item = Result<Bytes, E>> + Send + 'static,
E: Into<Box<dyn Error + Send + Sync>> + Send + 'static,
{
let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?;
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder};

if head.starts_with(GZIP_MAGIC_HEADER)
|| head.starts_with(XZ_MAGIC_HEADER)
|| head.starts_with(BZIP2_MAGIC_HEADER)
|| head.starts_with(ZSTD_MAGIC_HEADER)
{
Err(PeekError::Compressed)
} else {
Ok(stream)
}
let stream = stream.map_err(std::io::Error::other);
let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?;
let decoded = match Compression::from(&head) {
Compression::None => (stream.boxed(), None),
Compression::Zstd => (stream.boxed(), Some(ContentEncoding::Zstd)),
Compression::Gzip => (wrap_decode!(stream, GzipDecoder), None),
Compression::Xz => (wrap_decode!(stream, XzDecoder), None),
Compression::Bzip2 => (wrap_decode!(stream, BzDecoder), None),
};
Ok(decoded)
}

fn validate_minidump(data: &[u8]) -> Result<(), BadStoreRequest> {
Expand All @@ -118,25 +119,46 @@ fn run_decoder(mut decoder: impl Read) -> std::io::Result<Vec<u8>> {
Ok(buffer)
}

/// Creates a decoder based on the magic bytes the minidump payload
/// Types of compression we support for minidump payloads.
enum Compression {
None,
Gzip,
Xz,
Bzip2,
Zstd,
}

impl Compression {
fn from(header: &[u8]) -> Self {
if header.starts_with(GZIP_MAGIC_HEADER) {
Self::Gzip
} else if header.starts_with(XZ_MAGIC_HEADER) {
Self::Xz
} else if header.starts_with(BZIP2_MAGIC_HEADER) {
Self::Bzip2
} else if header.starts_with(ZSTD_MAGIC_HEADER) {
Self::Zstd
} else {
Self::None
}
}
}

/// Creates a decoder based on the magic bytes in the minidump payload.
fn decoder_from(minidump_data: Bytes) -> Option<Box<dyn Read>> {
if minidump_data.starts_with(GZIP_MAGIC_HEADER) {
return Some(Box::new(GzDecoder::new(Cursor::new(minidump_data))));
} else if minidump_data.starts_with(XZ_MAGIC_HEADER) {
return Some(Box::new(XzDecoder::new(Cursor::new(minidump_data))));
} else if minidump_data.starts_with(BZIP2_MAGIC_HEADER) {
return Some(Box::new(BzDecoder::new(Cursor::new(minidump_data))));
} else if minidump_data.starts_with(ZSTD_MAGIC_HEADER) {
return match ZstdDecoder::new(Cursor::new(minidump_data)) {
match Compression::from(&minidump_data) {
Compression::None => None,
Compression::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data)))),
Compression::Xz => Some(Box::new(XzDecoder::new(Cursor::new(minidump_data)))),
Compression::Bzip2 => Some(Box::new(BzDecoder::new(Cursor::new(minidump_data)))),
Compression::Zstd => match ZstdDecoder::new(Cursor::new(minidump_data)) {
Ok(decoder) => Some(Box::new(decoder)),
Err(ref err) => {
relay_log::error!(error = err as &dyn Error, "failed to create ZstdDecoder");
None
}
};
},
}

None
}

/// Tries to decode a minidump using any of the supported compression formats
Expand Down Expand Up @@ -330,38 +352,32 @@ where
E: Into<Box<dyn std::error::Error + Send + Sync>> + Send + 'static,
{
if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) {
return upload_to_objectstore(
let stream = StreamWithHeaders {
stream,
content_encoding: None,
content_type,
item,
config,
project,
upload,
referrer,
)
.await
.map_err(|_| BadStoreRequest::ObjectstoreUploadFailed);
};
return upload_to_objectstore(stream, item, config, project, upload, referrer)
.await
.map_err(|_| BadStoreRequest::ObjectstoreUploadFailed);
}

let stream = match reject_if_compressed(stream).await {
Ok(stream) => stream,
let (stream, content_encoding) = match decode_stream(stream).await {
Ok(decoded) => decoded,
Err(_) => {
let _ = item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump));
return Err(BadStoreRequest::InvalidMinidump);
}
};

upload_to_objectstore(
let stream = StreamWithHeaders {
stream,
content_encoding,
content_type,
item,
config,
project,
upload,
referrer,
)
.await
.map_err(|_| BadStoreRequest::ObjectstoreUploadFailed)
};
upload_to_objectstore(stream, item, config, project, upload, referrer)
.await
.map_err(|_| BadStoreRequest::ObjectstoreUploadFailed)
}

async fn multipart_to_items(
Expand Down Expand Up @@ -511,13 +527,17 @@ async fn raw_minidump_to_item(
if let Some(upload_context) = upload_context
&& matches!(upload_context.upload_minidumps, UploadDecision::Upload)
{
let stream = reject_if_compressed(request.into_body().into_data_stream())
let (stream, content_encoding) = decode_stream(request.into_body().into_data_stream())
.await
.map_err(|_| BadStoreRequest::InvalidMinidump)?;

let stream = StreamWithHeaders {
stream,
content_encoding,
content_type: Some(content_type.to_string()).filter(|s| !s.is_empty()),
};
item = upload_to_objectstore(
stream,
Some(content_type.to_string()).filter(|s| !s.is_empty()),
item,
state.config(),
upload_context.project,
Expand Down
10 changes: 7 additions & 3 deletions relay-server/src/endpoints/playstation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use relay_system::Addr;
use serde::Serialize;
use tower_http::limit::RequestBodyLimitLayer;

use crate::endpoints::common::{self, BadStoreRequest, TextResponse};
use crate::endpoints::common::{self, BadStoreRequest, StreamWithHeaders, TextResponse};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, Items};
use crate::extractors::{RawContentType, RequestMeta};
use crate::managed::{Managed, ManagedResult};
Expand Down Expand Up @@ -147,9 +147,13 @@ impl<'a> AttachmentStrategy for PlaystationAttachmentStrategy<'a> {
match &self.upload_context {
Some(upload_context) if self.infer_type(&field) != AttachmentType::Prosperodump => {
let content_type = field.content_type().map(ToString::to_string);
Ok(common::upload_to_objectstore(
field,
let stream = StreamWithHeaders {
stream: field,
content_encoding: None,
content_type,
};
Ok(common::upload_to_objectstore(
stream,
item,
config,
upload_context.project.clone(),
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/endpoints/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ async fn upload(
project,
location,
stream,
content_encoding: None, // NOTE: we could pass through zstd here if we can skip `RequestDecompressionLayer`.
})
.await??;

Expand Down
Loading
Loading