forked from marshackVB/databricks_feature_store
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpassenger_demographic_features.py
63 lines (50 loc) · 2.09 KB
/
passenger_demographic_features.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import pyspark.sql.functions as func
from databricks.feature_store import FeatureStoreClient, feature_table
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
# Obtain a Spark session via the builder's getOrCreate(), labelled "appName".
spark = (
    SparkSession.builder
    .appName("appName")
    .getOrCreate()
)
# Feature Store client handle, constructed with default arguments.
fs = FeatureStoreClient()
def compute_passenger_demographic_features(df):
    """Derive demographic feature columns from a passenger DataFrame.

    Args:
        df: Spark DataFrame that provides at least the columns
            ``PassengerId``, ``Name``, ``Sex``, ``Age`` and ``SibSp``
            (these are the columns the expressions below reference).

    Returns:
        A DataFrame with the columns ``PassengerId``, ``Name``, ``Sex``,
        ``Age``, ``SibSp``, ``NamePrefix``, ``NameSecondary`` and
        ``NameMultiple``:

        * ``NamePrefix``: the run of letters immediately preceding the first
          ``.`` matched in ``Name`` (e.g. Mr, Mrs).
        * ``NameSecondary``: the text found inside parentheses in ``Name``
          (letters and spaces only), or NULL when nothing was extracted.
        * ``NameMultiple``: the string ``'1'`` when ``NameSecondary`` is not
          NULL, otherwise the string ``'0'``.
        * ``Age``: the input ``Age``, with values equal to ``'NaN'`` replaced
          by NULL.
    """
    # Raw strings keep the regex backslashes literal; "\." and "\(" in a
    # normal string literal are invalid escape sequences that Python warns
    # about. The resulting pattern text is unchanged.
    prefix_pattern = r"([A-Za-z]+)\."
    secondary_pattern = r"\(([A-Za-z ]+)\)"

    return (
        # Extract prefix from name, such as Mr. Mrs., etc.
        df.withColumn("NamePrefix", func.regexp_extract(col("Name"), prefix_pattern, 1))
        # Extract a secondary (parenthesised) name in the Name column if one exists
        .withColumn(
            "NameSecondary_extract",
            func.regexp_extract(col("Name"), secondary_pattern, 1),
        )
        # Keep the extracted value only when it is non-empty; otherwise store
        # NULL so that "no secondary name" is represented as missing.
        .selectExpr(
            "*",
            "case when length(NameSecondary_extract) > 0 then NameSecondary_extract else NULL end as NameSecondary",
        )
        # The intermediate extraction column is no longer needed.
        .drop("NameSecondary_extract")
        # Final projection: normalise Age and add the flag indicating whether
        # a secondary name is present in the Name column.
        .selectExpr(
            "PassengerId",
            "Name",
            "Sex",
            "case when Age = 'NaN' then NULL else Age end as Age",
            "SibSp",
            "NamePrefix",
            "NameSecondary",
            "case when NameSecondary is not NULL then '1' else '0' end as NameMultiple",
        )
    )
# Load the base passenger table and derive the demographic features from it.
df = spark.table("hive_metastore.robkisk.passenger_demographic_base")
passenger_demographic_features = compute_passenger_demographic_features(df)
# For interactive inspection: passenger_demographic_features.show(10, False)

feature_table_name = "hive_metastore.robkisk.passenger_demographic_features"

# If the feature table has already been created, no need to recreate it.
# Only the lookup itself sits inside the try so that unrelated failures are
# not mistaken for "table does not exist".
try:
    fs.get_table(feature_table_name)
except Exception as lookup_error:
    # NOTE(review): any lookup failure is treated as "table missing", as in the
    # original script. Narrow this to the specific exception get_table raises
    # for a missing table once that is confirmed for the client version in use.
    # The error is printed so the reason for (re)creation is not silently lost.
    print(f"Feature table lookup failed ({lookup_error}); creating feature table entry")
    fs.create_table(
        name=feature_table_name,
        primary_keys="PassengerId",
        schema=passenger_demographic_features.schema,
        description="Demographic-related features for Titanic passengers",
    )
else:
    print("Feature table entry already exists")

# Write the computed features into the feature table using merge mode.
fs.write_table(name=feature_table_name, df=passenger_demographic_features, mode="merge")