diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 47e52d00b094..7ecfc8e7ab14 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -338,6 +338,143 @@ impl RleEncoder { /// Size, in number of `i32s` of buffer to use for RLE batch reading const RLE_DECODER_INDEX_BUFFER_SIZE: usize = 1024; +/// SVE-accelerated dictionary gather for AArch64. +/// +/// SVE is an optional extension that is not part of the AArch64 baseline, so a +/// portable binary cannot rely on the autovectoriser emitting gather +/// instructions for it. Rust also has no stable SVE intrinsics, so the kernels +/// are written in inline assembly and guarded by a cached runtime feature check. +#[cfg(target_arch = "aarch64")] +mod sve_gather { + use std::arch::asm; + use std::mem::{needs_drop, size_of}; + use std::sync::atomic::{AtomicU8, Ordering}; + + const UNCHECKED: u8 = 0; + const SUPPORTED: u8 = 1; + const UNSUPPORTED: u8 = 2; + + static SVE_STATUS: AtomicU8 = AtomicU8::new(UNCHECKED); + + #[inline] + fn sve_available() -> bool { + match SVE_STATUS.load(Ordering::Relaxed) { + UNCHECKED => { + let available = std::arch::is_aarch64_feature_detected!("sve"); + SVE_STATUS.store( + if available { SUPPORTED } else { UNSUPPORTED }, + Ordering::Relaxed, + ); + available + } + status => status == SUPPORTED, + } + } + + /// Gather `count` 4-byte elements `dict[indices[i]]` into `output`. + /// + /// # Safety + /// - every `indices[i]` must be a valid in-bounds offset into `dict` + /// - `indices` and `output` must be valid for `count` elements + /// - the `sve` feature must be available + #[target_feature(enable = "sve")] + unsafe fn gather_4byte(dict: *const u8, indices: *const i32, output: *mut u8, count: usize) { + // SAFETY: the caller guarantees the index/dict/output invariants and that + // the `sve` feature is present. + unsafe { + asm!( + "mov {pos}, #0", + "whilelt p0.s, {pos}, {count}", + "2:", + "ld1w {{z0.s}}, p0/z, [{idx}, {pos}, lsl #2]", + "ld1w {{z1.s}}, p0/z, [{dict}, z0.s, uxtw #2]", + "st1w {{z1.s}}, p0, [{out}, {pos}, lsl #2]", + "incw {pos}", + "whilelt p0.s, {pos}, {count}", + "b.first 2b", + pos = out(reg) _, + count = in(reg) count, + idx = in(reg) indices, + dict = in(reg) dict, + out = in(reg) output, + options(nostack) + ); + } + } + + /// Gather `count` 8-byte elements `dict[indices[i]]` into `output`. + /// + /// # Safety + /// Same contract as [`gather_4byte`], for 8-byte elements. + #[target_feature(enable = "sve")] + unsafe fn gather_8byte(dict: *const u8, indices: *const i32, output: *mut u8, count: usize) { + // SAFETY: the caller guarantees the index/dict/output invariants and that + // the `sve` feature is present. + unsafe { + asm!( + "mov {pos}, #0", + "mov {idx_pos}, #0", + "whilelt p0.d, {pos}, {count}", + "2:", + "ld1sw {{z0.d}}, p0/z, [{idx}, {idx_pos}, lsl #2]", + "ld1d {{z1.d}}, p0/z, [{dict}, z0.d, lsl #3]", + "st1d {{z1.d}}, p0, [{out}, {pos}, lsl #3]", + "incd {pos}", + "incd {idx_pos}", + "whilelt p0.d, {pos}, {count}", + "b.first 2b", + pos = out(reg) _, + idx_pos = out(reg) _, + count = in(reg) count, + idx = in(reg) indices, + dict = in(reg) dict, + out = in(reg) output, + options(nostack) + ); + } + } + + /// Gather `dict[indices[i]]` into `buffer` using SVE, returning `true` if the + /// SVE path ran and `false` if the caller must fall back to a scalar copy. + /// + /// The kernels copy raw element bytes, so they only run for 4- and 8-byte + /// types that do not need drop. Every index must already be known to be in + /// bounds. + #[inline] + pub fn try_gather(dict: &[T], indices: &[i32], buffer: &mut [T]) -> bool { + if needs_drop::() || !sve_available() { + return false; + } + + let count = indices.len(); + debug_assert!(buffer.len() >= count); + + // SAFETY: the caller has checked every index is in bounds, the slices + // hold `count` elements, and SVE availability was confirmed above. + match size_of::() { + 4 => unsafe { + gather_4byte( + dict.as_ptr() as *const u8, + indices.as_ptr(), + buffer.as_mut_ptr() as *mut u8, + count, + ); + true + }, + 8 => unsafe { + gather_8byte( + dict.as_ptr() as *const u8, + indices.as_ptr(), + buffer.as_mut_ptr() as *mut u8, + count, + ); + true + }, + _ => false, + } + } +} + /// A RLE/Bit-Packing hybrid decoder. pub struct RleDecoder { // Number of bits used to encode the value. Must be between [0, 64]. @@ -579,9 +716,17 @@ impl RleDecoder { if (max_idx as usize) >= dict_len { return Err(oob(max_idx, dict_len)); } - for (b, i) in out_chunk.iter_mut().zip(idx_chunk.iter()) { - // SAFETY: all indices checked above to be in bounds - b.clone_from(unsafe { dict.get_unchecked(*i as usize) }); + // Every index in this chunk is now known to be in + // bounds, so the gather reuses that single check. + #[cfg(target_arch = "aarch64")] + let gathered = sve_gather::try_gather(dict, idx_chunk, out_chunk); + #[cfg(not(target_arch = "aarch64"))] + let gathered = false; + if !gathered { + for (b, i) in out_chunk.iter_mut().zip(idx_chunk.iter()) { + // SAFETY: all indices checked above to be in bounds + b.clone_from(unsafe { dict.get_unchecked(*i as usize) }); + } } } for (b, i) in out_chunks