Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 28 additions & 51 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::decompress::{DecompressResult, Decompressor, DecompressorState};
use rayon::prelude::*;
use std::cmp::min;
use std::io::{self, Read, Write};
use std::mem::MaybeUninit;

/// A streaming encoder that compresses data using the DEFLATE algorithm.
///
Expand Down Expand Up @@ -63,32 +62,25 @@ impl<W: Write + Send> DeflateEncoder<W> {
let compressor = &mut self.compressors[0];
let output = &mut self.output_buffers[0];
let bound = Compressor::deflate_compress_bound(chunk.len());
if output.len() < bound {
output
.try_reserve(bound - output.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// SAFETY: We just reserved sufficient capacity. The compressor writes to
// the buffer using `MaybeUninit` pointers, so uninitialized memory is fine.
unsafe {
output.set_len(bound);
}
}
output.clear();
output
.try_reserve(bound)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

let mode = if final_block {
crate::compress::FlushMode::Finish
} else {
crate::compress::FlushMode::Sync
};
let out_uninit = unsafe {
std::slice::from_raw_parts_mut(
output.as_mut_ptr() as *mut MaybeUninit<u8>,
output.len(),
)
};

let out_uninit = output.spare_capacity_mut();
let (res, size, _) = compressor.compress(chunk, out_uninit, mode);
if res == CompressResult::Success {
unsafe {
output.set_len(size);
}
if let Some(writer) = &mut self.writer {
writer.write_all(&output[..size])?;
writer.write_all(output)?;
}
} else {
return Err(io::Error::new(io::ErrorKind::Other, "Compression failed"));
Expand All @@ -101,30 +93,23 @@ impl<W: Write + Send> DeflateEncoder<W> {
.enumerate()
.map(|(i, ((&chunk, compressor), output))| {
let bound = Compressor::deflate_compress_bound(chunk.len());
if output.len() < bound {
output
.try_reserve(bound - output.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// SAFETY: We just reserved sufficient capacity. The compressor writes to
// the buffer using `MaybeUninit` pointers, so uninitialized memory is fine.
unsafe {
output.set_len(bound);
}
}
output.clear();
output
.try_reserve(bound)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

let mode = if final_block && i == num_chunks - 1 {
crate::compress::FlushMode::Finish
} else {
crate::compress::FlushMode::Sync
};
let out_uninit = unsafe {
std::slice::from_raw_parts_mut(
output.as_mut_ptr() as *mut MaybeUninit<u8>,
output.len(),
)
};

let out_uninit = output.spare_capacity_mut();
let (res, size, _) = compressor.compress(chunk, out_uninit, mode);
if res == CompressResult::Success {
unsafe {
output.set_len(size);
}
Ok(size)
} else {
Err(io::Error::new(io::ErrorKind::Other, "Compression failed"))
Expand All @@ -150,32 +135,24 @@ impl<W: Write + Send> DeflateEncoder<W> {
let compressor = &mut self.compressors[0];
let output = &mut self.output_buffers[0];
let bound = Compressor::deflate_compress_bound(self.buffer.len());
if output.len() < bound {
output
.try_reserve(bound - output.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// SAFETY: We just reserved sufficient capacity. The compressor writes to
// the buffer using `MaybeUninit` pointers, so uninitialized memory is fine.
unsafe {
output.set_len(bound);
}
}
output.clear();
output
.try_reserve(bound)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

let mode = if final_block {
crate::compress::FlushMode::Finish
} else {
crate::compress::FlushMode::Sync
};
let out_uninit = unsafe {
std::slice::from_raw_parts_mut(
output.as_mut_ptr() as *mut MaybeUninit<u8>,
output.len(),
)
};
let out_uninit = output.spare_capacity_mut();
let (res, size, _) = compressor.compress(&self.buffer, out_uninit, mode);
if res == CompressResult::Success {
unsafe {
output.set_len(size);
}
if let Some(writer) = &mut self.writer {
writer.write_all(&output[..size])?;
writer.write_all(output)?;
}
} else {
return Err(io::Error::new(io::ErrorKind::Other, "Compression failed"));
Expand Down