Commit acd95f2

Issue 698 HDFS save returns result message dataset, instead of the source dataset (#699)
* add CustomDataset object & test, set bloom create and update to return a status dataset instead of the whole dataset
* apply spotless
* add isStreaming(boolean) option for CustomDataset
* separate makeRowsList and makeRowsSeq into a Rows object & tests
* change the Bloom filter result to contain a _time column and switch the result column to _raw
* remove null sparksession from CustomDatasetTest.testEqualsContract
* remove double call to collectAsList() in CustomDatasetTest.testCreateStreamingDataset
* add CustomDataset interface, rename CustomDataset -> CustomDatasetImpl, remove the isStreaming ctor arg; this is now handled by a CustomResultStep object instead. The result dataset is now created in a separate CustomResultStep, which is added after bloom create and update
* remove unused variables from TeragrepBloomStep
* add custom result for hdfs save
* fix teragrepTest failing
* fix aggregate and join tests failing
* spotless
* remove possibly unnecessary DataStreamWriter from CustomDatasetImpl
1 parent c73cc91 commit acd95f2

5 files changed

Lines changed: 64 additions & 52 deletions
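Note: the diffs below construct the new result dataset through CustomDatasetImpl and CustomResultStep, which this commit references but whose sources are not among the changed files shown here. As a rough, non-authoritative sketch of the assumed behaviour — turning the (_time, _raw) schema, a single Object[] row, and a SparkSession obtained from the catalyst context into a one-row status dataset — plain Spark SQL is sufficient; the class name StatusDatasetSketch and the helper method are hypothetical:

// Hypothetical sketch only: the real CustomDatasetImpl is defined elsewhere in pth_10
// and may differ. Assumes the SparkSession is reachable from the catalyst context.
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

final class StatusDatasetSketch {

    // Builds the single-row status dataset (_time, _raw) that "hdfs save" is now
    // expected to return instead of the source dataset.
    static Dataset<Row> statusDataset(final SparkSession spark, final String message) {
        final StructType schema = new StructType(new StructField[] {
                StructField.apply("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()),
                StructField.apply("_raw", DataTypes.StringType, false, new MetadataBuilder().build())
        });
        final List<Row> rows = Collections
                .singletonList(RowFactory.create(Timestamp.from(Instant.now()), message));
        return spark.createDataFrame(rows, schema);
    }
}

The test changes further down assert exactly this shape: a single distinct _raw value of "HDFS save complete.".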

src/main/java/com/teragrep/pth_10/ast/commands/transformstatement/TeragrepTransformation.java

Lines changed: 21 additions & 2 deletions
@@ -375,8 +375,27 @@ public Node visitT_saveModeParameter(DPLParser.T_saveModeParameterContext ctx) {
             }
         }

-        return new StepNode(
-                new TeragrepHdfsSaveStep(catCtx, hdfsOverwrite, hdfsPath, hdfsRetentionSpan, format, header)
+        final CustomResultStep customResultStep = new CustomResultStep(
+                new CustomDatasetImpl(new StructType(new StructField[] {
+                        StructField.apply("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()),
+                        StructField.apply("_raw", DataTypes.StringType, false, new MetadataBuilder().build())
+                }), Collections.singletonList(new Object[] {
+                        java.sql.Timestamp.from(Instant.now()), "HDFS save complete."
+                }), catCtx)
+        );
+
+        return new StepListNode(
+                Arrays
+                        .asList(
+                                new TeragrepHdfsSaveStep(
+                                        catCtx,
+                                        hdfsOverwrite,
+                                        hdfsPath,
+                                        hdfsRetentionSpan,
+                                        format,
+                                        header
+                                ), customResultStep
+                        )
         );
     }

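CustomResultStep itself is likewise outside this diff. A loose guess at its role, under the assumption that pth_10 steps transform a Dataset<Row> through a get(...) hook, is that it discards the incoming rows and surfaces the wrapped status dataset; the translation test changes further down only assert its position as the second step of the StepListNode. The hook name and class below are assumptions, not the actual API:

// Hypothetical sketch only: the real CustomResultStep and AbstractStep live elsewhere
// in pth_10; the get(Dataset<Row>) hook shown here is an assumption.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class CustomResultStepSketch {

    private final Dataset<Row> statusDataset;

    CustomResultStepSketch(final Dataset<Row> statusDataset) {
        this.statusDataset = statusDataset;
    }

    // Ignores whatever the previous step (e.g. TeragrepHdfsSaveStep) produced and
    // returns the one-row status dataset as the pipeline output.
    Dataset<Row> get(final Dataset<Row> incoming) {
        return statusDataset;
    }
}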
src/test/java/com/teragrep/pth_10/AggregateAfterSequentialCommandTest.java

Lines changed: 2 additions & 12 deletions
@@ -138,20 +138,10 @@ public void aggregateAfterHdfsLoadTest() {
                     new StructField(
                             "_time",
                             DataTypes.TimestampType,
-                            true,
+                            false,
                             new MetadataBuilder().build()
                     ),
-                    new StructField("id", DataTypes.LongType, true, new MetadataBuilder().build()),
-                    new StructField("_raw", DataTypes.StringType, true, new MetadataBuilder().build()),
-                    new StructField("index", DataTypes.StringType, true, new MetadataBuilder().build()),
-                    new StructField(
-                            "sourcetype",
-                            DataTypes.StringType,
-                            true,
-                            new MetadataBuilder().build()
-                    ),
-                    new StructField("host", DataTypes.StringType, true, new MetadataBuilder().build()),
-                    new StructField("source", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("partition", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("offset", DataTypes.LongType, true, new MetadataBuilder().build()), new StructField("wind_speed", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("latitude", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("rainfall_rate", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("atmosphere_cloud_liquid_water_content", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("atmosphere_water_vapor_content", DataTypes.StringType, true, new MetadataBuilder().build())
+                    new StructField("_raw", DataTypes.StringType, false, new MetadataBuilder().build())
                 }), ds.schema());
             }
         );

src/test/java/com/teragrep/pth_10/JoinTransformationTest.java

Lines changed: 2 additions & 15 deletions
@@ -119,29 +119,16 @@ public void joinRightSideHdfsLoadTest() {
                     new StructField(
                             "_time",
                             DataTypes.TimestampType,
-                            true,
+                            false,
                             new MetadataBuilder().build()
                     ),
-                    new StructField("id", DataTypes.LongType, true, new MetadataBuilder().build()),
-                    new StructField("_raw", DataTypes.StringType, true, new MetadataBuilder().build()),
-                    new StructField("index", DataTypes.StringType, true, new MetadataBuilder().build()),
-                    new StructField(
-                            "sourcetype",
-                            DataTypes.StringType,
-                            true,
-                            new MetadataBuilder().build()
-                    ),
-                    new StructField("host", DataTypes.StringType, true, new MetadataBuilder().build()),
-                    new StructField("source", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("partition", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("offset", DataTypes.LongType, true, new MetadataBuilder().build()), new StructField("a", DataTypes.IntegerType, false, new MetadataBuilder().build())
+                    new StructField("_raw", DataTypes.StringType, false, new MetadataBuilder().build())
                 });
                 Assertions
                         .assertEquals(
                                 expectedSchema, ds.schema(),
                                 "Batch handler dataset contained an unexpected column arrangement !"
                         );
-
-                Row r = ds.select("a").distinct().first();
-                Assertions.assertEquals("12345", r.getAs(0).toString());
             }
         );
         this.streamingTestUtil.setUp(); // reset for another run

src/test/java/com/teragrep/pth_10/TeragrepTransformationTest.java

Lines changed: 21 additions & 17 deletions
@@ -375,12 +375,13 @@ public void tgHdfsSaveAfterBloomEstimateTest() {
                         + id,
                 testFile, ds -> {
                     List<String> listOfResult = ds
-                            .select("estimate(tokens)")
+                            .select("_raw")
+                            .distinct()
                             .collectAsList()
                             .stream()
                             .map(r -> r.getAs(0).toString())
                             .collect(Collectors.toList());
-                    Assertions.assertEquals(Collections.singletonList("5"), listOfResult);
+                    Assertions.assertEquals(List.of("HDFS save complete."), listOfResult);
                 }
         );
         this.streamingTestUtil.setUp();
@@ -408,12 +409,13 @@ public void tgHdfsSaveAfterAggregateTest() {
                         + id,
                 testFile, ds -> {
                     List<String> listOfResult = ds
-                            .select("avg_offset")
+                            .select("_raw")
+                            .distinct()
                             .collectAsList()
                             .stream()
                             .map(r -> r.getAs(0).toString())
                             .collect(Collectors.toList());
-                    Assertions.assertEquals(Collections.singletonList("3.0"), listOfResult);
+                    Assertions.assertEquals(List.of("HDFS save complete."), listOfResult);
                 }
         );
         this.streamingTestUtil.setUp();
@@ -441,12 +443,13 @@ public void tgHdfsSaveAfterTwoAggregationsTest() {
                        + " | teragrep exec hdfs save /tmp/pth_10_hdfs/" + id,
                 testFile, ds -> {
                     List<String> listOfResult = ds
-                            .select("offset_values")
+                            .select("_raw")
+                            .distinct()
                             .collectAsList()
                             .stream()
                             .map(r -> r.getAs(0).toString())
                             .collect(Collectors.toList());
-                    Assertions.assertEquals(Collections.singletonList("3.0"), listOfResult);
+                    Assertions.assertEquals(List.of("HDFS save complete."), listOfResult);
                 }
         );
         this.streamingTestUtil.setUp();
@@ -473,13 +476,13 @@ public void tgHdfsSaveAfterSequentialTest() {
                "index=index_A | sort num(offset) | teragrep exec hdfs save /tmp/pth_10_hdfs/" + id, testFile,
                 ds -> {
                     List<String> listOfResult = ds
-                            .select("offset")
-                            .orderBy("offset")
+                            .select("_raw")
+                            .distinct()
                             .collectAsList()
                             .stream()
                             .map(r -> r.getAs(0).toString())
                             .collect(Collectors.toList());
-                    Assertions.assertEquals(Arrays.asList("1", "2", "3", "4", "5"), listOfResult);
+                    Assertions.assertEquals(List.of("HDFS save complete."), listOfResult);
                 }
         );
         this.streamingTestUtil.setUp();
@@ -505,13 +508,13 @@ public void tgHdfsSaveTest() {
         streamingTestUtil
                 .performDPLTest("index=index_A | teragrep exec hdfs save /tmp/pth_10_hdfs/" + id, testFile, ds -> {
                     List<String> listOfResult = ds
-                            .select("offset")
-                            .orderBy("offset")
+                            .select("_raw")
+                            .distinct()
                             .collectAsList()
                             .stream()
                             .map(r -> r.getAs(0).toString())
                             .collect(Collectors.toList());
-                    Assertions.assertEquals(Arrays.asList("1", "2", "3", "4", "5"), listOfResult);
+                    Assertions.assertEquals(List.of("HDFS save complete."), listOfResult);
                 });
         this.streamingTestUtil.setUp();
         streamingTestUtil.performDPLTest("| teragrep exec hdfs load /tmp/pth_10_hdfs/" + id, testFile, ds -> {
@@ -538,13 +541,13 @@ public void tgHdfsSaveOverwriteTest() {
                "index=index_A | teragrep exec hdfs save /tmp/pth_10_hdfs/" + id + " overwrite=true", testFile,
                 ds -> {
                     List<String> listOfResult = ds
-                            .select("offset")
-                            .orderBy("offset")
+                            .select("_raw")
+                            .distinct()
                             .collectAsList()
                             .stream()
                             .map(r -> r.getAs(0).toString())
                             .collect(Collectors.toList());
-                    Assertions.assertEquals(Arrays.asList("1", "2", "3", "4", "5"), listOfResult);
+                    Assertions.assertEquals(List.of("HDFS save complete."), listOfResult);
                 }
         );
         this.streamingTestUtil.setUp();
@@ -768,12 +771,13 @@ public void tgHdfsSaveAfterBloomEstimateTestUsingRegexExtract() {
                         + id,
                 testFile, ds -> {
                     List<String> listOfResult = ds
-                            .select("estimate(tokens)")
+                            .select("_raw")
+                            .distinct()
                             .collectAsList()
                             .stream()
                             .map(r -> r.getAs(0).toString())
                             .collect(Collectors.toList());
-                    Assertions.assertEquals(Collections.singletonList("5"), listOfResult);
+                    Assertions.assertEquals(List.of("HDFS save complete."), listOfResult);
                 }
         );
         this.streamingTestUtil.setUp();

src/test/java/com/teragrep/pth_10/translationTests/TeragrepTest.java

Lines changed: 18 additions & 6 deletions
@@ -180,15 +180,19 @@ void testTeragrepHdfsSaveTranslation() {
         final DPLParserCatalystVisitor visitor = new DPLParserCatalystVisitor(ctx);

         final TeragrepTransformation ct = new TeragrepTransformation(ctx, visitor);
-        StepNode stepNode = (StepNode) ct
+        StepListNode stepNode = (StepListNode) ct
                 .visitTeragrepTransformation((DPLParser.TeragrepTransformationContext) tree.getChild(1).getChild(0));
-        final AbstractStep step = stepNode.get();
+        final List<AbstractStep> steps = stepNode.asList();
+        final AbstractStep step = steps.get(0);

         Assertions.assertEquals(TeragrepHdfsSaveStep.class, step.getClass());
         final TeragrepHdfsSaveStep saveStep = (TeragrepHdfsSaveStep) step;

         Assertions.assertEquals("/tmp/path", saveStep.pathStr);
         Assertions.assertNull(saveStep.retentionSpan);
+
+        final AbstractStep step2 = steps.get(1);
+        Assertions.assertEquals(CustomResultStep.class, step2.getClass());
     }

     @Test
@@ -206,15 +210,18 @@ void testTeragrepHdfsSaveRetentionTranslation() {
         LOGGER.debug(tree.toStringTree(parser));

         final TeragrepTransformation ct = new TeragrepTransformation(ctx, visitor);
-        StepNode stepNode = (StepNode) ct
+        StepListNode stepNode = (StepListNode) ct
                 .visitTeragrepTransformation((DPLParser.TeragrepTransformationContext) tree.getChild(1).getChild(0));
-        final AbstractStep step = stepNode.get();
+        final List<AbstractStep> steps = stepNode.asList();
+        final AbstractStep step = steps.get(0);

         Assertions.assertEquals(TeragrepHdfsSaveStep.class, step.getClass());
         final TeragrepHdfsSaveStep saveStep = (TeragrepHdfsSaveStep) step;

         Assertions.assertEquals("/tmp/path", saveStep.pathStr);
         Assertions.assertEquals("1d", saveStep.retentionSpan);
+        final AbstractStep step2 = steps.get(1);
+        Assertions.assertEquals(CustomResultStep.class, step2.getClass());
     }

     @Test
@@ -232,16 +239,21 @@ void testTeragrepHdfsSaveOverwriteTranslation() {
         LOGGER.debug(tree.toStringTree(parser));

         final TeragrepTransformation ct = new TeragrepTransformation(ctx, visitor);
-        StepNode stepNode = (StepNode) ct
+        StepListNode stepNode = (StepListNode) ct
                 .visitTeragrepTransformation((DPLParser.TeragrepTransformationContext) tree.getChild(1).getChild(0));
-        final AbstractStep step = stepNode.get();
+        final List<AbstractStep> steps = stepNode.asList();
+        final AbstractStep step = steps.get(0);

         Assertions.assertEquals(TeragrepHdfsSaveStep.class, step.getClass());
         final TeragrepHdfsSaveStep saveStep = (TeragrepHdfsSaveStep) step;

         Assertions.assertEquals("/tmp/path", saveStep.pathStr);
         Assertions.assertTrue(saveStep.overwrite);
         Assertions.assertNull(saveStep.retentionSpan);
+
+        final AbstractStep step2 = steps.get(1);
+        Assertions.assertEquals(CustomResultStep.class, step2.getClass());
+
     }

     @Test

0 commit comments
