Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ public final class VortexWriter implements Closeable {
private static final int LAYOUT_DICT = 3;
private static final int LAYOUT_ZONED = 4;

// Stat ordinals in the Rust `Stat` enum (see ZonedStatsSchema). v1 emits MAX + MIN only.
// Stat ordinals in the Rust `Stat` enum (see ZonedStatsSchema). Emitted: MAX, MIN, NULL_COUNT.
private static final int STAT_MAX = 3;
private static final int STAT_MIN = 4;
private static final int STAT_NULL_COUNT = 6;

// Columns with global cardinality below this threshold are dict-encoded across all chunks.
// Kept low: global dict hurts high-cardinality F64 columns (ALP codes beat U16 dict codes).
Expand Down Expand Up @@ -121,6 +122,8 @@ public final class VortexWriter implements Closeable {
// Stats (ScalarValue bytes) of the most recently written segment, captured for ChunkRef.
private byte[] lastStatsMin;
private byte[] lastStatsMax;
// Null count of the most recently written segment's input data (0 for dense arrays).
private long lastNullCount;

private VortexWriter(
WritableByteChannel channel, DType.Struct schema, WriteOptions options, List<EncodingEncoder> encodings
Expand Down Expand Up @@ -466,7 +469,7 @@ public void writeChunk(Map<String, Object> columns) throws IOException {
} else {
long rowCount = arrayLength(data);
int segIdx = writeSegment(colDtype, data);
colChunks.get(colName).add(new ChunkRef(segIdx, rowCount, lastStatsMin, lastStatsMax));
colChunks.get(colName).add(new ChunkRef(segIdx, rowCount, lastStatsMin, lastStatsMax, lastNullCount));
}
}
firstChunkSeen = true;
Expand Down Expand Up @@ -568,6 +571,7 @@ private int writeSegment(DType dtype, Object data, EncodingEncoder encodingOverr
segs.add(new SegRef(offset, bytesWritten - offset));
lastStatsMin = result.statsMin();
lastStatsMax = result.statsMax();
lastNullCount = data instanceof NullableData nd ? countNulls(nd.validity()) : 0L;
return segIdx;
}
}
Expand Down Expand Up @@ -699,17 +703,24 @@ private void flushZoneMaps() throws IOException {
java.util.Arrays.fill(allValid, true);
boolean[] notTruncated = new boolean[nZones];
DType nullablePrim = new DType.Primitive(prim.ptype(), true);
// Field order mirrors ZonedStatsSchema.statsTableDtype for present stats MAX(3), MIN(4):
// [max, max_is_truncated, min, min_is_truncated].
long[] nullCounts = new long[nZones];
for (int i = 0; i < nZones; i++) {
nullCounts[i] = chunks.get(i).nullCount();
}
// Field order mirrors ZonedStatsSchema.statsTableDtype for present stats MAX(3), MIN(4),
// NULL_COUNT(6): [max, max_is_truncated, min, min_is_truncated, null_count]. Every stat
// field is nullable in the reconstructed dtype, so null_count is a nullable U64.
DType.Struct statsDtype = new DType.Struct(
List.of("max", "max_is_truncated", "min", "min_is_truncated"),
List.of(nullablePrim, new DType.Bool(false), nullablePrim, new DType.Bool(false)),
List.of("max", "max_is_truncated", "min", "min_is_truncated", "null_count"),
List.of(nullablePrim, new DType.Bool(false), nullablePrim, new DType.Bool(false),
new DType.Primitive(PType.U64, true)),
false);
StructData sd = new StructData(List.of(
new NullableData(statColumn(prim.ptype(), chunks, true), allValid),
notTruncated,
new NullableData(statColumn(prim.ptype(), chunks, false), allValid.clone()),
notTruncated.clone()));
notTruncated.clone(),
new NullableData(nullCounts, allValid.clone())));
int zonesSegIdx = writeSegment(statsDtype, sd, new StructEncodingEncoder());
zoneMaps.put(colName, new ZoneMapRef(zonesSegIdx, nZones, options.chunkSize()));
}
Expand All @@ -734,10 +745,20 @@ private int wrapZoneMap(FlatBufferBuilder fbb, String colName, int dataLayout, l
// Builds the 5-byte metadata payload attached to a zoned layout.
//   bytes 0-3: zoneLen narrowed to an int and stored little-endian
//   byte 4:    present-stats bitset, one bit per emitted stat at its STAT_* ordinal
//              (MAX, MIN and NULL_COUNT)
private static byte[] zonedMetadataBytes(long zoneLen) {
    byte[] meta = new byte[5];
    ByteBuffer.wrap(meta).order(ByteOrder.LITTLE_ENDIAN).putInt((int) zoneLen);
    // The bitset is written exactly once; a preceding MAX|MIN-only store would be dead
    // because this assignment replaces the whole byte.
    meta[4] = (byte) ((1 << STAT_MAX) | (1 << STAT_MIN) | (1 << STAT_NULL_COUNT));
    return meta;
}

// Returns how many entries of the validity mask are false, i.e. the number of null slots.
private static long countNulls(boolean[] validity) {
    long present = 0;
    for (int i = 0; i < validity.length; i++) {
        if (validity[i]) {
            present++;
        }
    }
    // Every slot that is not present is a null.
    return validity.length - present;
}

private static boolean isZoneMappable(PType ptype) {
return switch (ptype) {
case I8, I16, I32, I64, U8, U16, U32, U64, F32, F64 -> true;
Expand Down Expand Up @@ -989,7 +1010,7 @@ private void writeGlobalDictColumn(String colName, DType.Primitive dtype, List<O
for (Object chunk : chunks) {
long rowCount = arrayLength(chunk);
int segIdx = writeSegment(dtype, chunk);
colChunks.get(colName).add(new ChunkRef(segIdx, rowCount, lastStatsMin, lastStatsMax));
colChunks.get(colName).add(new ChunkRef(segIdx, rowCount, lastStatsMin, lastStatsMax, lastNullCount));
}
return;
}
Expand Down Expand Up @@ -1033,7 +1054,7 @@ private void writeGlobalDictUtf8Column(String colName, DType.Utf8 dtype, List<Ob
for (Object chunk : chunks) {
long rowCount = arrayLength(chunk);
int segIdx = writeSegment(dtype, chunk);
colChunks.get(colName).add(new ChunkRef(segIdx, rowCount, lastStatsMin, lastStatsMax));
colChunks.get(colName).add(new ChunkRef(segIdx, rowCount, lastStatsMin, lastStatsMax, lastNullCount));
}
return;
}
Expand Down Expand Up @@ -1248,7 +1269,7 @@ private static Object buildCodesArray(Object data, PType ptype, Map<Object, Inte
// Location of one written segment: the offset at which it starts and its length
// (the writer records it as bytesWritten - offset once the segment has been emitted).
private record SegRef(long offset, long len) {
}

private record ChunkRef(int segIdx, long rowCount, byte[] statsMin, byte[] statsMax) {
private record ChunkRef(int segIdx, long rowCount, byte[] statsMin, byte[] statsMax, long nullCount) {
boolean hasStats() {
return statsMin != null && statsMax != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ void enableZoneMaps_wrapsColumnInZonedLayoutWithMetadata(@TempDir Path tmp) thro
try (VortexReader reader = VortexReader.open(file)) {
Layout column = reader.layout().children().get(0);

// Then the column is a vortex.stats layout: [data, zones], zone_len=4, MAX+MIN bitset
// Then the column is a vortex.stats layout: [data, zones], zone_len=4, MAX+MIN+NULL_COUNT
assertThat(column.isZoned()).isTrue();
assertThat(column.children()).hasSize(2);
ByteBuffer meta = column.metadata().duplicate().order(ByteOrder.LITTLE_ENDIAN);
assertThat(meta.getInt(meta.position())).isEqualTo(4); // zone_len
assertThat(meta.get(meta.position() + 4)).isEqualTo((byte) 0x18); // bits 3(MAX)+4(MIN)
assertThat(meta.get(meta.position() + 4)).isEqualTo((byte) 0x58); // bits 3(MAX)+4(MIN)+6(NULL_COUNT)
}
}

Expand Down Expand Up @@ -99,25 +99,65 @@ void zoneMaps_statsPayloadDecodesPerZoneMinMax(@TempDir Path tmp) throws IOExcep
try (VortexReader reader = VortexReader.open(file)) {
Layout zonesFlat = reader.layout().children().get(0).children().get(1);
SegmentSpec spec = reader.footer().segmentSpecs().get(zonesFlat.segments().getFirst());
DType nullableI64 = new DType.Primitive(PType.I64, true);
DType.Struct statsDtype = new DType.Struct(
List.of("max", "max_is_truncated", "min", "min_is_truncated"),
List.of(nullableI64, new DType.Bool(false), nullableI64, new DType.Bool(false)),
false);

try (Arena arena = Arena.ofConfined()) {
StructArray stats = (StructArray) reader.decodeFlatSegment(spec, statsDtype, 3, arena);
StructArray stats = (StructArray) reader.decodeFlatSegment(spec, statsTableDtype(), 3, arena);
LongArray max = (LongArray) ((MaskedArray) stats.field("max")).inner();
LongArray min = (LongArray) ((MaskedArray) stats.field("min")).inner();
LongArray nullCount = (LongArray) ((MaskedArray) stats.field("null_count")).inner();

// Then min/max per zone match the source data
// Then min/max per zone match the source data; the column is non-nullable so
// every zone reports zero nulls
assertThat(min.getLong(0)).isZero();
assertThat(max.getLong(0)).isEqualTo(3);
assertThat(min.getLong(1)).isEqualTo(4);
assertThat(max.getLong(1)).isEqualTo(7);
assertThat(min.getLong(2)).isEqualTo(8);
assertThat(max.getLong(2)).isEqualTo(11);
assertThat(nullCount.getLong(0)).isZero();
assertThat(nullCount.getLong(1)).isZero();
assertThat(nullCount.getLong(2)).isZero();
}
}
}

@Test
void zoneMaps_nullableColumn_recordsPerZoneNullCount(@TempDir Path tmp) throws IOException {
    // Given a single nullable I64 column "v" written as two chunks of two rows each:
    // zone 0 = [10, null], zone 1 = [null, null]
    DType nullableI64 = new DType.Primitive(PType.I64, true);
    DType.Struct schema = new DType.Struct(List.of("v"), List.of(nullableI64), false);
    WriteOptions opts = new WriteOptions(2, true, 0.90, 0, true, false);
    Path file = tmp.resolve("nullable.vtx");
    try (var channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
         var writer = VortexWriter.create(channel, schema, opts)) {
        var oneNull = new io.github.dfa1.vortex.writer.encode.NullableData(
                new long[]{10L, 0L}, new boolean[]{true, false});
        writer.writeChunk(Map.of("v", oneNull));
        var allNull = new io.github.dfa1.vortex.writer.encode.NullableData(
                new long[]{0L, 0L}, new boolean[]{false, false});
        writer.writeChunk(Map.of("v", allNull));
    }

    // When the stats table (second child of the column's zoned layout) is decoded
    try (VortexReader reader = VortexReader.open(file)) {
        Layout columnLayout = reader.layout().children().get(0);
        Layout zonesLayout = columnLayout.children().get(1);
        SegmentSpec zonesSegment = reader.footer().segmentSpecs().get(zonesLayout.segments().getFirst());
        try (Arena arena = Arena.ofConfined()) {
            StructArray table = (StructArray) reader.decodeFlatSegment(zonesSegment, statsTableDtype(), 2, arena);
            MaskedArray nullCountField = (MaskedArray) table.field("null_count");
            LongArray perZoneNulls = (LongArray) nullCountField.inner();

            // Then zone 0 reports one null and zone 1 reports two
            assertThat(perZoneNulls.getLong(0)).isEqualTo(1);
            assertThat(perZoneNulls.getLong(1)).isEqualTo(2);
        }
    }
}

/// Builds the dtype of the per-zone stats table the writer emits for an I64 column:
/// `max` and `min` values with their `*_is_truncated` flags, followed by `null_count`.
private static DType.Struct statsTableDtype() {
    DType statValue = new DType.Primitive(PType.I64, true);
    DType truncatedFlag = new DType.Bool(false);
    DType nullCount = new DType.Primitive(PType.U64, true);

    List<String> fieldNames =
            List.of("max", "max_is_truncated", "min", "min_is_truncated", "null_count");
    List<DType> fieldTypes =
            List.of(statValue, truncatedFlag, statValue, truncatedFlag, nullCount);
    return new DType.Struct(fieldNames, fieldTypes, false);
}
}