Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 302 additions & 18 deletions arrow-select/src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use crate::filter::{FilterBuilder, FilterPredicate, FilterSelection};
use crate::take::take_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
Expand Down Expand Up @@ -212,7 +212,7 @@ impl BatchCoalescer {
/// Push a batch into the Coalescer after applying a filter
///
/// This is semantically equivalent to calling [`Self::push_batch`]
/// with the results from [`filter_record_batch`]
/// with the results from [`crate::filter::filter_record_batch`]
///
/// # Example
/// ```
Expand All @@ -238,10 +238,7 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
// TODO: optimize this to avoid materializing (copying the results
// of filter to a new batch)
let filtered_batch = filter_record_batch(&batch, filter)?;
self.push_batch(filtered_batch)
self.push_batch_with_filtered_columns(batch, filter)
}

/// Push a batch into the Coalescer after applying a set of indices
Expand Down Expand Up @@ -566,6 +563,79 @@ impl BatchCoalescer {
}
}

impl BatchCoalescer {
    /// Push the rows of `batch` selected by `filter` into the coalescer.
    ///
    /// Depending on the selection, one of the following happens:
    ///
    /// * no row is selected: nothing is buffered
    /// * every row is selected by a filter as long as the batch: the batch is
    ///   handed to [`Self::push_batch`] unchanged
    /// * the selection is larger than `biggest_coalesce_batch_size`, is dense,
    ///   or is larger than the space left before `target_batch_size`: the
    ///   batch is filtered with [`FilterPredicate::filter_record_batch`] and
    ///   the result is handed to [`Self::push_batch`]
    /// * otherwise: the selected rows of each column are copied straight into
    ///   the matching in-progress array
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::InvalidArgumentError`] if `filter` is longer than
    /// `batch`, or if the per-column path is taken and the number of columns
    /// differs from the number of in-progress arrays. Errors from the called
    /// kernels are propagated.
    fn push_batch_with_filtered_columns(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        let num_rows = batch.num_rows();
        let filter_len = filter.len();
        if filter_len > num_rows {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Filter predicate of length {} is larger than target array of length {}",
                filter_len, num_rows
            )));
        }

        // The optimized predicate is always used when it is shared by several
        // columns; for a single column it depends on that column's data type.
        let num_columns = batch.num_columns();
        let should_optimize = match num_columns {
            0 => false,
            1 => FilterBuilder::is_optimize_beneficial(batch.schema_ref().field(0).data_type()),
            _ => true,
        };
        let builder = FilterBuilder::new(filter);
        let predicate = if should_optimize {
            builder.optimize().build()
        } else {
            builder.build()
        };
        let selected_count = predicate.count();

        // Nothing selected: nothing to buffer
        if selected_count == 0 {
            return Ok(());
        }

        // Everything selected: the filter is a no-op
        if selected_count == num_rows && filter_len == num_rows {
            return self.push_batch(batch);
        }

        // Multi-column batches benefit from sharing the selection across
        // columns; single-column batches need a sparser filter to win.
        let density_factor = if num_columns > 1 { 4 } else { 16 };
        let is_dense_filter = selected_count.saturating_mul(density_factor) > filter_len;
        let exceeds_coalesce_limit = matches!(
            self.biggest_coalesce_batch_size,
            Some(limit) if selected_count > limit
        );
        let remaining_capacity = self.target_batch_size - self.buffered_rows;
        let does_not_fit_buffer = selected_count > remaining_capacity;

        if exceeds_coalesce_limit || is_dense_filter || does_not_fit_buffer {
            // Use materialized filtering when sparse per-column copying won't help.
            let filtered_batch = predicate.filter_record_batch(&batch)?;
            return self.push_batch(filtered_batch);
        }

        let (_, columns, _) = batch.into_parts();

        let expected_columns = self.in_progress_arrays.len();
        if columns.len() != expected_columns {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Batch has {} columns but BatchCoalescer expects {}",
                columns.len(),
                expected_columns
            )));
        }

        // Copy the selected rows of every column into its in-progress array,
        // stopping at the first error.
        self.in_progress_arrays
            .iter_mut()
            .zip(columns)
            .try_for_each(|(in_progress, column)| {
                in_progress.copy_rows_by_filter_from(column, &predicate)
            })?;

        self.buffered_rows += selected_count;
        if self.buffered_rows >= self.target_batch_size {
            self.finish_buffered_batch()?;
        }

        Ok(())
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
macro_rules! instantiate_primitive {
Expand All @@ -591,9 +661,9 @@ fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn
/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation that buffers
/// arrays and uses other kernels concatenates them when finished.
/// arrays, uses other kernels, and concatenates them when finished.
///
/// Some types have specialized implementations for this array types (e.g.,
/// Some types have specialized, faster implementations (e.g.,
/// [`StringViewArray`], etc.).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
Expand All @@ -606,11 +676,52 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {

/// Copy rows from the current source array into the in-progress array
///
/// The source array is set by [`Self::set_source`].
/// Note: The source array is set by [`Self::set_source`].
///
/// Return an error if the source array is not set
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

/// Copy the rows that `filter` selects out of the current source array.
///
/// The default implementation obtains the selection with
/// [`FilterPredicate::selection`] and forwards it to
/// [`Self::copy_rows_by_selection`].
fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
    let selection = filter.selection();
    self.copy_rows_by_selection(selection)
}

/// Copy the rows of `source` that `filter` selects.
///
/// In contrast to the other copy methods, the array to read from is an
/// argument here instead of the array installed with [`Self::set_source`].
///
/// The default implementation installs `source` with [`Self::set_source`],
/// delegates to [`Self::copy_rows_by_filter`], and then resets the source to
/// `None` whether or not the copy succeeded.
fn copy_rows_by_filter_from(
    &mut self,
    source: ArrayRef,
    filter: &FilterPredicate,
) -> Result<(), ArrowError> {
    self.set_source(Some(source));
    let outcome = self.copy_rows_by_filter(filter);
    // Clear the source before reporting the outcome, even on error
    self.set_source(None);
    outcome
}

/// Copy the rows that a [`FilterSelection`] describes out of the current
/// source array.
///
/// A [`FilterSelection`] is typically obtained from
/// [`FilterPredicate::selection`].
///
/// Every variant is translated into calls to [`Self::copy_rows`]; the first
/// error stops the copy and is returned.
///
/// Note: The source array is set by [`Self::set_source`].
fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) -> Result<(), ArrowError> {
    match selection {
        FilterSelection::None => {}
        FilterSelection::All { len } => self.copy_rows(0, len)?,
        FilterSelection::Slices(slices) => {
            // Each slice is a `(start, end)` pair; copy `end - start` rows
            for (start, end) in slices {
                self.copy_rows(start, end - start)?;
            }
        }
        FilterSelection::Indices(indices) => {
            // One row per selected index
            for idx in indices {
                self.copy_rows(idx, 1)?;
            }
        }
    }
    Ok(())
}

/// Finish the currently in-progress array and return it as an `ArrayRef`
///
/// # Errors
///
/// Returns an [`ArrowError`] if the implementation cannot produce the
/// finished array; the exact failure conditions are implementation specific.
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
Expand All @@ -619,6 +730,7 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
mod tests {
use super::*;
use crate::concat::concat_batches;
use crate::filter::filter_record_batch;
use arrow_array::builder::StringViewBuilder;
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
Expand Down Expand Up @@ -1197,6 +1309,172 @@ mod tests {
.run();
}

/// Pushes the same 1000-row `BinaryViewArray` batch twice, each time with
/// [`sparse_filter`], and expects a single 250-row output batch.
#[test]
fn test_binary_view_filtered() {
    let pattern: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"A longer string that is more than 12 bytes"),
    ];

    // Repeat the three-value pattern until the column holds 1000 rows
    let column: ArrayRef = Arc::new(BinaryViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_binary_view_filtered")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(256)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes the same 1000-row `BinaryViewArray` batch of short values twice,
/// each time with [`sparse_filter`], and expects a single 250-row output batch.
#[test]
fn test_binary_view_filtered_inline() {
    let pattern: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];

    // Repeat the three-value pattern until the column holds 1000 rows
    let column: ArrayRef = Arc::new(BinaryViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_binary_view_filtered_inline")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes the same 1000-row `StringViewArray` batch of short values twice,
/// each time with [`sparse_filter`], and expects a single 250-row output batch.
#[test]
fn test_string_view_filtered_inline() {
    let pattern: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];

    // Repeat the three-value pattern until the column holds 1000 rows
    let column: ArrayRef = Arc::new(StringViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_string_view_filtered_inline")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes a three-column batch (nullable `Int32`, `Float64` and a
/// `BinaryViewArray` of short values) twice, each time with
/// [`sparse_filter`], and expects a single 250-row output batch.
#[test]
fn test_mixed_inline_binary_view_filtered() {
    // Every fifth integer is null
    let ints: ArrayRef = Arc::new(Int32Array::from_iter(
        (0..1000).map(|v| if v % 5 != 0 { Some(v) } else { None }),
    ));
    let floats: ArrayRef = Arc::new(arrow_array::Float64Array::from_iter(
        (0..1000).map(|v| Some(v as f64)),
    ));
    let pattern: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
    let bytes: ArrayRef = Arc::new(BinaryViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));

    let batch =
        RecordBatch::try_from_iter(vec![("i", ints), ("f", floats), ("b", bytes)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_mixed_inline_binary_view_filtered")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes a three-column batch (nullable `Int32`, `Float64` and a
/// `StringViewArray` of short values) twice, each time with
/// [`sparse_filter`], and expects a single 250-row output batch.
#[test]
fn test_mixed_inline_string_view_filtered() {
    // Every fifth integer is null
    let ints: ArrayRef = Arc::new(Int32Array::from_iter(
        (0..1000).map(|v| if v % 5 != 0 { Some(v) } else { None }),
    ));
    let floats: ArrayRef = Arc::new(arrow_array::Float64Array::from_iter(
        (0..1000).map(|v| Some(v as f64)),
    ));
    let pattern: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
    let strings: ArrayRef = Arc::new(StringViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));

    let batch =
        RecordBatch::try_from_iter(vec![("i", ints), ("f", floats), ("s", strings)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_mixed_inline_string_view_filtered")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// Pushes a two-column batch (a `BooleanArray` and a `StringViewArray` of
/// short values) twice, each time with [`sparse_filter`], and expects a
/// single 250-row output batch.
#[test]
fn test_mixed_boolean_inline_string_view_filtered() {
    let bools: ArrayRef = Arc::new(BooleanArray::from_iter(
        (0..1000).map(|v| Some(v % 3 == 0)),
    ));
    let pattern: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
    let strings: ArrayRef = Arc::new(StringViewArray::from_iter(
        pattern.iter().cycle().take(1000),
    ));

    let batch = RecordBatch::try_from_iter(vec![("b", bools), ("s", strings)]).unwrap();
    let filter = sparse_filter(1000);

    Test::new("coalesce_mixed_boolean_inline_string_view_filtered")
        .with_batch(batch.clone())
        .with_filter(filter.clone())
        .with_batch(batch)
        .with_filter(filter)
        .with_batch_size(300)
        .with_expected_output_sizes(vec![250])
        .run();
}

/// A three-element filter applied to a two-row batch must be rejected with
/// the "Filter predicate ... is larger than target array" error.
#[test]
fn test_inline_filter_rejects_filter_longer_than_batch() {
    let two_rows: Vec<Option<&[u8]>> = vec![Some(b"foo"), Some(b"bar")];
    let column: ArrayRef = Arc::new(BinaryViewArray::from_iter(two_rows));
    let batch = RecordBatch::try_from_iter(vec![("c0", column)]).unwrap();
    // One element more than the batch has rows
    let filter = BooleanArray::from(vec![true, false, true]);

    let mut coalescer = BatchCoalescer::new(batch.schema(), 100);
    let result = coalescer.push_batch_with_filter(batch, &filter);
    assert!(result.is_err());

    let err = result.unwrap_err().to_string();
    assert!(
        err.contains("Filter predicate of length 3 is larger than target array of length 2"),
        "unexpected error: {err}"
    );
}

#[derive(Debug, Clone, PartialEq)]
struct ExpectedLayout {
len: usize,
Expand Down Expand Up @@ -1685,6 +1963,10 @@ mod tests {
}
}

fn sparse_filter(len: usize) -> BooleanArray {
BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0)))
}

/// Returns the named column as a StringViewArray
fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
batch
Expand All @@ -1701,18 +1983,20 @@ mod tests {
let (schema, mut columns, row_count) = batch.into_parts();

for column in columns.iter_mut() {
let Some(string_view) = column.as_string_view_opt() else {
if let Some(string_view) = column.as_string_view_opt() {
// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
}
*column = Arc::new(builder.finish());
continue;
};
}

// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
if let Some(binary_view) = column.as_binary_view_opt() {
*column = Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
}
// Update the column with the new StringViewArray
*column = Arc::new(builder.finish());
}

let options = RecordBatchOptions::new().with_row_count(Some(row_count));
Expand Down
Loading
Loading