diff --git a/build.sbt b/build.sbt index 4c4728e..e13cb2a 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ import sbt.nio.Keys._ lazy val scala212 = "2.12.8" lazy val scala211 = "2.11.12" -lazy val sparkVersion = sys.env.getOrElse("SPARK_VERSION", "3.0.0") +lazy val sparkVersion = sys.env.getOrElse("SPARK_VERSION", "3.0.1") def majorMinorVersion(version: String): String = { StringUtils.ordinalIndexOf(version, ".", 2) match { @@ -158,7 +158,7 @@ lazy val sparkClasspath = taskKey[String]("sparkClasspath") lazy val sparkHome = taskKey[String]("sparkHome") // Publish to Bintray -ThisBuild / description := "An open-source toolkit for large-scale genomic analysis" +ThisBuild / description := "An open-source toolkit for large-scale EHR processing" ThisBuild / homepage := Some(url("https://databricks.com/solutions/industries/healthcare")) ThisBuild / scmInfo := Some( ScmInfo( diff --git a/src/main/scala/com/databricks/labs/smolder/Message.scala b/src/main/scala/com/databricks/labs/smolder/Message.scala index a742d02..122dc19 100644 --- a/src/main/scala/com/databricks/labs/smolder/Message.scala +++ b/src/main/scala/com/databricks/labs/smolder/Message.scala @@ -64,13 +64,16 @@ private[smolder] object Message { * @return Parses the message into a Message case class. */ def apply(text: UTF8String): Message = { + + val delim: Byte = 0x0d + if (text == null) { null } else { val textString = text.toString require(textString.nonEmpty, "Received empty string.") - Message(textString.split('\n').toIterator) + Message(textString.split(delim.toChar).toIterator) } } } diff --git a/src/test/scala/com/databricks/labs/smolder/MessageSuite.scala b/src/test/scala/com/databricks/labs/smolder/MessageSuite.scala index 6b40dd3..676e14d 100644 --- a/src/test/scala/com/databricks/labs/smolder/MessageSuite.scala +++ b/src/test/scala/com/databricks/labs/smolder/MessageSuite.scala @@ -101,8 +101,10 @@ class MessageSuite extends SmolderBaseTest { test("parse a full message, by string") { + val delim: Byte = 0x0d + val file = testFile("single_record.hl7") - val lines = Source.fromFile(file).getLines().mkString("\n") + val lines = Source.fromFile(file).getLines().mkString(delim.toChar.toString) val message = Message(UTF8String.fromString(lines)) diff --git a/src/test/scala/com/databricks/labs/smolder/functionsSuite.scala b/src/test/scala/com/databricks/labs/smolder/functionsSuite.scala index 62763e4..1617903 100644 --- a/src/test/scala/com/databricks/labs/smolder/functionsSuite.scala +++ b/src/test/scala/com/databricks/labs/smolder/functionsSuite.scala @@ -30,7 +30,9 @@ class functionsSuite extends SmolderBaseTest { .wholeTextFiles(file) .map(p => TextFile(p._1, p._2))) - val hl7Df = df.select(parse_hl7_message(df("value")).alias("hl7")) + val cleanDF = df.select(regexp_replace(df("value"), "\n", "\r").alias("clean")) + + val hl7Df = cleanDF.select(parse_hl7_message(cleanDF("clean")).alias("hl7")) assert(hl7Df.count() === 1) assert(hl7Df.selectExpr("explode(hl7.segments)").count() === 3) @@ -59,7 +61,8 @@ class functionsSuite extends SmolderBaseTest { .wholeTextFiles(file) .map(p => TextFile(p._1, p._2))) - val hl7Df = df.select(parse_hl7_message(df("value")).alias("hl7")) + val cleanDF = df.select(regexp_replace(df("value"), "\n", "\r").alias("clean")) + val hl7Df = cleanDF.select(parse_hl7_message(cleanDF("clean")).alias("hl7")) val evnType = hl7Df.select(segment_field("EVN", 0, col("hl7.segments")) .alias("type"))