Skip to content

Commit 6ce5255

Browse files
authored
Issue 647: Bloom create and update now returns only status messages. (#693)
* add CustomDataset object & test, set bloom create and update to return status dataset instead of the whole dataset. * apply spotless * add isStreaming(boolean) option for CustomDataset. * separate makeRowsList and makeRowsSeq into Rows object & tests. * change Bloom filter result to contain _time column and also switch result column to _raw. * remove null sparksession from CustomDatasetTest.testEqualsContract * remove double call to collectAsList() in CustomDatasetTest.testCreateStreamingDataset * add CustomDataset interface, rename CustomDataset -> CustomDatasetImpl, remove isStreaming ctor arg, this is now handled by CustomResultStep object instead. Result dataset is now created in a separate CustomResultStep, which is added after bloom create and update. * removed unused variables from TeragrepBloomStep * remove possibly unnecessary DataStreamWriter from CustomDatasetImpl
1 parent 1efc7fa commit 6ce5255

8 files changed

Lines changed: 604 additions & 5 deletions

File tree

src/main/java/com/teragrep/pth_10/ast/commands/transformstatement/TeragrepTransformation.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
import com.teragrep.pth_10.ast.commands.transformstatement.teragrep.RegexValueFromBloomContext;
6262
import com.teragrep.pth_10.ast.commands.transformstatement.teragrep.TableNameFromBloomContext;
6363
import com.teragrep.pth_10.ast.ContextValue;
64+
import com.teragrep.pth_10.datasources.CustomDatasetImpl;
65+
import com.teragrep.pth_10.steps.CustomResultStep;
6466
import com.teragrep.pth_10.steps.teragrep.*;
6567
import com.teragrep.pth_10.steps.teragrep.AbstractTokenizerStep;
6668
import com.teragrep.pth_10.steps.teragrep.TeragrepTokenizerStep;
@@ -69,13 +71,19 @@
6971
import com.teragrep.pth_03.antlr.DPLParser;
7072
import com.teragrep.pth_03.antlr.DPLParserBaseVisitor;
7173
import com.teragrep.pth_03.shaded.org.antlr.v4.runtime.tree.TerminalNode;
74+
import org.apache.spark.sql.types.DataTypes;
75+
import org.apache.spark.sql.types.MetadataBuilder;
76+
import org.apache.spark.sql.types.StructField;
77+
import org.apache.spark.sql.types.StructType;
7278
import org.slf4j.Logger;
7379
import org.slf4j.LoggerFactory;
7480
import org.w3c.dom.Document;
7581

7682
import javax.xml.parsers.DocumentBuilderFactory;
7783
import javax.xml.parsers.ParserConfigurationException;
84+
import java.time.Instant;
7885
import java.util.Arrays;
86+
import java.util.Collections;
7987

8088
/**
8189
* Class containing the visitor methods for all "| teragrep" subcommands
@@ -509,7 +517,18 @@ public Node visitT_bloomOptionParameter(final DPLParser.T_bloomOptionParameterCo
509517
outputCol.value(),
510518
estimateCol.value()
511519
);
512-
rv = new StepListNode(Arrays.asList(aggregateStep, bloomStepWithRegexAndTable));
520+
// Create a step that returns a custom dataset containing the result message
521+
// instead of the whole dataset
522+
final CustomResultStep customResultStep = new CustomResultStep(
523+
new CustomDatasetImpl(new StructType(new StructField[] {
524+
StructField.apply("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()),
525+
StructField.apply("_raw", DataTypes.StringType, false, new MetadataBuilder().build())
526+
}), Collections.singletonList(new Object[] {
527+
Instant.now(),
528+
"Bloom filter operation ".concat(mode.value().toString()).concat(" was completed.")
529+
}), catCtx)
530+
);
531+
rv = new StepListNode(Arrays.asList(aggregateStep, bloomStepWithRegexAndTable, customResultStep));
513532
}
514533
else {
515534
final TeragrepBloomStep bloomStep = new TeragrepBloomStep(
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2025 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.datasources;
47+
48+
import org.apache.spark.sql.Dataset;
49+
import org.apache.spark.sql.Row;
50+
51+
public interface CustomDataset {
52+
53+
public abstract Dataset<Row> asStaticDataset();
54+
55+
public abstract Dataset<Row> asStreamingDataset();
56+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2025 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.datasources;
47+
48+
import com.teragrep.pth_10.ast.DPLParserCatalystContext;
49+
import org.apache.spark.sql.*;
50+
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
51+
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
52+
import org.apache.spark.sql.execution.streaming.MemoryStream;
53+
import org.apache.spark.sql.types.StructType;
54+
import scala.Option;
55+
56+
import java.util.List;
57+
import java.util.Objects;
58+
59+
public final class CustomDatasetImpl implements CustomDataset {
60+
61+
private final StructType schema;
62+
private final List<Object[]> values;
63+
private final DPLParserCatalystContext catCtx;
64+
65+
public CustomDatasetImpl(
66+
final StructType schema,
67+
final List<Object[]> values,
68+
final DPLParserCatalystContext catCtx
69+
) {
70+
this.schema = schema;
71+
this.values = values;
72+
this.catCtx = catCtx;
73+
}
74+
75+
@Override
76+
public Dataset<Row> asStaticDataset() {
77+
return catCtx.getSparkSession().createDataFrame(new Rows(values).asList(), schema);
78+
}
79+
80+
@Override
81+
public Dataset<Row> asStreamingDataset() {
82+
final SQLContext sqlContext = catCtx.getSparkSession().sqlContext();
83+
final ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
84+
final MemoryStream<Row> rowMemoryStream = new MemoryStream<>(1, sqlContext, Option.apply(1), encoder);
85+
86+
rowMemoryStream.addData(new Rows(values).asSeq());
87+
88+
return rowMemoryStream.toDF();
89+
}
90+
91+
@Override
92+
public boolean equals(final Object o) {
93+
if (o == null || getClass() != o.getClass()) {
94+
return false;
95+
}
96+
final CustomDatasetImpl that = (CustomDatasetImpl) o;
97+
return Objects.equals(schema, that.schema) && Objects.equals(values, that.values)
98+
&& Objects.equals(catCtx, that.catCtx);
99+
}
100+
101+
@Override
102+
public int hashCode() {
103+
return Objects.hash(schema, values, catCtx);
104+
}
105+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2025 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.datasources;
47+
48+
import org.apache.spark.sql.Row;
49+
import org.apache.spark.sql.RowFactory;
50+
import scala.collection.JavaConverters;
51+
import scala.collection.Seq;
52+
53+
import java.util.ArrayList;
54+
import java.util.List;
55+
import java.util.Objects;
56+
57+
public final class Rows {
58+
59+
private final List<Object[]> values;
60+
61+
public Rows(final List<Object[]> values) {
62+
this.values = values;
63+
}
64+
65+
public Seq<Row> asSeq() {
66+
return JavaConverters.asScalaBuffer(asList()).toSeq();
67+
}
68+
69+
public List<Row> asList() {
70+
final List<Row> rows = new ArrayList<>();
71+
for (final Object[] rowValues : values) {
72+
rows.add(RowFactory.create(rowValues));
73+
}
74+
return rows;
75+
}
76+
77+
@Override
78+
public boolean equals(final Object o) {
79+
if (o == null || getClass() != o.getClass()) {
80+
return false;
81+
}
82+
final Rows rows = (Rows) o;
83+
return Objects.equals(values, rows.values);
84+
}
85+
86+
@Override
87+
public int hashCode() {
88+
return Objects.hashCode(values);
89+
}
90+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2025 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.steps;
47+
48+
import com.teragrep.functions.dpf_02.AbstractStep;
49+
import com.teragrep.pth_10.datasources.CustomDataset;
50+
import org.apache.spark.sql.Dataset;
51+
import org.apache.spark.sql.Row;
52+
import org.apache.spark.sql.streaming.StreamingQueryException;
53+
54+
public final class CustomResultStep extends AbstractStep {
55+
56+
private final CustomDataset customDataset;
57+
58+
public CustomResultStep(final CustomDataset customDataset) {
59+
this.customDataset = customDataset;
60+
}
61+
62+
@Override
63+
public Dataset<Row> get(final Dataset<Row> dataset) throws StreamingQueryException {
64+
final Dataset<Row> result;
65+
66+
if (dataset.isStreaming()) {
67+
result = customDataset.asStreamingDataset();
68+
}
69+
else {
70+
result = customDataset.asStaticDataset();
71+
}
72+
73+
return result;
74+
}
75+
}

src/main/java/com/teragrep/pth_10/steps/teragrep/TeragrepBloomStep.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@
5555
import org.apache.spark.sql.Dataset;
5656
import org.apache.spark.sql.Row;
5757
import org.apache.spark.sql.functions;
58-
import org.slf4j.Logger;
59-
import org.slf4j.LoggerFactory;
6058

6159
/**
6260
* teragrep exec bloom
@@ -67,8 +65,6 @@ public enum BloomMode {
6765
UPDATE, CREATE, ESTIMATE, AGGREGATE, DEFAULT
6866
}
6967

70-
private static final Logger LOGGER = LoggerFactory.getLogger(TeragrepBloomStep.class);
71-
7268
private final Config zeppelinConfig;
7369
public final BloomMode mode;
7470
private final String tableName;

0 commit comments

Comments
 (0)