From b5a9a0cd590aeb105d32917e790106547b8cd350 Mon Sep 17 00:00:00 2001 From: Tristan Nixon Date: Tue, 14 May 2024 16:20:04 -0700 Subject: [PATCH] checkpoint commit of some updates to the code that converts nano-second precision timestamps --- python/tempo/tsdf.py | 45 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/python/tempo/tsdf.py b/python/tempo/tsdf.py index 8ae1d44f..d471ce1b 100644 --- a/python/tempo/tsdf.py +++ b/python/tempo/tsdf.py @@ -65,9 +65,11 @@ def __init__( # Timestamp string matching then do some pattern matching to extract # the time stamp. if isinstance(df.schema[ts_col].dataType, StringType): # pragma: no cover - sample_ts = df.limit(1).collect()[0][0] + sample_ts = df.select(ts_col).limit(1).collect()[0][0] self.__validate_ts_string(sample_ts) - self.df = self.__add_double_ts().withColumnRenamed("double_ts", self.ts_col) + self.df = self.__add_double_ts()\ + .drop(self.ts_col)\ + .withColumnRenamed("double_ts", self.ts_col) """ Make sure DF is ordered by its respective ts_col and partition columns. @@ -77,6 +79,45 @@ def __init__( # Helper functions # + @staticmethod + def parse_nanos_timestamp(df: DataFrame, + str_ts_col: str, + ts_fmt: str = "yyyy-MM-dd HH:mm:ss", + double_ts_col: Optional[str] = None, + parsed_ts_col: Optional[str] = None) -> DataFrame: + """ + Parse a string timestamp column with nanosecond precision into a double timestamp column. + + :param df: DataFrame containing the string timestamp column + :param str_ts_col: Name of the string timestamp column + :param ts_fmt: Format of the string timestamp column (default: "yyyy-MM-dd HH:mm:ss") + :param double_ts_col: Name of the double timestamp column to create, if None + the source string column will be overwritten + :param parsed_ts_col: Name of the parsed timestamp column to create, if None + no parsed timestamp column will be kept + + :return: DataFrame with the double timestamp column + """ + + # add a parsed timestamp column if requested + src_df = df.withColumn(parsed_ts_col, + sfn.to_timestamp(sfn.col(str_ts_col), ts_fmt)) \ + if parsed_ts_col else df + + return ( + src_df.withColumn("nanos", + sfn.when(sfn.col(str_ts_col).contains("."), + sfn.concat(sfn.lit("0."), + sfn.split(sfn.col(str_ts_col), + r"\.")[1]) + ).otherwise(0).cast("double")) + .withColumn("long_ts", + sfn.unix_timestamp(str_ts_col, ts_fmt)) + .withColumn((double_ts_col or str_ts_col), + sfn.col("long_ts") + sfn.col("nanos"))) + + + def __add_double_ts(self) -> DataFrame: """Add a double (epoch) version of the string timestamp out to nanos""" return (