Update to b4 build with segments (groupBy) and withTimeColumn support for RankingMetrics in scala example
jamie256 committed May 2, 2024
1 parent b113652 commit f3749f0
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions scala/src/main/scala/WhylogsScalaRankingMetricsExample.scala
@@ -1,7 +1,14 @@
 // Tested on Databricks cluster running as scala notebook:
 // * cluster version: 13.3 (includes Apache Spark 3.4.1, Scala 2.12)
-// * installed whylogs jar: https://oss.sonatype.org/service/local/repositories/snapshots/content/ai/whylabs/whylogs-spark-bundle_3.1.1-scala_2.12/0.2.0-b2-SNAPSHOT/whylogs-spark-bundle_3.1.1-scala_2.12-0.2.0-b2-20240429.142812-1-all.jar
+// * installed whylogs jar: https://oss.sonatype.org/service/local/repositories/snapshots/content/ai/whylabs/whylogs-spark-bundle_3.1.1-scala_2.12/0.2.0-b4-SNAPSHOT/whylogs-spark-bundle_3.1.1-scala_2.12-0.2.0-b4-20240502.212838-1-all.jar
+/* Maven module
+<dependency>
+  <groupId>ai.whylabs</groupId>
+  <artifactId>whylogs-spark-bundle_3.1.1-scala_2.12</artifactId>
+  <version>0.2.0-b4-SNAPSHOT</version>
+  <classifier>all</classifier>
+</dependency>
+*/
 
 import java.time.LocalDateTime
 import java.time.ZonedDateTime
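
For builds outside Databricks, an sbt equivalent of the Maven coordinates above might look like the following sketch; the snapshot resolver URL is an assumption based on the Sonatype host in the jar link:

// build.sbt sketch (assumed; mirrors the Maven coordinates above)
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
libraryDependencies += "ai.whylabs" % "whylogs-spark-bundle_3.1.1-scala_2.12" % "0.2.0-b4-SNAPSHOT" classifier "all"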
@@ -22,7 +29,12 @@ def unixTimestampForNumberOfDaysAgo(numDaysAgo: Int): ZonedDateTime = {
   zdt
 }
 
-val timestamp_two_days_ago = unixTimestampForNumberOfDaysAgo(2)
+// create a few days so we get different profiles for a daily model using timeColumn
+val t1 = unixEpochTimeForNumberOfDaysAgo(1)
+val t2 = unixEpochTimeForNumberOfDaysAgo(2)
+val t3 = unixEpochTimeForNumberOfDaysAgo(3)
+val timeColumn = "dataset_timestamp"
 
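The added lines call unixEpochTimeForNumberOfDaysAgo, which is defined outside this hunk; a plausible definition, assumed here and modeled on the helper in the other whylogs Scala examples, returns epoch seconds so that the cast to TimestampType further down yields a valid timestamp:

// Assumed definition (not shown in this diff): epoch seconds for N days ago
def unixEpochTimeForNumberOfDaysAgo(numDaysAgo: Int): Long = {
  import java.time.{LocalDateTime, ZoneId}
  val daysAgo = LocalDateTime.now().minusDays(numDaysAgo)
  daysAgo.atZone(ZoneId.of("Z")).toInstant.toEpochMilli / 1000L
}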
@@ -37,24 +49,36 @@ val spark = SparkSession
     .config("spark.ui.enabled", "false")
     .getOrCreate()
 
-// Using some examples from the Apache support for RankingMetrics:
+// Adapting examples from the Apache support for RankingMetrics:
 // See https://spark.apache.org/docs/2.3.0/mllib-evaluation-metrics.html#ranking-systems
-val predictionAndLabelsRDD = spark.sparkContext.parallelize(
+val predictionAndLabelsAndeScoresWithGroupsRDD = spark.sparkContext.parallelize(
   Seq(
-    (Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array(1, 2, 3, 4, 5)),
-    (Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3)),
-    (Array(1, 2, 3, 4, 5), Array(0, 0, 0, 0, 0))),
-  2)
+    ("g1", Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array(1, 2, 3, 4, 5)),
+    ("g1", Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3)),
+    ("g1", Array(1, 2, 3, 4, 5), null), // test out missing elements for labels
+    ("g3", Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array(1, 2, 3, 4, 5)),
+    ("g2", Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3)),
+    ("g2", Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3)),
+    ("g2", Array(1, 2, 6, 4, 3), null),
+    ("g2", Array(1, 2, 3, 4, 5), Array(0, 0, 0, 0, 0)),
+    ("g3", Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array(1, 2, 3, 4, 5))),
+  6)
 
-// Now we have an example DataFrame with columns for the predictions and targets
-val df = predictionAndLabelsRDD.toDF("predictions", "targets")
+// We'll copy it a few times for different timestamps and then combine them into a single df to mimic backfill data
+val df1 = predictionAndLabelsAndeScoresWithGroupsRDD.toDF("groups", "predictions", "labels").withColumn(timeColumn, lit(t1).cast(DataTypes.TimestampType))
+val df2 = predictionAndLabelsAndeScoresWithGroupsRDD.toDF("groups", "predictions", "labels").withColumn(timeColumn, lit(t2).cast(DataTypes.TimestampType))
+val df3 = predictionAndLabelsAndeScoresWithGroupsRDD.toDF("groups", "predictions", "labels").withColumn(timeColumn, lit(t3).cast(DataTypes.TimestampType))
+val df = df1.union(df2).union(df3)
+df.printSchema()
 
 // Next we create a profiling session to compute RankingMetrics
 // This must be a stand alone profiling session that does not compute
 // other whylogs metrics. The default of k is 10 if not specified
 val session = df.newProfilingSession("RankingMetricsTest") // start a new WhyLogs profiling job
-  .withRankingMetrics(predictionField="predictions", targetField="targets", k=Some(2))
+  .withTimeColumn(timeColumn) // profiles generated for each unique time
+  .withRankingMetrics(predictionField="predictions", targetField="labels", k=2)
+  .groupBy("groups")
 
 // COMMAND ----------

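For the DataFrame code above to compile in a plain Scala file (outside a notebook, where some of this is implicit), a few standard Spark imports are needed; they are not part of this diff:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.DataTypes
import spark.implicits._ // enables .toDF on the RDD; assumes the SparkSession val named spark above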
@@ -63,8 +87,7 @@ val session = df.newProfilingSession("RankingMetricsTest") // start a new WhyLogs profiling job
 // also the settings page allows you to create new apiKeys; you will need an apiKey to upload to your account in WhyLabs
 // The modelId below specifies which model this profile is for; by default an initial model-1 is created, but you will update this
 // if you create a new model here https://hub.whylabsapp.com/settings/model-management
-// Note: if you don't specify the timestamp, current local time is used as a default.
-session.logRankingMetrics(timestamp=timestamp_two_days_ago.toInstant,
+session.logRankingMetrics(
   orgId = "replace-with-org-id",
   modelId = "replace-with-model-id",
   apiKey = "replace-with-api-key")
