diff --git a/build.sbt b/build.sbt
index 671da8bc..f950fa31 100644
--- a/build.sbt
+++ b/build.sbt
@@ -73,8 +73,8 @@ lazy val core = (project in file("core")) // regular scala code with @native met
     javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-Djna.nosys=true"),
     Test / javaOptions ++= specialOptions,
     // 2.4.5 is the highest version we have with the old spark-testing-base deps
-    sparkVersion := System.getProperty("sparkVersion", "3.3.0"),
-    sparkTestingVersion := "1.4.0",
+    sparkVersion := System.getProperty("sparkVersion", "3.5.0"),
+    sparkTestingVersion := "1.4.7",
     // additional libraries
     libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % sparkVersion.value,
diff --git a/core/src/main/scala/com/high-performance-spark-examples/mllib/GoldilocksMLlib.scala b/core/src/main/scala/com/high-performance-spark-examples/mllib/GoldilocksMLlib.scala
index cde64c75..f57b4694 100644
--- a/core/src/main/scala/com/high-performance-spark-examples/mllib/GoldilocksMLlib.scala
+++ b/core/src/main/scala/com/high-performance-spark-examples/mllib/GoldilocksMLlib.scala
@@ -14,7 +14,6 @@ import org.apache.spark.mllib.linalg.{Vector => SparkVector}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
 
-import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import com.highperformancespark.examples.dataframe._
 //end::imports[]
 
@@ -97,42 +96,6 @@ object GoldilocksMLlib {
   }
   //end::trainScaler[]
 
-  //tag::word2vecSimple[]
-  def word2vec(sc: SparkContext, rdd: RDD[String]): RDD[SparkVector] = {
-    // Tokenize our data
-    val tokenized = rdd.map(_.split(" ").toIterable)
-    // Construct our word2vec model
-    val wv = new Word2Vec()
-    val wvm = wv.fit(tokenized)
-    val wvmb = sc.broadcast(wvm)
-    // WVM can now transform single words
-    println(wvm.transform("panda"))
-    // Vector size is 100 - we use this to build a transformer on top of WVM that
-    // works on sentences.
-    val vectorSize = 100
-    // The transform function works on a per-word basis, but we have
-    // sentences as input.
-    tokenized.map{words =>
-      // If there is nothing in the sentence output a null vector
-      if (words.isEmpty) {
-        Vectors.sparse(vectorSize, Array.empty[Int], Array.empty[Double])
-      } else {
-        // If there are sentences construct a running sum of the
-        // vectors for each word
-        val sum = Array[Double](vectorSize)
-        words.foreach { word =>
-          blas.daxpy(
-            vectorSize, 1.0, wvmb.value.transform(word).toArray, 1, sum, 1)
-        }
-        // Then scale it by the number of words
-        blas.dscal(sum.length, 1.0 / words.size, sum, 1)
-        // And wrap it in a Spark vector
-        Vectors.dense(sum)
-      }
-    }
-  }
-  //end::word2vecSimple[]
-
   //tag::hashingTFPreserve[]
   def toVectorPerserving(rdd: RDD[RawPanda]): RDD[(RawPanda, SparkVector)] = {
     val ht = new HashingTF()
diff --git a/env_setup.sh b/env_setup.sh
index a79213e3..34fa4279 100644
--- a/env_setup.sh
+++ b/env_setup.sh
@@ -3,12 +3,12 @@
 
 # Download Spark and iceberg if not present
-SPARK_MAJOR=${SPARK_MAJOR:-"3.4"}
-SPARK_VERSION=${SPARK_VERSION:-"3.4.1"}
+SPARK_MAJOR=${SPARK_MAJOR:-"3.5"}
+SPARK_VERSION=${SPARK_VERSION:-"3.5.0"}
 SCALA_VERSION=${SCALA_VERSION:-"2.12"}
 HADOOP_VERSION="3"
 SPARK_PATH="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
 SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
-ICEBERG_VERSION=${ICEBERG_VERSION:-"1.3.1"}
+ICEBERG_VERSION=${ICEBERG_VERSION:-"1.4.0"}
 if [ ! -f "${SPARK_FILE}" ]; then
   wget "https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}" &
 fi
@@ -26,6 +26,8 @@
 fi
 export SPARK_HOME="${SPARK_PATH}"
 if [ ! -f "${SPARK_PATH}/jars/${ICEBERG_FILE}" ]; then
+  # Delete any old Iceberg runtime JAR first (glob outside quotes so it expands).
+  rm "${SPARK_PATH}/jars/"iceberg-spark-runtime*.jar || echo "No old version to delete."
   cp "${ICEBERG_FILE}" "${SPARK_PATH}/jars/${ICEBERG_FILE}"
 fi
 