Skip to content

Commit 64cddf2

Browse files
authored
Add pth_15 class implementations, move pth_07 DPLExecutor stuff here (#657)
* add pth_15 class implementations, move DPLExecutor from zep_01/pth_07 to pth_10, change BatchHandler type to BiConsumer, add pth_15 dependency. * remove runIncrement and queryId fields from DPLExecutorImpl; remove metrics from DPLExecutorResultImpl; remove consumers from DPLStreamingQueryListener; update pth_15 to 2.0.0
1 parent 94b8ee2 commit 64cddf2

10 files changed

Lines changed: 756 additions & 10 deletions

File tree

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,16 @@
5454
<teragrep.jue_01.version>0.4.3</teragrep.jue_01.version>
5555
<teragrep.pth_03.version>9.3.0</teragrep.pth_03.version>
5656
<teragrep.pth_06.version>3.5.0</teragrep.pth_06.version>
57+
<teragrep.pth_15.version>2.0.0</teragrep.pth_15.version>
5758
<teragrep.rlp_01.version>4.0.1</teragrep.rlp_01.version>
5859
<teragrep.rlp_03.version>9.0.0</teragrep.rlp_03.version>
5960
</properties>
6061
<dependencies>
62+
<dependency>
63+
<groupId>com.teragrep</groupId>
64+
<artifactId>pth_15</artifactId>
65+
<version>${teragrep.pth_15.version}</version>
66+
</dependency>
6167
<!--DPL-dependencies -->
6268
<dependency>
6369
<groupId>com.teragrep</groupId>

src/main/java/com/teragrep/pth_10/ast/DPLParserCatalystVisitor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import javax.xml.parsers.DocumentBuilderFactory;
7070
import javax.xml.parsers.ParserConfigurationException;
7171
import java.util.*;
72+
import java.util.function.BiConsumer;
7273
import java.util.function.Consumer;
7374

7475
/**
@@ -124,7 +125,7 @@ public List<String> getTraceBuffer() {
124125
}
125126

126127
@Deprecated
127-
public Consumer<Dataset<Row>> getConsumer() {
128+
public BiConsumer<Dataset<Row>, Boolean> getConsumer() {
128129
return this.stepList.getBatchHandler();
129130
}
130131

@@ -133,7 +134,7 @@ public Consumer<Dataset<Row>> getConsumer() {
133134
*
134135
* @param consumer Consumer with type Dataset to be implemented in pth_07
135136
*/
136-
public void setConsumer(Consumer<Dataset<Row>> consumer) {
137+
public void setConsumer(BiConsumer<Dataset<Row>, Boolean> consumer) {
137138
this.stepList.setBatchHandler(consumer);
138139
}
139140

src/main/java/com/teragrep/pth_10/ast/StepList.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import java.util.HashMap;
6666
import java.util.List;
6767
import java.util.Map;
68-
import java.util.function.Consumer;
68+
import java.util.function.BiConsumer;
6969

7070
public class StepList implements VoidFunction2<Dataset<Row>, Long> {
7171

@@ -81,15 +81,15 @@ private enum BreakpointType {
8181
private boolean ignoreDefaultSorting = false;
8282

8383
private OutputMode outputMode = OutputMode.Append();
84-
private Consumer<Dataset<Row>> batchHandler = null; // for UI
84+
private BiConsumer<Dataset<Row>, Boolean> batchHandler = null; // for UI
8585
private BatchCollect batchCollect; // standard batchCollect, used before sending batch event
8686
private DPLParserCatalystVisitor catVisitor;
8787

8888
public void setBatchCollect(BatchCollect batchCollect) {
8989
this.batchCollect = batchCollect;
9090
}
9191

92-
public void setBatchHandler(Consumer<Dataset<Row>> batchHandler) {
92+
public void setBatchHandler(BiConsumer<Dataset<Row>, Boolean> batchHandler) {
9393
this.batchHandler = batchHandler;
9494
}
9595

@@ -98,7 +98,7 @@ public BatchCollect batchCollect() {
9898
}
9999

100100
@Deprecated
101-
public Consumer<Dataset<Row>> getBatchHandler() {
101+
public BiConsumer<Dataset<Row>, Boolean> getBatchHandler() {
102102
return batchHandler;
103103
}
104104

@@ -299,14 +299,15 @@ else if (step.hasProperty(AbstractStep.CommandProperty.POST_BATCHCOLLECT)) {
299299
* @param id ID of the processed batch dataset
300300
*/
301301
private void sendBatchEvent(Dataset<Row> ds, Long id) {
302+
boolean aggregatesUsed = aggregateCount > 0;
302303
if (this.batchHandler != null) {
303304
if (outputMode == OutputMode.Complete()) {
304305
LOGGER.info("------------------ Aggregates (Complete Mode) used, sending batch event!");
305-
this.batchHandler.accept(ds);
306+
this.batchHandler.accept(ds, aggregatesUsed);
306307
}
307308
else if (this.batchCollect == null) {
308309
LOGGER.info("------------------ No batchCollect present (no sorting column), sending batch event!");
309-
this.batchHandler.accept(ds);
310+
this.batchHandler.accept(ds, aggregatesUsed);
310311
}
311312
else {
312313
LOGGER.info("------------------ Aggregates NOT USED (before seq. switch), using batchCollect!");
@@ -315,7 +316,7 @@ else if (this.batchCollect == null) {
315316
index = breakpoints.get(BreakpointType.POST_BC);
316317
}
317318
this.batchCollect.collect(ds, id, this.list.subList(index, this.list.size()), false);
318-
this.batchHandler.accept(batchCollect.getCollectedAsDataframe());
319+
this.batchHandler.accept(batchCollect.getCollectedAsDataframe(), aggregatesUsed);
319320
}
320321
}
321322
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
3+
* Copyright (C) 2019-2025 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.pth_10.executor;
47+
48+
import com.teragrep.pth_03.shaded.org.antlr.v4.runtime.BaseErrorListener;
49+
import com.teragrep.pth_03.shaded.org.antlr.v4.runtime.RecognitionException;
50+
import com.teragrep.pth_03.shaded.org.antlr.v4.runtime.Recognizer;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
53+
54+
/**
55+
* Used to listen for errors in DPL queries. Overrides empty functions from BaseErrorListener.
56+
*/
57+
public final class DPLErrorListenerImpl extends BaseErrorListener {
58+
59+
private static final Logger LOGGER = LoggerFactory.getLogger(DPLErrorListenerImpl.class);
60+
61+
private final String listenedTo;
62+
63+
public DPLErrorListenerImpl(String listenedTo) {
64+
this.listenedTo = listenedTo;
65+
}
66+
67+
@Override
68+
public void syntaxError(
69+
Recognizer<?, ?> recognizer,
70+
Object offendingSymbol,
71+
int line,
72+
int charPositionInLine,
73+
String msg,
74+
RecognitionException e
75+
) {
76+
if (e == null) {
77+
LOGGER.error("Got an exception from <{}>, no message", listenedTo);
78+
}
79+
else {
80+
LOGGER.error("Got an exception from <{}>: <[{}]>", listenedTo, e.getMessage(), e);
81+
}
82+
throw new IllegalStateException(
83+
listenedTo + " failure on line " + line + ", column " + charPositionInLine + " due to " + msg + "\n"
84+
+ "Please check that the query is written correctly. Otherwise, please report this error and include the query used and this error."
85+
);
86+
}
87+
}

0 commit comments

Comments
 (0)