Skip to content

Commit f09254b

Browse files
authored
Revert "remove collectAsList call (#13)" (#41)
This reverts commit 9f6132b.
1 parent 9f6132b commit f09254b

1 file changed

Lines changed: 4 additions & 3 deletions

File tree

src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,14 @@ public void collect(Dataset<Row> batchDF, Long batchId, List<AbstractStep> postB
132132
if (!skipLimiting && limit > 0) {
133133
orderedDs = orderedDs.limit(limit);
134134
}
135-
135+
List<Row> collected = orderedDs.collectAsList();
136+
Dataset<Row> createdDsFromCollected = SparkSession.builder().getOrCreate().createDataFrame(collected, this.inputSchema);
136137
Dataset<Row> current;
137138
if (this.savedDs == null) {
138-
current = orderedDs;
139+
current = createdDsFromCollected;
139140
}
140141
else {
141-
current = savedDs.union(orderedDs);
142+
current = savedDs.union(createdDsFromCollected);
142143
}
143144

144145
current = orderDataset(current);

0 commit comments

Comments
 (0)