Skip to content
This repository has been archived by the owner on Nov 28, 2020. It is now read-only.

Commit

Permalink
Chromosome opt (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwiewior authored Mar 14, 2018
1 parent 75adf28 commit 77d0724
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan,
right: SparkPlan,
condition: Seq[Expression],
context: SparkSession,
minOverlap: Int, maxGap: Int, maxBroadcastSize : Int,
minOverlap: Int, maxGap: Int,
useJoinOrder: Boolean
) extends BinaryExecNode with Serializable {
@transient lazy val output = left.output ++ right.output
Expand Down Expand Up @@ -84,7 +84,7 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan,
if ( v1Size < v2Size ) {
logger.warn(s"Broadcasting first table")
val v3 = IntervalTreeJoinOptimChromosomeImpl.overlapJoin(context.sparkContext, v1kv, v2kv,
v1.count(), minOverlap, maxGap, maxBroadcastSize) //FIXME:can be further optimized!
v1.count(), minOverlap, maxGap) //FIXME:can be further optimized!
v3.mapPartitions(
p => {
val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
Expand All @@ -97,7 +97,7 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan,
else {
logger.warn(s"Broadcasting second table")
val v3 = IntervalTreeJoinOptimChromosomeImpl.overlapJoin(context.sparkContext, v2kv, v1kv,
v2.count(), minOverlap, maxGap, maxBroadcastSize)
v2.count(), minOverlap, maxGap)
v3.mapPartitions(
p => {
val joiner = GenerateUnsafeRowJoiner.create(right.schema, left.schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object IntervalTreeJoinOptimChromosomeImpl extends Serializable {
def overlapJoin(sc: SparkContext,
rdd1: RDD[(String,Interval[Int],InternalRow)],
rdd2: RDD[(String,Interval[Int],InternalRow)], rdd1Count:Long,
minOverlap:Int, maxGap: Int, maxBroadCastSize: Int): RDD[(InternalRow, InternalRow)] = {
minOverlap:Int, maxGap: Int): RDD[(InternalRow, InternalRow)] = {

val logger = Logger.getLogger(this.getClass.getCanonicalName)

Expand All @@ -73,7 +73,7 @@ object IntervalTreeJoinOptimChromosomeImpl extends Serializable {
* https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4741060/
*/

val optimizer = new JoinOptimizerChromosome(sc,rdd1, rdd1Count, maxBroadCastSize)
val optimizer = new JoinOptimizerChromosome(sc,rdd1, rdd1Count)
sc.setLogLevel("WARN")
logger.warn(optimizer.debugInfo )

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class IntervalTreeJoinStrategyOptim(spark: SparkSession) extends Strategy with S
spark.sparkContext.setLogLevel("WARN")
/*register functions*/
IntervalTreeJoinOptimChromosome(planLater(left), planLater(right),
rangeJoinKeys, spark, minOverlap.toInt, maxGap.toInt, maxBroadcastSize.toInt, useJoinOrder.toBoolean) :: Nil
rangeJoinKeys, spark, minOverlap.toInt, maxGap.toInt, useJoinOrder.toBoolean) :: Nil
}
case _ =>
Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@ import org.biodatageeks.rangejoins.IntervalTree.{Interval, IntervalWithRow}
import org.biodatageeks.rangejoins.optimizer.RangeJoinMethod.RangeJoinMethod


class JoinOptimizerChromosome(sc: SparkContext, rdd: RDD[(String,Interval[Int],InternalRow)], rddCount : Long, maxBroadcastSize : Int) {
class JoinOptimizerChromosome(sc: SparkContext, rdd: RDD[(String,Interval[Int],InternalRow)], rddCount : Long) {


val estBroadcastSize = estimateBroadcastSize(rdd,rddCount)
val maxBroadcastSize = sc
.getConf
.getOption("spark.biodatageeks.rangejoin.maxBroadcastSize") match {
case Some(size) => size.toLong
case _ => 0.1*scala.math.max((sc.getConf.getSizeAsBytes("spark.driver.memory","0")),1024*(1024*1024)) //defaults 128MB or 0.1 * Spark Driver's memory
}
val estBroadcastSize = estimateBroadcastSize(rdd,rddCount)


private def estimateBroadcastSize(rdd: RDD[(String,Interval[Int],InternalRow)], rddCount: Long): Long = {
Expand All @@ -23,8 +29,8 @@ class JoinOptimizerChromosome(sc: SparkContext, rdd: RDD[(String,Interval[Int],I

def debugInfo = {
s"""
|Broadcast structure size is ~ ${estBroadcastSize/1024} kb
|spark.biodatageeks.rangejoin.maxBroadcastSize is set to ${maxBroadcastSize/1024} kb"
|Broadcast structure size is ~ ${math.rint(100*estBroadcastSize/1024.0)/100} kb
|spark.biodatageeks.rangejoin.maxBroadcastSize is set to ${(maxBroadcastSize/1024).toInt} kb"
|Using ${getRangeJoinMethod.toString} join method
""".stripMargin
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class FeatureCountsTestSuite extends FunSuite with DataFrameSuiteBase with Befor
targets
.createOrReplaceTempView("targets")

spark.sql(query).explain(false)
assert(spark.sql(query).first().getLong(0) === 1484L)
}

Expand Down

0 comments on commit 77d0724

Please sign in to comment.