Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 18 additions & 32 deletions writer/src/main/java/io/github/dfa1/vortex/writer/VortexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ private void flushZoneMaps() throws IOException {
continue;
}
DType colDtype = schema.fieldTypes().get(schema.fieldNames().indexOf(colName));
if (!(colDtype instanceof DType.Primitive prim) || !isZoneMappable(prim.ptype())) {
if (!(colDtype instanceof DType.Primitive prim)) {
continue;
}
if (!chunks.stream().allMatch(ChunkRef::hasStats)) {
Expand Down Expand Up @@ -759,16 +759,9 @@ private static long countNulls(boolean[] validity) {
return nulls;
}

private static boolean isZoneMappable(PType ptype) {
return switch (ptype) {
case I8, I16, I32, I64, U8, U16, U32, U64, F32, F64 -> true;
case F16 -> false;
};
}

/// Builds the per-zone min (or max) values array in the storage shape the primitive encoder
/// expects, decoding each chunk's serialised [ScalarValue] stat.
private static Object statColumn(PType ptype, List<ChunkRef> chunks, boolean max) {
private static Object statColumn(PType ptype, List<ChunkRef> chunks, boolean max) throws IOException {
int n = chunks.size();
return switch (ptype) {
case I8, U8 -> {
Expand Down Expand Up @@ -813,39 +806,32 @@ private static Object statColumn(PType ptype, List<ChunkRef> chunks, boolean max
}
yield a;
}
case F16 -> throw new IllegalStateException("F16 is not zone-mappable");
case F16 -> {
// F16 min/max are serialised as f32 scalars; re-pack to float16 storage.
short[] a = new short[n];
for (int i = 0; i < n; i++) {
a[i] = Float.floatToFloat16((float) scalarDouble(chunks.get(i), max));
}
yield a;
}
};
}

private static long scalarLong(ChunkRef cr, boolean max) {
private static long scalarLong(ChunkRef cr, boolean max) throws IOException {
// Integer columns serialise min/max as int64 (signed) or uint64 (unsigned).
ScalarValue sv = decodeScalar(max ? cr.statsMax() : cr.statsMin());
if (sv.int64_value() != null) {
return sv.int64_value();
}
if (sv.uint64_value() != null) {
return sv.uint64_value();
}
throw new IllegalStateException("expected integer scalar stat");
return sv.int64_value() != null ? sv.int64_value() : sv.uint64_value();
}

private static double scalarDouble(ChunkRef cr, boolean max) {
private static double scalarDouble(ChunkRef cr, boolean max) throws IOException {
// Float columns serialise min/max as f64 (F64) or f32 (F32).
ScalarValue sv = decodeScalar(max ? cr.statsMax() : cr.statsMin());
if (sv.f64_value() != null) {
return sv.f64_value();
}
if (sv.f32_value() != null) {
return sv.f32_value();
}
throw new IllegalStateException("expected float scalar stat");
return sv.f64_value() != null ? sv.f64_value() : sv.f32_value();
}

private static ScalarValue decodeScalar(byte[] bytes) {
private static ScalarValue decodeScalar(byte[] bytes) throws IOException {
MemorySegment seg = MemorySegment.ofArray(bytes);
try {
return ScalarValue.decode(seg, 0, seg.byteSize());
} catch (IOException ex) {
throw new java.io.UncheckedIOException(ex);
}
return ScalarValue.decode(seg, 0, seg.byteSize());
}

private ByteBuffer buildFooter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.github.dfa1.vortex.reader.array.StructArray;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.IOException;
import java.lang.foreign.Arena;
Expand Down Expand Up @@ -151,6 +153,81 @@ void zoneMaps_nullableColumn_recordsPerZoneNullCount(@TempDir Path tmp) throws I
}
}

@ParameterizedTest
@EnumSource(PType.class)
void everyPrimitiveType_emitsZonedLayout(PType ptype, @TempDir Path tmp) throws IOException {
// Given a two-zone file of the given primitive type (globalDict off so the column stays
// a plain primitive, not a dict). Every fixed-width primitive carries min/max stats, so
// every one gets a vortex.stats layout — exercising each per-ptype stat-column arm.
DType.Struct schema = new DType.Struct(
List.of("v"), List.of(new DType.Primitive(ptype, false)), false);
WriteOptions opts = new WriteOptions(2, true, 0.90, 0, false, false);
Path file = tmp.resolve("ptype-" + ptype + ".vtx");
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var sut = VortexWriter.create(ch, schema, opts)) {
sut.writeChunk(Map.of("v", sample(ptype, 0)));
sut.writeChunk(Map.of("v", sample(ptype, 2)));
}

// When / Then
try (VortexReader reader = VortexReader.open(file)) {
assertThat(reader.layout().children().get(0).isZoned())
.as("ptype %s zoned", ptype)
.isTrue();
}
}

@Test
void noChunks_emitsNoZoneMap(@TempDir Path tmp) throws IOException {
// Given a file closed without any writeChunk: the column has no chunks, so flushZoneMaps
// skips it (the empty-chunks guard) and emits no zone-map.
WriteOptions opts = new WriteOptions(4, true, 0.90, 0, false, false);
Path file = tmp.resolve("empty.vtx");
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var sut = VortexWriter.create(ch, SCHEMA, opts)) {
// no writeChunk
assertThat(sut).isNotNull();
}

// When / Then
try (VortexReader reader = VortexReader.open(file)) {
assertThat(reader.layout().children().get(0).isZoned()).isFalse();
}
}

@Test
void chunkWithoutStats_skipsZoneMap(@TempDir Path tmp) throws IOException {
// Given a column with one normal chunk and one empty chunk (no min/max stats): not every
// chunk carries stats, so flushZoneMaps skips the column (the all-stats guard).
DType.Struct schema = new DType.Struct(
List.of("v"), List.of(new DType.Primitive(PType.I64, false)), false);
WriteOptions opts = new WriteOptions(2, true, 0.90, 0, false, false);
Path file = tmp.resolve("partial.vtx");
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var sut = VortexWriter.create(ch, schema, opts)) {
sut.writeChunk(Map.of("v", new long[]{1L, 2L}));
sut.writeChunk(Map.of("v", new long[]{}));
}

// When / Then
try (VortexReader reader = VortexReader.open(file)) {
assertThat(reader.layout().children().get(0).isZoned()).isFalse();
}
}

/// Two values starting at `base` in the storage shape for `ptype`.
private static Object sample(PType ptype, int base) {
return switch (ptype) {
case I8, U8 -> new byte[]{(byte) base, (byte) (base + 1)};
case I16, U16 -> new short[]{(short) base, (short) (base + 1)};
case F16 -> new short[]{Float.floatToFloat16(base), Float.floatToFloat16(base + 1)};
case I32, U32 -> new int[]{base, base + 1};
case I64, U64 -> new long[]{base, base + 1};
case F32 -> new float[]{base, base + 1};
case F64 -> new double[]{base, base + 1};
};
}

/// Reconstructs the stats-table dtype the writer emits: MAX, MIN, NULL_COUNT for an I64 column.
private static DType.Struct statsTableDtype() {
DType nullableI64 = new DType.Primitive(PType.I64, true);
Expand Down