diff --git a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/convert/Ctime.java b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/convert/Ctime.java index 5e4ce8029..88bf002b5 100644 --- a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/convert/Ctime.java +++ b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/convert/Ctime.java @@ -45,12 +45,12 @@ */ package com.teragrep.pth10.ast.commands.transformstatement.convert; -import com.teragrep.pth10.ast.DPLTimeFormat; -import org.apache.spark.sql.api.java.UDF2; +import org.apache.spark.sql.api.java.UDF1; -import java.text.DateFormat; -import java.util.Date; -import java.util.TimeZone; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; /** * UDF for convert command 'ctime'
@@ -58,20 +58,31 @@ * * @author eemhu */ -public class Ctime implements UDF2 { +public class Ctime implements UDF1 { private static final long serialVersionUID = 1L; + private final DateTimeFormatter formatter; - @Override - public String call(String epoch, String tf) throws Exception { - Long e = Long.valueOf(epoch); - - Date date = new Date(e * 1000L); - DateFormat format = new DPLTimeFormat(tf).createSimpleDateFormat(); - format.setTimeZone(TimeZone.getTimeZone("Etc/UTC")); - String formatted = format.format(date); + public Ctime(final String format) { + this(DateTimeFormatter.ofPattern(format).withZone(ZoneId.of("UTC"))); + } - return formatted; + public Ctime(final DateTimeFormatter formatter) { + this.formatter = formatter; } + @Override + public String call(String epoch) throws Exception { + String result; + try { + Instant instant = Instant.from(formatter.parse(epoch)); + result = instant.toString(); + } + catch (DateTimeParseException e) { + throw new IllegalArgumentException( + "Could not parse value <" + epoch + "> with set time formatter <" + formatter + ">" + ); + } + return result; + } } diff --git a/src/main/java/com/teragrep/pth10/steps/convert/AbstractConvertStep.java b/src/main/java/com/teragrep/pth10/steps/convert/AbstractConvertStep.java index e4dca5a8c..7dc217588 100644 --- a/src/main/java/com/teragrep/pth10/steps/convert/AbstractConvertStep.java +++ b/src/main/java/com/teragrep/pth10/steps/convert/AbstractConvertStep.java @@ -54,7 +54,7 @@ public abstract class AbstractConvertStep extends AbstractStep { protected List listOfCommands = new ArrayList<>(); protected List listOfFieldsToOmit = new ArrayList<>(); - protected String timeformat = "%m/%d/%Y %H:%M:%S"; + protected String timeformat = "yyyy-MM-dd'T'HH:mm:ssX"; public AbstractConvertStep() { super(); diff --git a/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java b/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java index df1a42af2..41d82c151 100644 --- a/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java +++ b/src/main/java/com/teragrep/pth10/steps/convert/ConvertStep.java @@ -231,11 +231,10 @@ private Dataset mktime(Dataset dataset, String field, String renameFie * @return Input dataset with added result column */ private Dataset ctime(Dataset dataset, String field, String renameField) { - UserDefinedFunction ctimeUDF = functions.udf(new Ctime(), DataTypes.StringType); - sparkSession.udf().register("UDF_Ctime", ctimeUDF); - - Column udfResult = functions - .callUDF("UDF_Ctime", functions.col(field).cast(DataTypes.StringType), functions.lit(timeformat)); + final Ctime ctime = new Ctime(timeformat); + final UserDefinedFunction cTimeUDF = functions.udf(ctime, DataTypes.StringType); + sparkSession.udf().register("UDF_Ctime", cTimeUDF); + final Column udfResult = functions.callUDF("UDF_Ctime", functions.col(field).cast(DataTypes.StringType)); return dataset.withColumn(renameField == null ? field : renameField, udfResult); } diff --git a/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java b/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java index 598478097..73e648af2 100644 --- a/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java +++ b/src/test/java/com/teragrep/pth10/ConvertTransformationTest.java @@ -108,18 +108,18 @@ void testConvertCtimeAs() { new StructField("new", DataTypes.StringType, true, new MetadataBuilder().build()) }); Assertions.assertEquals(expectedSchema, ds.schema()); - + // match yyyy-MM-dd'T'HH:mm:ssZ ISO 8601 with zone + Pattern iso8601pattern = Pattern.compile("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z$"); List listOfResults = ds .select("new") .collectAsList() .stream() .map(r -> r.getAs(0).toString()) .collect(Collectors.toList()); + Assertions.assertEquals(12, listOfResults.size()); for (String s : listOfResults) { - // match 00/00/0000 00:00:00 - Matcher m = Pattern.compile("\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2}").matcher(s); - - Assertions.assertTrue(m.find()); + Matcher matcher = iso8601pattern.matcher(s); + Assertions.assertTrue(matcher.find()); } }); } @@ -150,11 +150,13 @@ void testConvertCtime() { .stream() .map(r -> r.getAs(0).toString()) .collect(Collectors.toList()); + Assertions.assertEquals(12, listOfResults.size()); List expectedResults = Arrays .asList( - "01/01/1970 00:00:11", "01/01/1970 00:00:11", "01/01/1970 00:00:10", "01/01/1970 00:00:09", - "01/01/1970 00:00:08", "01/01/1970 00:00:07", "01/01/1970 00:00:06", "01/01/1970 00:00:05", - "01/01/1970 00:00:04", "01/01/1970 00:00:03", "01/01/1970 00:00:02", "01/01/1970 00:00:01" + "1970-01-01T00:00:11Z", "1970-01-01T00:00:11Z", "1970-01-01T00:00:10Z", + "1970-01-01T00:00:09Z", "1970-01-01T00:00:08Z", "1970-01-01T00:00:07Z", + "1970-01-01T00:00:06Z", "1970-01-01T00:00:05Z", "1970-01-01T00:00:04Z", + "1970-01-01T00:00:03Z", "1970-01-01T00:00:02Z", "1970-01-01T00:00:01Z" ); for (int i = 0; i < listOfResults.size(); i++) {