diff --git a/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala b/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala index a6b27883..88772d7e 100644 --- a/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala +++ b/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala @@ -34,7 +34,7 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan, right: SparkPlan, condition: Seq[Expression], context: SparkSession, - minOverlap: Int, maxGap: Int, maxBroadcastSize : Int, + minOverlap: Int, maxGap: Int, useJoinOrder: Boolean ) extends BinaryExecNode with Serializable { @transient lazy val output = left.output ++ right.output @@ -84,7 +84,7 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan, if ( v1Size < v2Size ) { logger.warn(s"Broadcasting first table") val v3 = IntervalTreeJoinOptimChromosomeImpl.overlapJoin(context.sparkContext, v1kv, v2kv, - v1.count(), minOverlap, maxGap, maxBroadcastSize) //FIXME:can be further optimized! + v1.count(), minOverlap, maxGap) //FIXME:can be further optimized! 
v3.mapPartitions( p => { val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema) @@ -97,7 +97,7 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan, else { logger.warn(s"Broadcasting second table") val v3 = IntervalTreeJoinOptimChromosomeImpl.overlapJoin(context.sparkContext, v2kv, v1kv, - v2.count(), minOverlap, maxGap, maxBroadcastSize) + v2.count(), minOverlap, maxGap) v3.mapPartitions( p => { val joiner = GenerateUnsafeRowJoiner.create(right.schema, left.schema) diff --git a/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosomeImpl.scala b/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosomeImpl.scala index 5b68a0cb..44ad04d5 100644 --- a/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosomeImpl.scala +++ b/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosomeImpl.scala @@ -60,7 +60,7 @@ object IntervalTreeJoinOptimChromosomeImpl extends Serializable { def overlapJoin(sc: SparkContext, rdd1: RDD[(String,Interval[Int],InternalRow)], rdd2: RDD[(String,Interval[Int],InternalRow)], rdd1Count:Long, - minOverlap:Int, maxGap: Int, maxBroadCastSize: Int): RDD[(InternalRow, InternalRow)] = { + minOverlap:Int, maxGap: Int): RDD[(InternalRow, InternalRow)] = { val logger = Logger.getLogger(this.getClass.getCanonicalName) @@ -73,7 +73,7 @@ object IntervalTreeJoinOptimChromosomeImpl extends Serializable { * https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4741060/ */ - val optimizer = new JoinOptimizerChromosome(sc,rdd1, rdd1Count, maxBroadCastSize) + val optimizer = new JoinOptimizerChromosome(sc,rdd1, rdd1Count) sc.setLogLevel("WARN") logger.warn(optimizer.debugInfo ) diff --git a/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala b/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala index 
58f729ac..0455fa13 100644 --- a/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala +++ b/src/main/scala/org/biodatageeks/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala @@ -24,7 +24,7 @@ class IntervalTreeJoinStrategyOptim(spark: SparkSession) extends Strategy with S spark.sparkContext.setLogLevel("WARN") /*register functions*/ IntervalTreeJoinOptimChromosome(planLater(left), planLater(right), - rangeJoinKeys, spark, minOverlap.toInt, maxGap.toInt, maxBroadcastSize.toInt, useJoinOrder.toBoolean) :: Nil + rangeJoinKeys, spark, minOverlap.toInt, maxGap.toInt, useJoinOrder.toBoolean) :: Nil } case _ => Nil diff --git a/src/main/scala/org/biodatageeks/rangejoins/optimizer/JoinOptimizerChromosome.scala b/src/main/scala/org/biodatageeks/rangejoins/optimizer/JoinOptimizerChromosome.scala index 43b0182d..4cc1c243 100644 --- a/src/main/scala/org/biodatageeks/rangejoins/optimizer/JoinOptimizerChromosome.scala +++ b/src/main/scala/org/biodatageeks/rangejoins/optimizer/JoinOptimizerChromosome.scala @@ -10,10 +10,16 @@ import org.biodatageeks.rangejoins.IntervalTree.{Interval, IntervalWithRow} import org.biodatageeks.rangejoins.optimizer.RangeJoinMethod.RangeJoinMethod -class JoinOptimizerChromosome(sc: SparkContext, rdd: RDD[(String,Interval[Int],InternalRow)], rddCount : Long, maxBroadcastSize : Int) { +class JoinOptimizerChromosome(sc: SparkContext, rdd: RDD[(String,Interval[Int],InternalRow)], rddCount : Long) { - val estBroadcastSize = estimateBroadcastSize(rdd,rddCount) + val maxBroadcastSize = sc + .getConf + .getOption("spark.biodatageeks.rangejoin.maxBroadcastSize") match { + case Some(size) => size.toLong + case _ => (0.1*scala.math.max((sc.getConf.getSizeAsBytes("spark.driver.memory","0")),1024*(1024*1024))).toLong //defaults to 0.1 * Spark Driver's memory, floored at 0.1 * 1GB (~107 MB)
 + } + val estBroadcastSize = estimateBroadcastSize(rdd,rddCount) private def estimateBroadcastSize(rdd: RDD[(String,Interval[Int],InternalRow)],
rddCount: Long): Long = { @@ -23,8 +29,8 @@ class JoinOptimizerChromosome(sc: SparkContext, rdd: RDD[(String,Interval[Int],I def debugInfo = { s""" - |Broadcast structure size is ~ ${estBroadcastSize/1024} kb - |spark.biodatageeks.rangejoin.maxBroadcastSize is set to ${maxBroadcastSize/1024} kb" + |Broadcast structure size is ~ ${math.rint(100*estBroadcastSize/1024.0)/100} kb + |spark.biodatageeks.rangejoin.maxBroadcastSize is set to ${(maxBroadcastSize/1024).toInt} kb |Using ${getRangeJoinMethod.toString} join method """.stripMargin } diff --git a/src/test/scala/pl/edu/pw/ii/biodatageeks/tests/FeatureCountsTestSuite.scala b/src/test/scala/pl/edu/pw/ii/biodatageeks/tests/FeatureCountsTestSuite.scala index 6ebdd844..f49fcc92 100644 --- a/src/test/scala/pl/edu/pw/ii/biodatageeks/tests/FeatureCountsTestSuite.scala +++ b/src/test/scala/pl/edu/pw/ii/biodatageeks/tests/FeatureCountsTestSuite.scala @@ -54,6 +54,7 @@ class FeatureCountsTestSuite extends FunSuite with DataFrameSuiteBase with Befor targets .createOrReplaceTempView("targets") + spark.sql(query).explain(false) assert(spark.sql(query).first().getLong(0) === 1484L) }