Skip to content

Commit e2db236

Browse files
authored
add Consumers to interface for query started/stopped/progress and ini… (#3)
* add Consumers to interface for query started/stopped/progress and initial logging. * fix typo in DPLExecutorTestImpl * remove metrics() from DPLExecutorResult
1 parent e8e40b3 commit e2db236

3 files changed

Lines changed: 15 additions & 11 deletions

File tree

src/main/java/com/teragrep/pth_15/DPLExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.spark.sql.Dataset;
4949
import org.apache.spark.sql.Row;
5050
import org.apache.spark.sql.SparkSession;
51+
import org.apache.spark.sql.streaming.StreamingQuery;
5152
import org.apache.spark.sql.streaming.StreamingQueryListener;
5253

5354
import java.util.concurrent.TimeoutException;
@@ -58,7 +59,10 @@ public interface DPLExecutor {
5859

5960
public DPLExecutorResult interpret(
6061
BiConsumer<Dataset<Row>, Boolean> batchHandler,
61-
Consumer<StreamingQueryListener.QueryProgressEvent> queryProgressConsumer,
62+
BiConsumer<StreamingQuery, StreamingQueryListener.QueryStartedEvent> queryStartedConsumer,
63+
BiConsumer<StreamingQuery, StreamingQueryListener.QueryProgressEvent> queryProgressConsumer,
64+
BiConsumer<StreamingQuery, StreamingQueryListener.QueryTerminatedEvent> queryTerminatedConsumer,
65+
Consumer<String> initialLogConsumer,
6266
SparkSession sparkSession,
6367
String noteId,
6468
String paragraphId,

src/main/java/com/teragrep/pth_15/DPLExecutorResult.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@
4545
*/
4646
package com.teragrep.pth_15;
4747

48-
import java.util.Map;
49-
5048
public interface DPLExecutorResult {
5149

5250
public enum Code {
@@ -56,6 +54,4 @@ public enum Code {
5654
public abstract Code code();
5755

5856
public abstract String message();
59-
60-
public abstract Map<String, String> metrics();
6157
}

src/test/java/com/teragrep/pth_15/DPLExecutorTestImpl.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.spark.sql.Dataset;
5050
import org.apache.spark.sql.Row;
5151
import org.apache.spark.sql.SparkSession;
52+
import org.apache.spark.sql.streaming.StreamingQuery;
5253
import org.apache.spark.sql.streaming.StreamingQueryListener;
5354

5455
import java.util.concurrent.TimeoutException;
@@ -62,12 +63,15 @@ public DPLExecutorTestImpl(Config ignored) {
6263

6364
@Override
6465
public DPLExecutorResult interpret(
65-
final BiConsumer<Dataset<Row>, Boolean> batchHandler,
66-
final Consumer<StreamingQueryListener.QueryProgressEvent> queryProgressConsumer,
67-
final SparkSession sparkSession,
68-
final String noteId,
69-
final String paragraphId,
70-
final String lines
66+
BiConsumer<Dataset<Row>, Boolean> batchHandler,
67+
BiConsumer<StreamingQuery, StreamingQueryListener.QueryStartedEvent> queryStartedConsumer,
68+
BiConsumer<StreamingQuery, StreamingQueryListener.QueryProgressEvent> queryProgressConsumer,
69+
BiConsumer<StreamingQuery, StreamingQueryListener.QueryTerminatedEvent> queryTerminatedConsumer,
70+
Consumer<String> initialLogConsumer,
71+
SparkSession sparkSession,
72+
String noteId,
73+
String paragraphId,
74+
String lines
7175
) throws TimeoutException {
7276
throw new UnsupportedOperationException("Not supported.");
7377
}

0 commit comments

Comments
 (0)