From ffc1c916e461f9d6c44aab9de9a27bcce2f2485e Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Wed, 14 May 2025 15:46:42 +0300 Subject: [PATCH 1/3] replace custom UDF with spark from_unixtime for ctime transformation --- .../pth10/steps/convert/ConvertStep.java | 9 +++------ .../pth10/ConvertTransformationTest.java | 16 ++++++++-------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java b/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java index df1a42af2..b3a39dfa6 100644 --- a/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java +++ b/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java @@ -231,12 +231,9 @@ private Dataset mktime(Dataset dataset, String field, String renameFie * @return Input dataset with added result column */ private Dataset ctime(Dataset dataset, String field, String renameField) { - UserDefinedFunction ctimeUDF = functions.udf(new Ctime(), DataTypes.StringType); - sparkSession.udf().register("UDF_Ctime", ctimeUDF); - - Column udfResult = functions - .callUDF("UDF_Ctime", functions.col(field).cast(DataTypes.StringType), functions.lit(timeformat)); - return dataset.withColumn(renameField == null ? field : renameField, udfResult); + final String iso8601Format = "yyyy-MM-dd'T'HH:mm:ssX"; + final Column result = functions.from_unixtime(functions.col(field), iso8601Format); + return dataset.withColumn(renameField == null ? 
field : renameField, result); } /** diff --git a/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java b/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java index 598478097..7bab3854d 100644 --- a/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java +++ b/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java @@ -108,7 +108,8 @@ void testConvertCtimeAs() { new StructField("new", DataTypes.StringType, true, new MetadataBuilder().build()) }); Assertions.assertEquals(expectedSchema, ds.schema()); - + // match yyyy-MM-dd'T'HH:mm:ssZ ISO 8601 with zone + Pattern iso8601pattern = Pattern.compile("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z$"); List listOfResults = ds .select("new") .collectAsList() @@ -116,10 +117,8 @@ void testConvertCtimeAs() { .map(r -> r.getAs(0).toString()) .collect(Collectors.toList()); for (String s : listOfResults) { - // match 00/00/0000 00:00:00 - Matcher m = Pattern.compile("\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2}").matcher(s); - - Assertions.assertTrue(m.find()); + Matcher matcher = iso8601pattern.matcher(s); + Assertions.assertTrue(matcher.find()); } }); } @@ -152,9 +151,10 @@ void testConvertCtime() { .collect(Collectors.toList()); List expectedResults = Arrays .asList( - "01/01/1970 00:00:11", "01/01/1970 00:00:11", "01/01/1970 00:00:10", "01/01/1970 00:00:09", - "01/01/1970 00:00:08", "01/01/1970 00:00:07", "01/01/1970 00:00:06", "01/01/1970 00:00:05", - "01/01/1970 00:00:04", "01/01/1970 00:00:03", "01/01/1970 00:00:02", "01/01/1970 00:00:01" + "1970-01-01T00:00:11Z", "1970-01-01T00:00:11Z", "1970-01-01T00:00:10Z", + "1970-01-01T00:00:09Z", "1970-01-01T00:00:08Z", "1970-01-01T00:00:07Z", + "1970-01-01T00:00:06Z", "1970-01-01T00:00:05Z", "1970-01-01T00:00:04Z", + "1970-01-01T00:00:03Z", "1970-01-01T00:00:02Z", "1970-01-01T00:00:01Z" ); for (int i = 0; i < listOfResults.size(); i++) { From ce8e1e0a87d83b3b025f86d9c69e87a28908ea57 Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Wed, 
14 May 2025 16:12:18 +0300 Subject: [PATCH 2/3] assert result list sizes in test --- src/test/java/com/teragrep/pth10/ConvertTransformationTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java b/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java index 7bab3854d..73e648af2 100644 --- a/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java +++ b/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java @@ -116,6 +116,7 @@ void testConvertCtimeAs() { .stream() .map(r -> r.getAs(0).toString()) .collect(Collectors.toList()); + Assertions.assertEquals(12, listOfResults.size()); for (String s : listOfResults) { Matcher matcher = iso8601pattern.matcher(s); Assertions.assertTrue(matcher.find()); @@ -149,6 +150,7 @@ void testConvertCtime() { .stream() .map(r -> r.getAs(0).toString()) .collect(Collectors.toList()); + Assertions.assertEquals(12, listOfResults.size()); List expectedResults = Arrays .asList( "1970-01-01T00:00:11Z", "1970-01-01T00:00:11Z", "1970-01-01T00:00:10Z", From 3d3728a44324bd637daebb8d3a4923aa671e7fe6 Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Thu, 12 Jun 2025 11:11:56 +0300 Subject: [PATCH 3/3] make CTime have a final time format class member field, support different time format, default to ISO 8601 --- .../transformstatement/convert/Ctime.java | 41 ++++++++++++------- .../steps/convert/AbstractConvertStep.java | 2 +- .../pth10/steps/convert/ConvertStep.java | 8 ++-- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/convert/Ctime.java b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/convert/Ctime.java index 5e4ce8029..88bf002b5 100644 --- a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/convert/Ctime.java +++ b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/convert/Ctime.java @@ -45,12 +45,12 @@ */ package 
com.teragrep.pth10.ast.commands.transformstatement.convert; -import com.teragrep.pth10.ast.DPLTimeFormat; -import org.apache.spark.sql.api.java.UDF2; +import org.apache.spark.sql.api.java.UDF1; -import java.text.DateFormat; -import java.util.Date; -import java.util.TimeZone; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; /** * UDF for convert command 'ctime'
@@ -58,20 +58,52 @@
  *
  * @author eemhu
  */
-public class Ctime implements UDF2<String, String, String> {
+public class Ctime implements UDF1<String, String> {
 
     private static final long serialVersionUID = 1L;
+    // NOTE(review): DateTimeFormatter does not implement Serializable; confirm Spark can
+    // ship this UDF instance to executors, otherwise keep the pattern string instead
+    private final DateTimeFormatter formatter;
 
-    @Override
-    public String call(String epoch, String tf) throws Exception {
-        Long e = Long.valueOf(epoch);
-
-        Date date = new Date(e * 1000L);
-        DateFormat format = new DPLTimeFormat(tf).createSimpleDateFormat();
-        format.setTimeZone(TimeZone.getTimeZone("Etc/UTC"));
-        String formatted = format.format(date);
+    /**
+     * Creates a converter from a java.time pattern; output is rendered in UTC.
+     *
+     * @param format pattern accepted by DateTimeFormatter.ofPattern
+     */
+    public Ctime(final String format) {
+        this(DateTimeFormatter.ofPattern(format).withZone(ZoneId.of("UTC")));
+    }
 
-        return formatted;
+    /**
+     * Creates a converter that uses the given formatter as-is.
+     *
+     * @param formatter formatter used to render the epoch value
+     */
+    public Ctime(final DateTimeFormatter formatter) {
+        this.formatter = formatter;
     }
 
+    /**
+     * Converts epoch seconds to a timestamp string using the configured formatter.
+     *
+     * @param epoch epoch time in seconds, as a decimal string
+     * @return the formatted timestamp
+     * @throws IllegalArgumentException if the value is not a whole number of seconds or cannot be rendered
+     */
+    @Override
+    public String call(String epoch) throws Exception {
+        final String result;
+        try {
+            // ctime turns epoch seconds into text, so the value is formatted, not parsed
+            final Instant instant = Instant.ofEpochSecond(Long.parseLong(epoch));
+            result = formatter.format(instant);
+        }
+        catch (NumberFormatException | java.time.DateTimeException e) {
+            throw new IllegalArgumentException(
+                    "Could not format value <" + epoch + "> with set time formatter <" + formatter + ">",
+                    e
+            );
+        }
+        return result;
+    }
 }
diff --git a/src/main/java/com/teragrep/pth10/steps/convert/AbstractConvertStep.java b/src/main/java/com/teragrep/pth10/steps/convert/AbstractConvertStep.java
index e4dca5a8c..7dc217588 100644
--- a/src/main/java/com/teragrep/pth10/steps/convert/AbstractConvertStep.java
+++ b/src/main/java/com/teragrep/pth10/steps/convert/AbstractConvertStep.java
@@ -54,7 +54,7 @@ public abstract class AbstractConvertStep extends AbstractStep {
 
     protected List listOfCommands = new ArrayList<>();
     protected List listOfFieldsToOmit = new ArrayList<>();
-    protected String timeformat = "%m/%d/%Y %H:%M:%S";
+    protected String timeformat = "yyyy-MM-dd'T'HH:mm:ssX";
 
     public AbstractConvertStep() {
         super();
diff --git a/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java b/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java
index b3a39dfa6..41d82c151 100644
--- a/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java
+++ b/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java
@@ -231,9 +231,11 @@ private Dataset mktime(Dataset dataset, String field, String renameFie
      * @return Input dataset with added result column
      */
     private Dataset ctime(Dataset dataset, String field, String renameField) {
-        final String iso8601Format = "yyyy-MM-dd'T'HH:mm:ssX";
-        final Column result = functions.from_unixtime(functions.col(field), iso8601Format);
-        return dataset.withColumn(renameField == null ? field : renameField, result);
+        final Ctime ctime = new Ctime(timeformat);
+        final UserDefinedFunction cTimeUDF = functions.udf(ctime, DataTypes.StringType);
+        sparkSession.udf().register("UDF_Ctime", cTimeUDF);
+        final Column udfResult = functions.callUDF("UDF_Ctime", functions.col(field).cast(DataTypes.StringType));
+        return dataset.withColumn(renameField == null ? field : renameField, udfResult);
     }
 
     /**