Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 148 additions & 3 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,143 @@ impl RleEncoder {
/// Size, in number of `i32`s, of the index buffer to use for RLE batch reading.
const RLE_DECODER_INDEX_BUFFER_SIZE: usize = 1024;

/// SVE-accelerated dictionary gather for AArch64.
///
/// SVE is an optional extension that is not part of the AArch64 baseline, so a
/// portable binary cannot rely on the autovectoriser emitting gather
/// instructions for it. Rust also has no stable SVE intrinsics, so the kernels
/// are written in inline assembly and guarded by a cached runtime feature check.
#[cfg(target_arch = "aarch64")]
mod sve_gather {
    use std::arch::asm;
    use std::mem::{needs_drop, size_of};
    use std::sync::atomic::{AtomicU8, Ordering};

    /// Feature detection has not run yet.
    const UNCHECKED: u8 = 0;
    /// Feature detection ran and SVE is present.
    const SUPPORTED: u8 = 1;
    /// Feature detection ran and SVE is absent.
    const UNSUPPORTED: u8 = 2;

    /// Cached result of the runtime SVE probe (one of the constants above).
    static SVE_STATUS: AtomicU8 = AtomicU8::new(UNCHECKED);

    /// Returns `true` if the running CPU supports SVE.
    ///
    /// The probe result is cached in [`SVE_STATUS`]. `Relaxed` ordering is
    /// sufficient: the cached byte is the only data being published, and racing
    /// threads can at worst repeat the (idempotent) probe and store the same
    /// value.
    #[inline]
    fn sve_available() -> bool {
        match SVE_STATUS.load(Ordering::Relaxed) {
            UNCHECKED => {
                let available = std::arch::is_aarch64_feature_detected!("sve");
                SVE_STATUS.store(
                    if available { SUPPORTED } else { UNSUPPORTED },
                    Ordering::Relaxed,
                );
                available
            }
            status => status == SUPPORTED,
        }
    }

    /// Gather `count` 4-byte elements `dict[indices[i]]` into `output`.
    ///
    /// # Safety
    /// - every `indices[i]` must be a valid in-bounds offset into `dict`
    /// - `indices` and `output` must be valid for `count` elements
    /// - the `sve` feature must be available
    #[target_feature(enable = "sve")]
    unsafe fn gather_4byte(dict: *const u8, indices: *const i32, output: *mut u8, count: usize) {
        // SAFETY: the caller guarantees the index/dict/output invariants and that
        // the `sve` feature is present.
        unsafe {
            asm!(
                "mov {pos}, #0",
                // p0 = lanes whose element number is still below `count`.
                "whilelt p0.s, {pos}, {count}",
                "2:",
                // Load one vector of indices, gather the dictionary words they
                // select (index zero-extended and scaled by 4), store them.
                "ld1w {{z0.s}}, p0/z, [{idx}, {pos}, lsl #2]",
                "ld1w {{z1.s}}, p0/z, [{dict}, z0.s, uxtw #2]",
                "st1w {{z1.s}}, p0, [{dst}, {pos}, lsl #2]",
                // Advance by the number of 32-bit lanes and recompute the predicate.
                "incw {pos}",
                "whilelt p0.s, {pos}, {count}",
                "b.first 2b",
                pos = out(reg) _,
                count = in(reg) count,
                idx = in(reg) indices,
                dict = in(reg) dict,
                dst = in(reg) output,
                // The template overwrites p0, z0 and z1; every register an asm
                // block modifies must be declared. `v0`/`v1` are the register
                // names that alias `z0`/`z1`.
                out("p0") _,
                out("v0") _,
                out("v1") _,
                // `whilelt` sets the condition flags, so `preserves_flags` is
                // deliberately not specified.
                options(nostack)
            );
        }
    }

    /// Gather `count` 8-byte elements `dict[indices[i]]` into `output`.
    ///
    /// # Safety
    /// Same contract as [`gather_4byte`], for 8-byte elements.
    #[target_feature(enable = "sve")]
    unsafe fn gather_8byte(dict: *const u8, indices: *const i32, output: *mut u8, count: usize) {
        // SAFETY: the caller guarantees the index/dict/output invariants and that
        // the `sve` feature is present.
        unsafe {
            asm!(
                "mov {pos}, #0",
                // p0 = lanes whose element number is still below `count`.
                "whilelt p0.d, {pos}, {count}",
                "2:",
                // A single element counter addresses both arrays: it is scaled
                // by 4 for the 32-bit indices (widened into 64-bit lanes) and by
                // 8 for the 64-bit output.
                "ld1sw {{z0.d}}, p0/z, [{idx}, {pos}, lsl #2]",
                "ld1d {{z1.d}}, p0/z, [{dict}, z0.d, lsl #3]",
                "st1d {{z1.d}}, p0, [{dst}, {pos}, lsl #3]",
                // Advance by the number of 64-bit lanes and recompute the predicate.
                "incd {pos}",
                "whilelt p0.d, {pos}, {count}",
                "b.first 2b",
                pos = out(reg) _,
                count = in(reg) count,
                idx = in(reg) indices,
                dict = in(reg) dict,
                dst = in(reg) output,
                // The template overwrites p0, z0 and z1; every register an asm
                // block modifies must be declared. `v0`/`v1` are the register
                // names that alias `z0`/`z1`.
                out("p0") _,
                out("v0") _,
                out("v1") _,
                // `whilelt` sets the condition flags, so `preserves_flags` is
                // deliberately not specified.
                options(nostack)
            );
        }
    }

    /// Gather `dict[indices[i]]` into `buffer` using SVE, returning `true` if the
    /// SVE path ran and `false` if the caller must fall back to a scalar copy.
    ///
    /// The kernels copy raw element bytes, so they only run for 4- and 8-byte
    /// types that do not need drop. Like a scalar `zip` over `buffer` and
    /// `indices`, only the first `min(indices.len(), buffer.len())` elements are
    /// written.
    ///
    /// Every index must already be known to be in bounds (non-negative and
    /// smaller than `dict.len()`); the kernels perform no bounds checks of
    /// their own. This precondition is verified in debug builds only.
    #[inline]
    pub fn try_gather<T: Clone>(dict: &[T], indices: &[i32], buffer: &mut [T]) -> bool {
        if needs_drop::<T>() || !sve_available() {
            return false;
        }

        // Never write past the end of `buffer`, even in release builds: clamp to
        // the shorter slice exactly as the scalar fallback's `zip` does.
        let count = indices.len().min(buffer.len());
        let indices = &indices[..count];
        debug_assert!(
            indices
                .iter()
                .all(|&i| i >= 0 && (i as usize) < dict.len()),
            "dictionary index out of bounds"
        );

        let dict_ptr = dict.as_ptr().cast::<u8>();
        let idx_ptr = indices.as_ptr();
        let out_ptr = buffer.as_mut_ptr().cast::<u8>();

        match size_of::<T>() {
            // SAFETY: the caller has checked every index is in bounds, both
            // `indices` and `buffer` hold at least `count` elements, `T` is
            // 4 bytes wide, and SVE availability was confirmed above.
            4 => unsafe { gather_4byte(dict_ptr, idx_ptr, out_ptr, count) },
            // SAFETY: as above, with `T` 8 bytes wide.
            8 => unsafe { gather_8byte(dict_ptr, idx_ptr, out_ptr, count) },
            _ => return false,
        }
        true
    }
}

/// A RLE/Bit-Packing hybrid decoder.
pub struct RleDecoder {
// Number of bits used to encode the value. Must be between [0, 64].
Expand Down Expand Up @@ -579,9 +716,17 @@ impl RleDecoder {
if (max_idx as usize) >= dict_len {
return Err(oob(max_idx, dict_len));
}
for (b, i) in out_chunk.iter_mut().zip(idx_chunk.iter()) {
// SAFETY: all indices checked above to be in bounds
b.clone_from(unsafe { dict.get_unchecked(*i as usize) });
// Every index in this chunk is now known to be in
// bounds, so the gather reuses that single check.
#[cfg(target_arch = "aarch64")]
let gathered = sve_gather::try_gather(dict, idx_chunk, out_chunk);
#[cfg(not(target_arch = "aarch64"))]
let gathered = false;
if !gathered {
for (b, i) in out_chunk.iter_mut().zip(idx_chunk.iter()) {
// SAFETY: all indices checked above to be in bounds
b.clone_from(unsafe { dict.get_unchecked(*i as usize) });
}
}
}
for (b, i) in out_chunks
Expand Down