Skip to content

Commit

Permalink
Merge branch 'fix_nanos_conversion' into tox-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
tnixon committed May 14, 2024
2 parents 4d02b80 + b5a9a0c commit 8a7eb5e
Showing 1 changed file with 43 additions and 2 deletions.
45 changes: 43 additions & 2 deletions python/tempo/tsdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ def __init__(
# Timestamp string matching then do some pattern matching to extract
# the time stamp.
if isinstance(df.schema[ts_col].dataType, StringType): # pragma: no cover
sample_ts = df.limit(1).collect()[0][0]
sample_ts = df.select(ts_col).limit(1).collect()[0][0]
self.__validate_ts_string(sample_ts)
self.df = self.__add_double_ts().withColumnRenamed("double_ts", self.ts_col)
self.df = self.__add_double_ts()\
.drop(self.ts_col)\
.withColumnRenamed("double_ts", self.ts_col)

"""
Make sure DF is ordered by its respective ts_col and partition columns.
Expand All @@ -77,6 +79,45 @@ def __init__(
# Helper functions
#

@staticmethod
def parse_nanos_timestamp(df: DataFrame,
str_ts_col: str,
ts_fmt: str = "yyyy-MM-dd HH:mm:ss",
double_ts_col: Optional[str] = None,
parsed_ts_col: Optional[str] = None) -> DataFrame:
"""
Parse a string timestamp column with nanosecond precision into a double timestamp column.
:param df: DataFrame containing the string timestamp column
:param str_ts_col: Name of the string timestamp column
:param ts_fmt: Format of the string timestamp column (default: "yyyy-MM-dd HH:mm:ss")
:param double_ts_col: Name of the double timestamp column to create, if None
the source string column will be overwritten
:param parsed_ts_col: Name of the parsed timestamp column to create, if None
no parsed timestamp column will be kept
:return: DataFrame with the double timestamp column
"""

# add a parsed timestamp column if requested
src_df = df.withColumn(parsed_ts_col,
sfn.to_timestamp(sfn.col(str_ts_col), ts_fmt)) \
if parsed_ts_col else df

return (
src_df.withColumn("nanos",
sfn.when(sfn.col(str_ts_col).contains("."),
sfn.concat(sfn.lit("0."),
sfn.split(sfn.col(str_ts_col),
r"\.")[1])
).otherwise(0).cast("double"))
.withColumn("long_ts",
sfn.unix_timestamp(str_ts_col, ts_fmt))
.withColumn((double_ts_col or str_ts_col),
sfn.col("long_ts") + sfn.col("nanos")))



def __add_double_ts(self) -> DataFrame:
"""Add a double (epoch) version of the string timestamp out to nanos"""
return (
Expand Down

0 comments on commit 8a7eb5e

Please sign in to comment.