Commit 7173a15

Epoch migration mode (#274)
* add epoch fetching from s3 WIP
* implement epochMigrationMode option logic
* limit to single next() call on epoch migration mode, WIP one off in batches
* move first-event-only logic to RowConverter, clean up get() method, clean up unused objects
* make new objects and object ctor values final
* new objects became irrelevant without more responsibilities, removed
* remove unwanted logger message
* refactor next() to have a single point of return
* add testing with data containing multiple messages per s3 object
* assert epoch migration mode epoch results for all rows
* restore task package
* create separate EpochMigrationRowConverter for the migration process, add JSON metadata payload for the _raw column
* add unit tests for new objects + cleanup and better naming
* add equals and hashCode for new objects and testing
* add path-extracted value to metadata and to the _time column when the s3 object is non-syslog, use new mock objects
* return ZonedDateTime instead of epoch in the path-extracted value
* implement mock support for non-syslog format s3 data, add non-syslog format test for epoch migration testing
* fix EpochMigrationTest.testNonSyslogEvent assertions
* clean up code, add equals and hashCode for PathExtractedTimestamp
* use the timestampJson variable in EventMetadataTest instead of collecting it from the main JSON
* add isSyslog to comparator for implementing objects
* separate syslog and non-syslog asJSON methods for EventMetadata, rename timestamp original to rfc5242timestamp
* fix next() boolean value logic in EpochMigrationRowConverter, clean up get() object instantiation into correct if-else blocks
* use a static final boolean for the isSyslog value in MockDBRowImpl and MockDBNonSyslogRowImpl
* add missing equals, hashCode and toString methods to MockDBNonSyslogRowImpl
* EventMetadata refactoring: better naming, remove unnecessary method variables, remove unwanted JSON values
* set isSyslog from a private constructor
* add EpochMigrationRowConverterTest
* add path-extracted-percision JSON value to EventMetadata
* clean up boolean logic in the next() method of EpochMigrationRowConverter, remove IOException from catch
* clean up MockDBRowImpl equals and hashCode methods
* clean up MockDBRowImpl equals and hashCode methods, remove empty event test from EpochMigrationRowConverterTest
1 parent 801bd74 commit 7173a15

29 files changed: 2806 additions & 452 deletions

pom.xml

Lines changed: 12 additions & 0 deletions
@@ -222,6 +222,18 @@
         <version>2.1.0</version>
         <scope>compile</scope>
     </dependency>
+    <!-- json jakarta for Java 8 -->
+    <!-- https://mvnrepository.com/artifact/jakarta.json/jakarta.json-api -->
+    <dependency>
+      <groupId>jakarta.json</groupId>
+      <artifactId>jakarta.json-api</artifactId>
+      <version>2.1.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.parsson</groupId>
+      <artifactId>parsson</artifactId>
+      <version>1.1.7</version>
+    </dependency>
     <!-- metrics -->
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
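
These two artifacts pair the Jakarta JSON API with Eclipse Parsson as its runtime implementation; per the commit message they back the new JSON metadata payload written to the _raw column. A minimal sketch of how this API is typically used (the field names below are illustrative, not taken from this commit):

import jakarta.json.Json;
import jakarta.json.JsonObject;

public final class JsonPayloadSketch {

    public static void main(final String[] args) {
        // Parsson supplies the JsonProvider behind the Jakarta JSON API at runtime.
        final JsonObject metadata = Json
                .createObjectBuilder()
                .add("source", "object-path") // hypothetical keys, for illustration only
                .add("epoch-micros", 1672531200123456L)
                .build();
        // prints {"source":"object-path","epoch-micros":1672531200123456}
        System.out.println(metadata.toString());
    }
}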

src/main/java/com/teragrep/pth_06/ArchiveMicroStreamReader.java

Lines changed: 2 additions & 1 deletion
@@ -264,7 +264,8 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
                         config.auditConfig.reason,
                         config.auditConfig.user,
                         config.auditConfig.pluginClassName,
-                        config.archiveConfig.skipNonRFC5424Files
+                        config.archiveConfig.skipNonRFC5424Files,
+                        config.archiveConfig.epochMigrationMode
                 )
         );
     }

src/main/java/com/teragrep/pth_06/config/ArchiveConfig.java

Lines changed: 3 additions & 0 deletions
@@ -61,6 +61,7 @@ public final class ArchiveConfig {
     public final String s3Credential;

     public final boolean skipNonRFC5424Files;
+    public final boolean epochMigrationMode;

     public final long archiveIncludeBeforeEpoch;

@@ -89,6 +90,7 @@ public ArchiveConfig(Map<String, String> opts) {

         // skip not rfc5424 parseable files
         skipNonRFC5424Files = opts.getOrDefault("skipNonRFC5424Files", "false").equalsIgnoreCase("true");
+        epochMigrationMode = opts.getOrDefault("epochMigrationMode", "false").equalsIgnoreCase("true");

         archiveIncludeBeforeEpoch = Long
                 .parseLong(opts.getOrDefault("archive.includeBeforeEpoch", String.valueOf(Long.MAX_VALUE)));
@@ -113,6 +115,7 @@ public ArchiveConfig() {
         bloomDbName = "";

         skipNonRFC5424Files = false;
+        epochMigrationMode = false;

         archiveIncludeBeforeEpoch = 0L;
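
The option is parsed case-insensitively and defaults to false. As a usage sketch, it is enabled like any other datasource option; the format name below is a placeholder, since the datasource registration is not shown in this commit:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// "spark" is an existing SparkSession; only the option key "epochMigrationMode"
// comes from this commit, the format name is hypothetical.
Dataset<Row> rows = spark
        .read()
        .format("com.teragrep.pth_06") // placeholder datasource name
        .option("epochMigrationMode", "true")
        .load();
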
src/main/java/com/teragrep/pth_06/task/ArchiveMicroBatchInputPartition.java

Lines changed: 4 additions & 1 deletion
@@ -76,6 +76,7 @@ public class ArchiveMicroBatchInputPartition implements InputPartition {
     public final String TeragrepAuditPluginClassName;

     public final boolean skipNonRFC5424Files;
+    public final boolean epochMigrationMode;

     public ArchiveMicroBatchInputPartition(
             String S3endPoint,
@@ -86,7 +87,8 @@ public ArchiveMicroBatchInputPartition(
             String TeragrepAuditReason,
             String TeragrepAuditUser,
             String TeragrepAuditPluginClassName,
-            boolean skipNonRFC5424Files
+            boolean skipNonRFC5424Files,
+            boolean epochMigrationMode
     ) {
         LOGGER.debug("ArchiveMicroBatchInputPartition> init");

@@ -102,6 +104,7 @@ public ArchiveMicroBatchInputPartition(
         this.TeragrepAuditPluginClassName = TeragrepAuditPluginClassName;

         this.skipNonRFC5424Files = skipNonRFC5424Files;
+        this.epochMigrationMode = epochMigrationMode;
     }

     @Override

src/main/java/com/teragrep/pth_06/task/ArchiveMicroBatchInputPartitionReader.java

Lines changed: 37 additions & 23 deletions
@@ -49,7 +49,9 @@
 import com.codahale.metrics.MetricRegistry;
 import com.teragrep.pth_06.metrics.TaskMetric;
 import com.teragrep.pth_06.ArchiveS3ObjectMetadata;
+import com.teragrep.pth_06.task.s3.EpochMigrationRowConverter;
 import com.teragrep.pth_06.task.s3.Pth06S3Client;
+import com.teragrep.pth_06.task.s3.RowConverterImpl;
 import com.teragrep.pth_06.task.s3.RowConverter;
 import com.teragrep.rad_01.AuditPlugin;
 import com.teragrep.rad_01.AuditPluginFactory;
@@ -88,6 +90,7 @@ class ArchiveMicroBatchInputPartitionReader implements PartitionReader<InternalRow> {
     private final AmazonS3 s3client;

     private final boolean skipNonRFC5424Files;
+    private final boolean epochMigrationMode;
     private final MetricRegistry metricRegistry;

     public ArchiveMicroBatchInputPartitionReader(
@@ -100,7 +103,8 @@ public ArchiveMicroBatchInputPartitionReader(
             String TeragrepAuditReason,
             String TeragrepAuditUser,
             String TeragrepAuditPluginClassName,
-            boolean skipNonRFC5424Files
+            boolean skipNonRFC5424Files,
+            boolean epochMigrationMode
     ) {
         this.taskObjectList = taskObjectList;

@@ -124,6 +128,7 @@ public ArchiveMicroBatchInputPartitionReader(
         }

         this.skipNonRFC5424Files = skipNonRFC5424Files;
+        this.epochMigrationMode = epochMigrationMode;
         this.metricRegistry = metricRegistry;
     }

@@ -137,17 +142,7 @@ public boolean next() throws IOException {
         // loop until all objects are consumed
         if (rowConverter == null) {
             // initial run
-            rowConverter = new RowConverter(
-                    auditPlugin,
-                    s3client,
-                    taskObjectList.getFirst().id,
-                    taskObjectList.getFirst().bucket,
-                    taskObjectList.getFirst().path,
-                    taskObjectList.getFirst().directory,
-                    taskObjectList.getFirst().stream,
-                    taskObjectList.getFirst().host,
-                    skipNonRFC5424Files
-            );
+            rowConverter = selectedRowConverter();

             metricRegistry.counter("ArchiveCompressedBytesProcessed").inc(taskObjectList.getFirst().compressedSize);
             metricRegistry.counter("BytesProcessed").inc(taskObjectList.getFirst().uncompressedSize);
@@ -172,17 +167,7 @@ public boolean next() throws IOException {

         if (!taskObjectList.isEmpty()) {
             // new object still available
-            rowConverter = new RowConverter(
-                    auditPlugin,
-                    s3client,
-                    taskObjectList.getFirst().id,
-                    taskObjectList.getFirst().bucket,
-                    taskObjectList.getFirst().path,
-                    taskObjectList.getFirst().directory,
-                    taskObjectList.getFirst().stream,
-                    taskObjectList.getFirst().host,
-                    skipNonRFC5424Files
-            );
+            rowConverter = selectedRowConverter();
             metricRegistry
                     .counter("ArchiveCompressedBytesProcessed")
                     .inc(taskObjectList.getFirst().compressedSize);
@@ -222,6 +207,35 @@ public CustomTaskMetric[] currentMetricsValues() {
         };
     }

+    private RowConverter selectedRowConverter() {
+        final RowConverter selectedConverter;
+        if (epochMigrationMode) {
+            selectedConverter = new EpochMigrationRowConverter(
+                    s3client,
+                    taskObjectList.getFirst().id,
+                    taskObjectList.getFirst().bucket,
+                    taskObjectList.getFirst().path,
+                    taskObjectList.getFirst().directory,
+                    taskObjectList.getFirst().stream,
+                    taskObjectList.getFirst().host
+            );
+        }
+        else {
+            selectedConverter = new RowConverterImpl(
+                    auditPlugin,
+                    s3client,
+                    taskObjectList.getFirst().id,
+                    taskObjectList.getFirst().bucket,
+                    taskObjectList.getFirst().path,
+                    taskObjectList.getFirst().directory,
+                    taskObjectList.getFirst().stream,
+                    taskObjectList.getFirst().host,
+                    skipNonRFC5424Files
+            );
+        }
+        return selectedConverter;
+    }
+
     @Override
     public void close() throws IOException {
         if (LOGGER.isDebugEnabled()) {
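
RowConverter is now an interface with two implementations: RowConverterImpl for the normal path (honoring skipNonRFC5424Files) and EpochMigrationRowConverter for migration, which drops the audit plugin from its constructor. The interface itself is not part of this excerpt; an inferred sketch of its likely shape, based on the next()/get() usage referenced in the commit message:

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;

// Inferred, not shown in this diff: method names follow the next()/get()
// calls mentioned in the commit message.
public interface RowConverter {

    boolean next() throws IOException; // advance to the next event; false when the object is exhausted

    InternalRow get(); // current event as a Spark InternalRow
}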

src/main/java/com/teragrep/pth_06/task/TeragrepPartitionReaderFactory.java

Lines changed: 2 additions & 1 deletion
@@ -101,7 +101,8 @@ else if (inputPartition instanceof ArchiveMicroBatchInputPartition) {
                     aip.TeragrepAuditReason,
                     aip.TeragrepAuditUser,
                     aip.TeragrepAuditPluginClassName,
-                    aip.skipNonRFC5424Files
+                    aip.skipNonRFC5424Files,
+                    aip.epochMigrationMode
             );
         }
         else if (inputPartition instanceof KafkaMicroBatchInputPartition) {
src/main/java/com/teragrep/pth_06/task/s3/EpochMicros.java

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
/*
 * Teragrep Archive Datasource (pth_06)
 * Copyright (C) 2021-2024 Suomen Kanuuna Oy
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 *
 *
 * Additional permission under GNU Affero General Public License version 3
 * section 7
 *
 * If you modify this Program, or any covered work, by linking or combining it
 * with other code, such other code is not for that reason alone subject to any
 * of the requirements of the GNU Affero GPL version 3 as long as this Program
 * is the same Program as licensed from Suomen Kanuuna Oy without any additional
 * modifications.
 *
 * Supplemented terms under GNU Affero General Public License version 3
 * section 7
 *
 * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
 * versions must be marked as "Modified version of" The Program.
 *
 * Names of the licensors and authors may not be used for publicity purposes.
 *
 * No rights are granted for use of trade names, trademarks, or service marks
 * which are in The Program if any.
 *
 * Licensee must indemnify licensors and authors for any liability that these
 * contractual assumptions impose on licensors and authors.
 *
 * To the extent this program is licensed as part of the Commercial versions of
 * Teragrep, the applicable Commercial License may apply to this file if you as
 * a licensee so wish it.
 */
package com.teragrep.pth_06.task.s3;

import com.teragrep.rlo_06.RFC5424Timestamp;

import java.time.Instant;
import java.util.Objects;

public final class EpochMicros {

    private final Instant instant;
    private final String source;
    private final long microsPerSecond;
    private final long nanosPerMicro;

    public EpochMicros(final RFC5424Timestamp rfc5424Timestamp) {
        this(rfc5424Timestamp.toZonedDateTime().toInstant(), "syslog");
    }

    public EpochMicros(final String path) {
        this(new PathExtractedTimestamp(path));
    }

    public EpochMicros(final PathExtractedTimestamp pathExtractedTimestamp) {
        this(pathExtractedTimestamp.toZonedDateTime().toInstant(), "object-path");
    }

    public EpochMicros(final Instant instant, final String source) {
        this(instant, source, 1000L * 1000L, 1000L);
    }

    private EpochMicros(final Instant instant, final String source, long microsPerSecond, long nanosPerMicro) {
        this.instant = instant;
        this.source = source;
        this.microsPerSecond = microsPerSecond;
        this.nanosPerMicro = nanosPerMicro;
    }

    long asLong() {
        final long sec = Math.multiplyExact(instant.getEpochSecond(), microsPerSecond);
        return Math.addExact(sec, instant.getNano() / nanosPerMicro);
    }

    String source() {
        return source;
    }

    @Override
    public boolean equals(final Object object) {
        if (object == null) {
            return false;
        }
        if (getClass() != object.getClass()) {
            return false;
        }
        final EpochMicros that = (EpochMicros) object;
        return microsPerSecond == that.microsPerSecond && nanosPerMicro == that.nanosPerMicro
                && Objects.equals(instant, that.instant) && Objects.equals(source, that.source);
    }

    @Override
    public int hashCode() {
        return Objects.hash(instant, source, microsPerSecond, nanosPerMicro);
    }
}
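
EpochMicros scales an Instant to epoch microseconds with overflow-checked arithmetic; asLong() is package-private, but the computation is easy to verify standalone (the timestamp below is an arbitrary example):

import java.time.Instant;

public final class EpochMicrosSketch {

    public static void main(final String[] args) {
        final Instant instant = Instant.parse("2023-01-01T00:00:00.123456Z");
        // Same computation as EpochMicros.asLong(): seconds scaled to micros,
        // plus the sub-second nanos truncated to micros.
        final long sec = Math.multiplyExact(instant.getEpochSecond(), 1000L * 1000L);
        final long micros = Math.addExact(sec, instant.getNano() / 1000L);
        System.out.println(micros); // 1672531200123456
    }
}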
