Skip to content

Commit c73cc91

Browse files
authored
Issue 695: Syslog stream returns message dataset instead of full dataset (#707)
* add CustomDataset object & test, set bloom create and update to return status dataset instead of the whole dataset. * apply spotless * add isStreaming(boolean) option for CustomDataset. * separate makeRowsList and makeRowsSeq into Rows object & tests. * change Bloom filter result to contain _time column and also switch result column to _raw. * remove null sparksession from CustomDatasetTest.testEqualsContract * remove double call to collectAsList() in CustomDatasetTest.testCreateStreamingDataset * add CustomDataset interface, rename CustomDataset -> CustomDatasetImpl, remove isStreaming ctor arg, this is now handled by CustomResultStep object instead. Result dataset is now created in a separate CustomResultStep, which is added after bloom create and update. * removed unused variables from TeragrepBloomStep * change SyslogStreamer to use ForEachFunction instead of MapFunction, use Sequential mode, add custom result * apply spotless * fix TeragrepTest failing * remove possibly unnecessary DataStreamWriter from CustomDatasetImpl
1 parent f27efda commit c73cc91

4 files changed

Lines changed: 34 additions & 14 deletions

File tree

src/main/java/com/teragrep/pth_10/ast/commands/transformstatement/TeragrepTransformation.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,16 @@ public Node visitT_syslogModeParameter(DPLParser.T_syslogModeParameterContext ct
278278
}
279279
}
280280

281-
return new StepNode(new TeragrepSyslogStep(host, port));
281+
final CustomResultStep customResultStep = new CustomResultStep(
282+
new CustomDatasetImpl(new StructType(new StructField[] {
283+
StructField.apply("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()),
284+
StructField.apply("_raw", DataTypes.StringType, false, new MetadataBuilder().build())
285+
}), Collections.singletonList(new Object[] {
286+
Instant.now(), "Syslog stream in progress..."
287+
}), catCtx)
288+
);
289+
290+
return new StepListNode(Arrays.asList(new TeragrepSyslogStep(host, port), customResultStep));
282291
}
283292

284293
// exec hdfs save path retention

