Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static ArrayStats empty() {
}

/// Parses stats from a FlatBuffers [io.github.dfa1.vortex.fbs.ArrayStats] table.
/// Returns an empty instance when `fbs` is `null` or carries no min/max.
/// Returns an empty instance when `fbs` is `null` or carries no min/max and no null count.
///
/// @param fbs the FlatBuffers stats table, or `null`
/// @return parsed stats, or an empty instance if no usable data is present
Expand All @@ -44,10 +44,11 @@ public static ArrayStats fromFbs(io.github.dfa1.vortex.fbs.ArrayStats fbs) {
}
Object min = decodeScalar(fbs.minAsByteBuffer());
Object max = decodeScalar(fbs.maxAsByteBuffer());
if (min == null && max == null) {
Long nullCount = fbs.hasNullCount() ? fbs.nullCount() : null;
if (min == null && max == null && nullCount == null) {
return EMPTY;
}
return new ArrayStats(min, max, null, null, null, null);
return new ArrayStats(min, max, null, nullCount, null, null);
}

private static Object decodeScalar(ByteBuffer bytes) {
Expand Down
22 changes: 19 additions & 3 deletions reader/src/main/java/io/github/dfa1/vortex/reader/RowFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import java.util.List;

/// Predicate tree for zone-map pruning. Evaluated against per-chunk min/max statistics;
/// chunks where no row can satisfy the filter are skipped entirely.
/// Predicate tree for zone-map pruning. Evaluated against per-chunk statistics — min/max for
/// comparisons, null count for [#isNull(String)] / [#isNotNull(String)]; chunks where no row
/// can satisfy the filter are skipped entirely.
public sealed interface RowFilter
permits RowFilter.And, RowFilter.Eq, RowFilter.Neq,
RowFilter.Gt, RowFilter.Gte, RowFilter.Lt, RowFilter.Lte {
RowFilter.Gt, RowFilter.Gte, RowFilter.Lt, RowFilter.Lte,
RowFilter.IsNull, RowFilter.IsNotNull {

static RowFilter and(RowFilter... filters) {
return new And(List.of(filters));
Expand Down Expand Up @@ -36,6 +38,14 @@ static RowFilter lte(String col, Comparable<?> val) {
return new Lte(col, val);
}

static RowFilter isNull(String col) {
return new IsNull(col);
}

static RowFilter isNotNull(String col) {
return new IsNotNull(col);
}

default RowFilter and(RowFilter other) {
return new And(List.of(this, other));
}
Expand All @@ -60,4 +70,10 @@ record Lt(String column, Comparable<?> value) implements RowFilter {

record Lte(String column, Comparable<?> value) implements RowFilter {
}

record IsNull(String column) implements RowFilter {
}

record IsNotNull(String column) implements RowFilter {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,24 @@ private boolean canPruneChunk(ChunkSpec chunk, RowFilter filter) {
yield false;
}
}
case RowFilter.IsNull(var col) -> {
Layout flat = chunk.layoutFor(col);
if (flat == null) {
yield false;
}
// Zero nulls in the chunk → no row is null → nothing can match IS NULL.
Long nullCount = readFlatStats(flat).nullCount();
yield nullCount != null && nullCount == 0;
}
case RowFilter.IsNotNull(var col) -> {
Layout flat = chunk.layoutFor(col);
if (flat == null) {
yield false;
}
// Every row is null → no row is non-null → nothing can match IS NOT NULL.
Long nullCount = readFlatStats(flat).nullCount();
yield nullCount != null && nullCount == flat.rowCount();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ public Map<String, ArrayStats> columnStats() {
private ArrayStats aggregateStats(List<Layout> flats) {
Object globalMin = null;
Object globalMax = null;
// Sum is meaningful only when every chunk carries a null count; one missing makes the
// column total unknown (null), so don't report a partial sum.
long totalNullCount = 0L;
boolean allHaveNullCount = !flats.isEmpty();
for (Layout flat : flats) {
ArrayStats s = readFlatStats(flat);
if (s.min() != null) {
Expand All @@ -189,11 +193,17 @@ private ArrayStats aggregateStats(List<Layout> flats) {
if (s.max() != null) {
globalMax = globalMax == null ? s.max() : maxOf(globalMax, s.max());
}
if (s.nullCount() != null) {
totalNullCount += s.nullCount();
} else {
allHaveNullCount = false;
}
}
if (globalMin == null && globalMax == null) {
Long nullCount = allHaveNullCount ? totalNullCount : null;
if (globalMin == null && globalMax == null && nullCount == null) {
return ArrayStats.empty();
}
return new ArrayStats(globalMin, globalMax, null, null, null, null);
return new ArrayStats(globalMin, globalMax, null, nullCount, null, null);
}

private ArrayStats readFlatStats(Layout flat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,8 @@ private int writeSegment(DType dtype, Object data, EncodingEncoder encodingOverr
int segIdx = segs.size();
long offset = bytesWritten;

ByteBuffer fbBuf = buildArrayFlatBuffer(result);
long segNullCount = data instanceof NullableData nd ? countNulls(nd.validity()) : 0L;
ByteBuffer fbBuf = buildArrayFlatBuffer(result, segNullCount);

// Segment format: [buffer data...] [FlatBuffer Array bytes] [4-byte LE u32 = fbLen]
int fbLen = fbBuf.remaining();
Expand All @@ -571,7 +572,7 @@ private int writeSegment(DType dtype, Object data, EncodingEncoder encodingOverr
segs.add(new SegRef(offset, bytesWritten - offset));
lastStatsMin = result.statsMin();
lastStatsMax = result.statsMax();
lastNullCount = data instanceof NullableData nd ? countNulls(nd.validity()) : 0L;
lastNullCount = segNullCount;
return segIdx;
}
}
Expand Down Expand Up @@ -616,19 +617,28 @@ private void writePadding(int n) throws IOException {
bytesWritten += n;
}

private ByteBuffer buildArrayFlatBuffer(EncodeResult result) {
private ByteBuffer buildArrayFlatBuffer(EncodeResult result, long nullCount) {
var fbb = new FlatBufferBuilder(256);

// Stats for root node only (build before root ArrayNode)
int statsOff = 0;
// Stats for the root node only (build vectors before the ArrayStats table). null_count is
// always recorded; min/max only when the encoder produced them.
int minVec = result.hasStats()
? io.github.dfa1.vortex.fbs.ArrayStats.createMinVector(fbb, result.statsMin()) : 0;
int maxVec = result.hasStats()
? io.github.dfa1.vortex.fbs.ArrayStats.createMaxVector(fbb, result.statsMax()) : 0;
// forceDefaults only while building ArrayStats, so null_count = 0 is serialised (flatbuffers
// omits a scalar equal to its default otherwise) — matching the Rust writer and letting the
// reader prune IS NULL on zero-null chunks. Reset immediately so the Array/ArrayNode tables
// keep their normal (offset-default-omitting) layout.
fbb.forceDefaults(true);
io.github.dfa1.vortex.fbs.ArrayStats.startArrayStats(fbb);
if (result.hasStats()) {
int minVec = io.github.dfa1.vortex.fbs.ArrayStats.createMinVector(fbb, result.statsMin());
int maxVec = io.github.dfa1.vortex.fbs.ArrayStats.createMaxVector(fbb, result.statsMax());
io.github.dfa1.vortex.fbs.ArrayStats.startArrayStats(fbb);
io.github.dfa1.vortex.fbs.ArrayStats.addMin(fbb, minVec);
io.github.dfa1.vortex.fbs.ArrayStats.addMax(fbb, maxVec);
statsOff = io.github.dfa1.vortex.fbs.ArrayStats.endArrayStats(fbb);
}
io.github.dfa1.vortex.fbs.ArrayStats.addNullCount(fbb, nullCount);
int statsOff = io.github.dfa1.vortex.fbs.ArrayStats.endArrayStats(fbb);
fbb.forceDefaults(false);

int rootNodeOff = buildArrayNodeFlatBuffer(fbb, result.rootNode(), statsOff);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.github.dfa1.vortex.writer;

import io.github.dfa1.vortex.core.DType;
import io.github.dfa1.vortex.core.PType;
import io.github.dfa1.vortex.reader.RowFilter;
import io.github.dfa1.vortex.reader.ScanOptions;
import io.github.dfa1.vortex.reader.VortexReader;
import io.github.dfa1.vortex.writer.encode.NullableData;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/// Round-trip checks for `IS NULL` / `IS NOT NULL` chunk pruning, which is driven by the
/// null count recorded with each chunk's stats, plus the column-level null count total.
class NullCountPruningTest {

    private static final String COLUMN = "v";

    // Single nullable I64 column named "v" inside a non-nullable struct.
    private static final DType.Struct SCHEMA = new DType.Struct(
            List.of(COLUMN), List.of(new DType.Primitive(PType.I64, true)), false);

    @TempDir
    Path tmp;

    /// Writes the fixture file: three chunks with distinct row counts so that the surviving
    /// chunks can be identified by size alone.
    ///
    /// The chunk size (1024) is far larger than any chunk written here, so each `writeChunk`
    /// call is intended to produce exactly one chunk (one zone).
    ///
    /// @return path of the written file inside the temporary directory
    private Path write() throws IOException {
        Path target = tmp.resolve("nulls.vtx");
        WriteOptions options = new WriteOptions(1024, true, 0.90, 0, false, false);
        try (var channel = FileChannel.open(target, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             var sut = VortexWriter.create(channel, SCHEMA, options)) {
            // 3 rows, 0 nulls
            var noNulls = new NullableData(
                    new long[]{1, 2, 3}, new boolean[]{true, true, true});
            sut.writeChunk(Map.of(COLUMN, noNulls));

            // 2 rows, 1 null
            var oneNull = new NullableData(
                    new long[]{4, 0}, new boolean[]{true, false});
            sut.writeChunk(Map.of(COLUMN, oneNull));

            // 4 rows, all null
            var allNull = new NullableData(
                    new long[]{0, 0, 0, 0}, new boolean[]{false, false, false, false});
            sut.writeChunk(Map.of(COLUMN, allNull));
        }
        return target;
    }

    /// Scans `file` with the given filter (no projection, no limit) and collects the row
    /// count of every chunk the scan yields, in iteration order.
    ///
    /// @param file   the file to scan
    /// @param filter the row filter to apply, or `null` for an unfiltered scan
    /// @return row counts of the chunks that were not pruned
    private List<Long> scanRowCounts(Path file, RowFilter filter) throws IOException {
        List<Long> rowCounts = new ArrayList<>();
        var scanOptions = new ScanOptions(List.of(), filter, ScanOptions.NO_LIMIT);
        try (VortexReader reader = VortexReader.open(file);
             var chunks = reader.scan(scanOptions)) {
            chunks.forEachRemaining(chunk -> rowCounts.add(chunk.rowCount()));
        }
        return rowCounts;
    }

    @Test
    void isNull_prunesZeroNullChunks() throws IOException {
        // Given: the 3-row chunk holds no nulls, so IS NULL can never match it
        Path file = write();

        // When
        List<Long> surviving = scanRowCounts(file, RowFilter.isNull(COLUMN));

        // Then: only the 1-null (2 rows) and all-null (4 rows) chunks remain
        assertThat(surviving).containsExactly(2L, 4L);
    }

    @Test
    void isNotNull_prunesAllNullChunks() throws IOException {
        // Given: the 4-row chunk is entirely null, so IS NOT NULL can never match it
        Path file = write();

        // When
        List<Long> surviving = scanRowCounts(file, RowFilter.isNotNull(COLUMN));

        // Then: only the 0-null (3 rows) and 1-null (2 rows) chunks remain
        assertThat(surviving).containsExactly(3L, 2L);
    }

    @Test
    void noFilter_keepsAllChunks() throws IOException {
        // Given
        Path file = write();

        // When: scanning without a filter
        List<Long> surviving = scanRowCounts(file, null);

        // Then: nothing is pruned
        assertThat(surviving).containsExactly(3L, 2L, 4L);
    }

    @Test
    void columnStats_sumsNullCountAcrossChunks() throws IOException {
        // Given: the chunks carry 0, 1 and 4 nulls, and each one records its null count,
        // so the column total is known (0 + 1 + 4 = 5) instead of being reported as unknown
        Path file = write();

        // When
        final Long totalNulls;
        try (VortexReader reader = VortexReader.open(file)) {
            var statsByColumn = reader.columnStats();
            totalNulls = statsByColumn.get(COLUMN).nullCount();
        }

        // Then
        assertThat(totalNulls).isEqualTo(5L);
    }
}
Loading