4545 */
4646package com .teragrep .pth10 .ast ;
4747
48+ import com .teragrep .functions .dpf_02 .AbstractStep ;
4849import com .teragrep .functions .dpf_02 .BatchCollect ;
49- import com .teragrep .pth10 .steps .AbstractStep ;
5050import org .apache .spark .api .java .function .VoidFunction2 ;
5151import org .apache .spark .sql .Dataset ;
5252import org .apache .spark .sql .Row ;
6969
7070public class StepList implements VoidFunction2 <Dataset <Row >, Long > {
7171
    /**
     * Kind of execution breakpoint recorded in the {@code breakpoints} map
     * (step index keyed by type):
     * <ul>
     *   <li>{@code SEQUENTIAL} — index of the first step that must run inside
     *       forEachBatch, set once during analysis for SEQUENTIAL_ONLY commands
     *       or right after the first aggregate;</li>
     *   <li>{@code POST_BC} — index of the first step carrying the
     *       POST_BATCHCOLLECT command property; steps from this index onward are
     *       handed to {@code batchCollect.collect(...)} as a sub-list instead of
     *       being executed in the main step loop.</li>
     * </ul>
     */
    private enum BreakpointType {
        SEQUENTIAL, POST_BC
    }
75+
7276 private static final Logger LOGGER = LoggerFactory .getLogger (StepList .class );
7377 private final List <AbstractStep > list ;
74- private int breakpoint = - 1 ;
78+ private final Map < BreakpointType , Integer > breakpoints ;
7579 private int aggregateCount = 0 ;
7680 private boolean useInternalBatchCollect = false ;
7781 private boolean ignoreDefaultSorting = false ;
7882
7983 private OutputMode outputMode = OutputMode .Append ();
8084 private Consumer <Dataset <Row >> batchHandler = null ; // for UI
8185 private BatchCollect batchCollect ; // standard batchCollect, used before sending batch event
82- private BatchCollect sequentialModeBatchCollect ; // used if in append mode and in sequential, to allow aggregates in sequential mode
8386 private DPLParserCatalystVisitor catVisitor ;
8487
8588 public void setBatchCollect (BatchCollect batchCollect ) {
@@ -90,6 +93,10 @@ public void setBatchHandler(Consumer<Dataset<Row>> batchHandler) {
9093 this .batchHandler = batchHandler ;
9194 }
9295
96+ public BatchCollect batchCollect () {
97+ return this .batchCollect ;
98+ }
99+
93100 @ Deprecated
94101 public Consumer <Dataset <Row >> getBatchHandler () {
95102 return batchHandler ;
@@ -102,8 +109,12 @@ public void setCatVisitor(DPLParserCatalystVisitor catVisitor) {
102109 public StepList (DPLParserCatalystVisitor catVisitor ) {
103110 this .list = new ArrayList <>();
104111 this .catVisitor = catVisitor ;
105- this .batchCollect = new BatchCollect ("_time" , catVisitor .getCatalystContext ().getDplRecallSize ());
106- this .sequentialModeBatchCollect = new BatchCollect (null , catVisitor .getCatalystContext ().getDplRecallSize ());
112+ this .batchCollect = new BatchCollect (
113+ "_time" ,
114+ catVisitor .getCatalystContext ().getDplRecallSize (),
115+ catVisitor .getCatalystContext ().postBcLimitSize ()
116+ );
117+ this .breakpoints = new HashMap <>();
107118 }
108119
109120 /**
@@ -180,7 +191,11 @@ public Dataset<Row> executeSubsearch(Dataset<Row> ds) throws StreamingQueryExcep
180191 private DataStreamWriter <Row > executeFromStep (int fromStepIndex , Dataset <Row > ds ) throws StreamingQueryException {
181192 for (int i = fromStepIndex ; i < this .list .size (); i ++) {
182193 AbstractStep step = this .list .get (i );
183- if (i == breakpoint ) {
194+ if (
195+ (breakpoints .containsKey (BreakpointType .SEQUENTIAL ) && breakpoints
196+ .get (BreakpointType .SEQUENTIAL ) == i ) || (breakpoints .containsKey (BreakpointType .POST_BC )
197+ && breakpoints .get (BreakpointType .POST_BC ) == i )
198+ ) {
184199 // Switch to sequential; aka run the step inside forEachBatch
185200 LOGGER .debug ("breakpoint encountered at index <{}>" , i );
186201
@@ -193,12 +208,12 @@ private DataStreamWriter<Row> executeFromStep(int fromStepIndex, Dataset<Row> ds
193208 }
194209
195210 private Dataset <Row > executeInBatch (Dataset <Row > ds ) throws StreamingQueryException {
196- if (breakpoint == - 1 ) { // no sequential ops
211+ if (! breakpoints . containsKey ( BreakpointType . SEQUENTIAL ) ) { // no sequential ops
197212 return ds ;
198213 }
199214
200215 // sequential ops found
201- for (int i = breakpoint ; i < this .list .size (); i ++) {
216+ for (int i = breakpoints . get ( BreakpointType . SEQUENTIAL ) ; i < this .list .size (); i ++) {
202217 AbstractStep step = this .list .get (i );
203218 LOGGER .info ("Executing seq ops in batch: <{}>" , step .toString ());
204219 ds = step .get (ds );
@@ -216,7 +231,7 @@ private void analyze() {
216231
217232 for (int i = 0 ; i < this .list .size (); i ++) {
218233 AbstractStep step = this .list .get (i );
219-
234+ LOGGER . info ( "Analyzing step: <{}>" , step . toString ());
220235 step .setAggregatesUsedBefore (aggregateCount > 0 );
221236
222237 if (step .hasProperty (AbstractStep .CommandProperty .USES_INTERNAL_BATCHCOLLECT )) {
@@ -228,7 +243,11 @@ private void analyze() {
228243 if (step .hasProperty (AbstractStep .CommandProperty .IGNORE_DEFAULT_SORTING )) {
229244 LOGGER .info ("[Analyze] Ignore default sorting: <{}>" , step );
230245 this .ignoreDefaultSorting = true ;
231- this .batchCollect = new BatchCollect (null , catVisitor .getDPLRecallSize ());
246+ this .batchCollect = new BatchCollect (
247+ null ,
248+ catVisitor .getDPLRecallSize (),
249+ catVisitor .getCatalystContext ().postBcLimitSize ()
250+ );
232251 }
233252
234253 if (step .hasProperty (AbstractStep .CommandProperty .REQUIRE_PRECEDING_AGGREGATE )) {
@@ -246,20 +265,29 @@ private void analyze() {
246265 if (step .hasProperty (AbstractStep .CommandProperty .SEQUENTIAL_ONLY )) {
247266 LOGGER .info ("[Analyze] Sequential only command: <{}>" , step );
248267 // set the breakpoint just once
249- if (breakpoint == - 1 ) {
250- breakpoint = i ;
268+ if (! breakpoints . containsKey ( BreakpointType . SEQUENTIAL ) ) {
269+ breakpoints . put ( BreakpointType . SEQUENTIAL , i ) ;
251270 }
252271 }
253272 else if (step .hasProperty (AbstractStep .CommandProperty .AGGREGATE )) {
254273 LOGGER .info ("[Analyze] Aggregate command: <{}>" , step );
255274 aggregateCount ++;
256275
257276 // set the breakpoint just once
258- if (aggregateCount > 0 && breakpoint == -1 ) {
259- breakpoint = i + 1 ;
277+ if (
278+ aggregateCount > 0 && !breakpoints .containsKey (BreakpointType .SEQUENTIAL )
279+ && !breakpoints .containsKey (BreakpointType .POST_BC )
280+ ) {
281+ breakpoints .put (BreakpointType .SEQUENTIAL , i + 1 );
260282 outputMode = OutputMode .Complete ();
261283 }
262284 }
285+ else if (step .hasProperty (AbstractStep .CommandProperty .POST_BATCHCOLLECT )) {
286+ if (!breakpoints .containsKey (BreakpointType .POST_BC )) {
287+ LOGGER .info ("[Analyze] Post batch collect command: <{}>" , step );
288+ breakpoints .put (BreakpointType .POST_BC , i );
289+ }
290+ }
263291 }
264292 }
265293
@@ -282,7 +310,11 @@ else if (this.batchCollect == null) {
282310 }
283311 else {
284312 LOGGER .info ("------------------ Aggregates NOT USED (before seq. switch), using batchCollect!" );
285- this .batchCollect .collect (ds , id );
313+ int index = this .list .size ();
314+ if (breakpoints .containsKey (BreakpointType .POST_BC )) {
315+ index = breakpoints .get (BreakpointType .POST_BC );
316+ }
317+ this .batchCollect .collect (ds , id , this .list .subList (index , this .list .size ()), false );
286318 this .batchHandler .accept (batchCollect .getCollectedAsDataframe ());
287319 }
288320 }
@@ -292,7 +324,10 @@ public void call(Dataset<Row> batchDF, Long batchId) throws StreamingQueryExcept
292324 LOGGER .info ("StepList batch processing received a new batch <{}>" , batchId );
293325
294326 // timechart empty buckets
295- if (catVisitor .getCatalystContext ().getTimeChartSpanSeconds () != null ) {
327+ if (
328+ catVisitor .getCatalystContext ().getTimeChartSpanSeconds () != null
329+ && !breakpoints .containsKey (BreakpointType .POST_BC )
330+ ) {
296331 // create spans
297332 final long min = catVisitor .getCatalystContext ().getDplMinimumEarliest ();
298333 final long max = catVisitor .getCatalystContext ().getDplMaximumLatest ();
@@ -346,11 +381,7 @@ else if (dataType == DataTypes.FloatType) {
346381
347382 // Continue sub list of steps execution, if necessary
348383 if (!this .list .isEmpty ()) {
349- LOGGER
350- .info (
351- "StepList batch processing - Continuing execution to next ops after breakpoint index: <{}>" ,
352- breakpoint
353- );
384+ LOGGER .info ("StepList batch processing - Continuing execution to next ops after breakpoint index" );
354385
355386 Dataset <Row > ret = this .executeInBatch (batchDF );
356387
0 commit comments