src/main/java/com/teragrep/pth_10/ast/commands/transformstatement/teragrep/SyslogStreamer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import com.cloudbees.syslog.SyslogMessage;
5252
import com.teragrep.rlp_01.RelpBatch;
5353
import com.teragrep.rlp_01.RelpConnection;
54-
import org.apache.spark.api.java.function.MapFunction;
54+
import org.apache.spark.api.java.function.ForeachFunction;
5555
import org.apache.spark.sql.Row;
5656
import org.apache.spark.sql.types.StructField;
5757
import org.slf4j.Logger;
@@ -68,7 +68,7 @@
6868
* Streams the given dataset (using the foreach function of a dataset) as syslog messages via the RELP protocol.<br>
6969
* Provide the RELP server's hostname/ip and port using the constructor.
7070
*/
71-
public class SyslogStreamer implements MapFunction<Row, Row>, Serializable {
71+
public class SyslogStreamer implements ForeachFunction<Row>, Serializable {
7272

7373
private static final Logger LOGGER = LoggerFactory.getLogger(SyslogStreamer.class);
7474
private transient RelpConnection sender;
@@ -235,10 +235,9 @@ private void stop() {
235235
* <br>
236236
*
237237
* @param row Input row to send as syslog
238-
* @return the given input row unchanged
239238
* @throws Exception If any failure is encountered during the call
240239
*/
241-
public Row call(Row row) throws Exception {
240+
public void call(Row row) throws Exception {
242241
if (!initialized) {
243242
this.start();
244243
initialized = true;
@@ -289,8 +288,6 @@ else if (field.name().equals("_raw")) {
289288

290289
// send to server
291290
this.append(syslogMessage);
292-
293-
return row;
294291
}
295292

296293
/**

src/main/java/com/teragrep/pth_10/steps/teragrep/TeragrepSyslogStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,16 @@ public TeragrepSyslogStep(String relpHost, int relpPort) {
6767
this.relpHost = relpHost;
6868
this.relpPort = relpPort;
6969
this.properties.add(CommandProperty.NO_PRECEDING_AGGREGATE);
70+
this.properties.add(CommandProperty.SEQUENTIAL_ONLY);
7071
}
7172

7273
@Override
7374
public Dataset<Row> get(Dataset<Row> dataset) {
7475
LOGGER.info("Calling sendDataframeAsSyslog function");
7576
final SyslogStreamer syslogStreamer = new SyslogStreamer(relpHost, relpPort);
7677

77-
return dataset.map(syslogStreamer, dataset.exprEnc());
78+
dataset.foreach(syslogStreamer);
79+
return dataset;
7880
}
7981

8082
@Override

src/test/java/com/teragrep/pth_10/translationTests/TeragrepTest.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import com.teragrep.pth_10.ast.bo.StepListNode;
5252
import com.teragrep.pth_10.ast.bo.StepNode;
5353
import com.teragrep.pth_10.ast.commands.transformstatement.TeragrepTransformation;
54+
import com.teragrep.pth_10.steps.CustomResultStep;
5455
import com.teragrep.pth_10.steps.teragrep.*;
5556
import com.teragrep.pth_03.antlr.DPLLexer;
5657
import com.teragrep.pth_03.antlr.DPLParser;
@@ -67,6 +68,7 @@
6768
import org.slf4j.LoggerFactory;
6869

6970
import java.util.HashMap;
71+
import java.util.List;
7072

7173
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
7274
public class TeragrepTest {
@@ -86,15 +88,19 @@ void testTeragrepTranslation() {
8688
final DPLParserCatalystVisitor visitor = new DPLParserCatalystVisitor(ctx);
8789

8890
final TeragrepTransformation ct = new TeragrepTransformation(ctx, visitor);
89-
StepNode stepNode = (StepNode) ct
91+
final StepListNode stepNode = (StepListNode) ct
9092
.visitTeragrepTransformation((DPLParser.TeragrepTransformationContext) tree.getChild(1).getChild(0));
91-
AbstractStep step = stepNode.get();
93+
final List<AbstractStep> steps = stepNode.asList();
9294

95+
final AbstractStep step = steps.get(0);
9396
Assertions.assertEquals(TeragrepSyslogStep.class, step.getClass());
9497
TeragrepSyslogStep syslogStep = (TeragrepSyslogStep) step;
9598

9699
Assertions.assertEquals("127.0.0.123", syslogStep.relpHost);
97100
Assertions.assertEquals(1337, syslogStep.relpPort);
101+
102+
final AbstractStep step2 = steps.get(1);
103+
Assertions.assertEquals(CustomResultStep.class, step2.getClass());
98104
}
99105

100106
@Test
@@ -110,15 +116,18 @@ void testTeragrepDefaultParamsTranslation() {
110116
final DPLParserCatalystVisitor visitor = new DPLParserCatalystVisitor(ctx);
111117

112118
final TeragrepTransformation ct = new TeragrepTransformation(ctx, visitor);
113-
StepNode stepNode = (StepNode) ct
119+
final StepListNode stepNode = (StepListNode) ct
114120
.visitTeragrepTransformation((DPLParser.TeragrepTransformationContext) tree.getChild(1).getChild(0));
115-
final AbstractStep step = stepNode.get();
121+
final List<AbstractStep> steps = stepNode.asList();
122+
final AbstractStep step = steps.get(0);
116123

117124
Assertions.assertEquals(TeragrepSyslogStep.class, step.getClass());
118125
final TeragrepSyslogStep syslogStep = (TeragrepSyslogStep) step;
119126

120127
Assertions.assertEquals("127.0.0.1", syslogStep.relpHost);
121128
Assertions.assertEquals(601, syslogStep.relpPort);
129+
final AbstractStep step2 = steps.get(1);
130+
Assertions.assertEquals(CustomResultStep.class, step2.getClass());
122131
}
123132

124133
@Test
@@ -144,15 +153,18 @@ void testTeragrepSyslogConfigTranslation() {
144153
final DPLParserCatalystVisitor visitor = new DPLParserCatalystVisitor(ctx);
145154

146155
final TeragrepTransformation ct = new TeragrepTransformation(ctx, visitor);
147-
StepNode stepNode = (StepNode) ct
156+
final StepListNode stepNode = (StepListNode) ct
148157
.visitTeragrepTransformation((DPLParser.TeragrepTransformationContext) tree.getChild(1).getChild(0));
149-
final AbstractStep step = stepNode.get();
158+
final List<AbstractStep> steps = stepNode.asList();
159+
final AbstractStep step = steps.get(0);
150160

151161
Assertions.assertEquals(TeragrepSyslogStep.class, step.getClass());
152162
final TeragrepSyslogStep syslogStep = (TeragrepSyslogStep) step;
153163

154164
Assertions.assertEquals(host, syslogStep.relpHost);
155165
Assertions.assertEquals(port, syslogStep.relpPort);
166+
final AbstractStep step2 = steps.get(1);
167+
Assertions.assertEquals(CustomResultStep.class, step2.getClass());
156168
}
157169

158170
@Test

0 commit comments

Comments
 (0)