 
 import com.teragrep.functions.dpf_02.AbstractStep;
 import com.teragrep.pth10.ast.NullValue;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
-import org.apache.spark.sql.streaming.GroupState;
-import org.apache.spark.sql.streaming.GroupStateTimeout;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +74,7 @@ public DedupStep(
             NullValue nullValue,
             boolean completeOutputMode
     ) {
-        this.properties.add(AbstractStep.CommandProperty.POST_BATCHCOLLECT);
+        // this.properties.add(AbstractStep.CommandProperty.POST_BATCHCOLLECT);
 
         this.listOfFields = listOfFields;
         this.maxDuplicates = maxDuplicates;
@@ -94,87 +87,7 @@ public DedupStep(
 
     @Override
     public Dataset<Row> get(Dataset<Row> dataset) {
-
-        final List<String> dedupHashFields = new ArrayList<>();
-        for (final String field : listOfFields) {
-            final String dedupHashField = "dedupHash-" + field;
-            dataset = dataset
-                    .withColumn(dedupHashField, functions.sha2(functions.col(field).cast(DataTypes.BinaryType), 256));
-            dedupHashFields.add(dedupHashField);
-        }
-
-        KeyValueGroupedDataset<String, Row> groupedDs = dataset.groupByKey((MapFunction<Row, String>) (r) -> {
-            final StringBuilder groupId = new StringBuilder();
-            for (final String hashField : dedupHashFields) {
-                groupId.append(r.getString(r.fieldIndex(hashField)));
-            }
-
-            return groupId.toString();
-        }, Encoders.STRING());
-
-        Dataset<Row> rv = groupedDs
-                .flatMapGroupsWithState(
-                        this::flatMapGroupsWithStateFunc, OutputMode.Append(),
-                        Encoders.javaSerialization(DedupState.class), RowEncoder.apply(dataset.schema()),
-                        GroupStateTimeout.NoTimeout()
-                );
-
-        return rv.drop(dedupHashFields.toArray(new String[0]));
-    }
-
-    private Iterator<Row> flatMapGroupsWithStateFunc(
-            final String group,
-            final Iterator<Row> events,
-            final GroupState<DedupState> state
-    ) {
-        final DedupState ds;
-        if (state.exists()) {
-            ds = state.get();
-        }
-        else {
-            ds = new DedupState();
-        }
-
-        List<Row> rv = new ArrayList<>();
-        events.forEachRemaining(event -> {
-            ds.accumulate(group);
-
-            boolean dropFullRow = false;
-
-            if (!keepEmpty) {
-                for (int i = 0; i < event.length(); i++) {
-                    final StructField field = event.schema().fields()[i];
-                    if (listOfFields.contains(field.name())) {
-                        final Object fieldValue = event.get(i);
-                        if (fieldValue == nullValue.value()) {
-                            // drop row, one of the fields is null
-                            dropFullRow = true;
-                            break;
-                        }
-                    }
-                }
-            }
-
-            if (!dropFullRow && ds.amountOf(group) <= maxDuplicates) {
-                rv.add(event);
-            }
-            else if (!dropFullRow && keepEvents) {
-                Object[] newRow = new Object[event.length()];
-                for (int i = 0; i < event.length(); i++) {
-                    final StructField field = event.schema().fields()[i];
-                    if (listOfFields.contains(field.name())) {
-                        newRow[i] = nullValue.value();
-                    }
-                    else {
-                        newRow[i] = event.get(i);
-                    }
-                }
-                rv.add(RowFactory.create(newRow));
-            }
-        });
-
-        state.update(ds);
-        return rv.iterator();
+        return dataset.withWatermark("_time", "1 hour").dropDuplicates(listOfFields.toArray(new String[0]));
     }
 
     public List<String> getListOfFields() {
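
For reference, a minimal self-contained sketch (not part of this commit) of the withWatermark().dropDuplicates() pattern the new get() uses. The class name, the built-in "rate" source, and its timestamp/value columns are illustrative assumptions; the one-hour delay mirrors the commit.

// Standalone sketch only, assuming Spark's built-in "rate" source, whose
// output columns are "timestamp" and "value". This is not DedupStep itself.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class WatermarkDedupSketch {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .appName("watermark-dedup-sketch")
                .getOrCreate();

        // Streaming source producing (timestamp, value) rows.
        Dataset<Row> events = spark.readStream().format("rate").option("rowsPerSecond", "10").load();

        // Same shape as the new DedupStep.get(): watermark the event-time
        // column, then drop rows whose listed fields repeat. Spark evicts
        // dedup state once the watermark passes, bounding the state size.
        Dataset<Row> deduplicated = events
                .withWatermark("timestamp", "1 hour")
                .dropDuplicates(new String[] { "value" });

        StreamingQuery query = deduplicated.writeStream().format("console").outputMode("append").start();
        query.awaitTermination();
    }
}

Note that dropDuplicates keeps only the first occurrence per key, so unlike the removed flatMapGroupsWithState implementation this sketch does not model maxDuplicates, keepEvents, or keepEmpty.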