Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,26 @@ private void flushZoneMaps() throws IOException {
int zonesSegIdx = writeSegment(statsDtype, new StructData(fields), new StructEncodingEncoder());
zoneMaps.put(colName, new ZoneMapRef(zonesSegIdx, nZones, options.chunkSize(), hasMinMax));
}
// Dict-encoded columns live in a separate path (one zone per code chunk); they carry
// NULL_COUNT only for now (no dict-level MIN/MAX yet). Matches Rust, which zone-maps dict
// columns (vortex.stats wrapping vortex.dict).
for (Map.Entry<String, DictColRef> e : dictColRefs.entrySet()) {
// A dict column always has at least one code chunk, so null counts are non-empty.
long[] nullCounts = e.getValue().chunkNullCounts().stream().mapToLong(Long::longValue).toArray();
writeNullCountZoneMap(e.getKey(), nullCounts);
}
}

/// Emits a NULL_COUNT-only `vortex.stats` zone-map (one zone per chunk) for `colName`.
private void writeNullCountZoneMap(String colName, long[] nullCounts) throws IOException {
int nZones = nullCounts.length;
boolean[] allValid = new boolean[nZones];
java.util.Arrays.fill(allValid, true);
DType.Struct statsDtype = new DType.Struct(
List.of("null_count"), List.of(new DType.Primitive(PType.U64, true)), false);
StructData sd = new StructData(List.of(new NullableData(nullCounts, allValid)));
int zonesSegIdx = writeSegment(statsDtype, sd, new StructEncodingEncoder());
zoneMaps.put(colName, new ZoneMapRef(zonesSegIdx, nZones, options.chunkSize(), false));
}

/// Wraps a column's data layout in a `vortex.stats` (zoned) layout when a zone-map was
Expand Down Expand Up @@ -895,7 +915,8 @@ private ByteBuffer buildLayout() {
String colName = schema.fieldNames().get(c);
DictColRef ref = dictColRefs.get(colName);
if (ref != null) {
colLayouts[c] = buildDictColLayout(fbb, ref);
int dictLayout = buildDictColLayout(fbb, ref);
colLayouts[c] = wrapZoneMap(fbb, colName, dictLayout, ref.totalRows());
if (totalRows == 0) {
totalRows = ref.totalRows();
}
Expand Down Expand Up @@ -1029,14 +1050,17 @@ private void writeGlobalDictColumn(String colName, DType.Primitive dtype, List<O
DType codesDtype = new DType.Primitive(codePType, false);
List<Integer> codesSegIdxes = new ArrayList<>();
List<Long> chunkRowCounts = new ArrayList<>();
List<Long> chunkNullCounts = new ArrayList<>();
for (Object chunk : chunks) {
int len = primitiveArrayLen(chunk, ptype);
Object codesArr = buildCodesArray(chunk, ptype, valueMap, codePType, len);
codesSegIdxes.add(writeSegment(codesDtype, codesArr));
chunkRowCounts.add((long) len);
chunkNullCounts.add(chunk instanceof NullableData nd ? countNulls(nd.validity()) : 0L);
}

dictColRefs.put(colName, new DictColRef(valuesSegIdx, dictSize, codesSegIdxes, chunkRowCounts));
dictColRefs.put(colName,
new DictColRef(valuesSegIdx, dictSize, codesSegIdxes, chunkRowCounts, chunkNullCounts));
}

private void writeGlobalDictUtf8Column(String colName, DType.Utf8 dtype, List<Object> chunks)
Expand Down Expand Up @@ -1072,13 +1096,16 @@ private void writeGlobalDictUtf8Column(String colName, DType.Utf8 dtype, List<Ob
DType codesDtype = new DType.Primitive(codePType, false);
List<Integer> codesSegIdxes = new ArrayList<>();
List<Long> chunkRowCounts = new ArrayList<>();
List<Long> chunkNullCounts = new ArrayList<>();
for (Object chunk : chunks) {
String[] strs = (String[]) chunk;
Object codesArr = buildUtf8CodesArray(strs, valueMap, codePType);
codesSegIdxes.add(writeSegment(codesDtype, codesArr));
chunkRowCounts.add((long) strs.length);
chunkNullCounts.add(0L); // global-dict Utf8 columns are dense (non-nullable)
}
dictColRefs.put(colName, new DictColRef(valuesSegIdx, dictSize, codesSegIdxes, chunkRowCounts));
dictColRefs.put(colName,
new DictColRef(valuesSegIdx, dictSize, codesSegIdxes, chunkRowCounts, chunkNullCounts));
}

private static Object buildUtf8CodesArray(String[] strs, Map<String, Integer> valueMap, PType codePType) {
Expand Down Expand Up @@ -1283,7 +1310,7 @@ private record ZoneMapRef(int zonesSegIdx, long nZones, long zoneLen, boolean ha
}

private record DictColRef(int valuesSegIdx, long valuesLen, List<Integer> codesSegIdxes,
List<Long> chunkRowCounts) {
List<Long> chunkRowCounts, List<Long> chunkNullCounts) {
long totalRows() {
return chunkRowCounts.stream().mapToLong(Long::longValue).sum();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,31 @@ void nonPrimitiveColumn_emitsNullCountOnlyZoneMap(@TempDir Path tmp) throws IOEx
}
}

@Test
void dictColumn_emitsNullCountZoneMapWrappingDict(@TempDir Path tmp) throws IOException {
// Given a low-cardinality Utf8 column across 2 chunks of 6 with heavy repeats (so the
// global-dict candidate test, distinct*2 < rows, fires) → vortex.dict; with zone maps the
// column is vortex.stats wrapping the dict, carrying NULL_COUNT only.
DType.Struct schema = new DType.Struct(
List.of("s"), List.of(new DType.Utf8(false)), false);
WriteOptions opts = new WriteOptions(6, true, 0.90, 0, true, false);
Path file = tmp.resolve("dict.vtx");
try (var ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
var sut = VortexWriter.create(ch, schema, opts)) {
sut.writeChunk(Map.of("s", new String[]{"a", "a", "a", "b", "b", "b"}));
sut.writeChunk(Map.of("s", new String[]{"c", "c", "c", "a", "a", "a"}));
}

// When / Then — zoned over a dict data child, NULL_COUNT-only bitset
try (VortexReader reader = VortexReader.open(file)) {
Layout column = reader.layout().children().get(0);
assertThat(column.isZoned()).isTrue();
assertThat(column.children().get(0).isDict()).isTrue();
ByteBuffer meta = column.metadata().duplicate().order(ByteOrder.LITTLE_ENDIAN);
assertThat(meta.get(meta.position() + 4)).isEqualTo((byte) 0x40);
}
}

/// Two values starting at `base` in the storage shape for `ptype`.
private static Object sample(PType ptype, int base) {
return switch (ptype) {
Expand Down