featurecounts tests (#25)
mwiewior authored Mar 13, 2018
1 parent eba5a97 commit 5ab1cd5
Showing 4 changed files with 45 additions and 139 deletions.
6 changes: 6 additions & 0 deletions bin/run_scenario.sh
@@ -0,0 +1,6 @@
#!/usr/bin/env bash
export APP_SLACK_ICON_EMOJI=:hammer_and_pick:
export APP_SLACK_USERNAME=biodatageek-tester
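# Fan the performance test out over 5, 10, 15 and 20 Spark executors
# (seq 5 5 20), posting a Slack notification before each run.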

seq 5 5 20 | xargs -I {} sh -c "slack '#project-genomicranges' 'Running a performance test using {} Spark Executors with 4 cores each...' ; bin/run_perf_test.sh --script=performance/test_min_overlap_cluster.scala --iterations=2 --master=yarn --spark-home=/data/local/opt/spark-2.2.1-bin-hadoop2.7 --executor-memory=8g --num-executors={} --executor-cores=4 --driver-memory=8g"
slack '#project-genomicranges' 'All tests completed!'
148 changes: 22 additions & 126 deletions performance/featureCounts.scala
@@ -3,90 +3,17 @@ import htsjdk.samtools.ValidationStringency
import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.NewHadoopRDD
import org.biodatageeks.rangejoins.IntervalTree.IntervalTreeJoinStrategyOptim
import org.biodatageeks.rangejoins.common.metrics.MetricsCollector
import org.seqdoop.hadoop_bam.{BAMInputFormat, FileVirtualSplit, SAMRecordWritable}
import org.seqdoop.hadoop_bam.util.SAMHeaderReader
sc.hadoopConfiguration.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, ValidationStringency.SILENT.toString)
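// Read the BAM through Hadoop-BAM's BAMInputFormat and keep just the
// (contig, start, end) projection of every alignment.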
case class PosRecord(contigName:String,start:Int,end:Int)
val alignments = sc.newAPIHadoopFile[LongWritable, SAMRecordWritable, BAMInputFormat]("/data/granges/NA12878.ga2.exome.maq.recal.bam").map(_._2.get).map(r=>PosRecord(r.getContig,r.getStart,r.getEnd))

val reads=alignments.toDF
reads.createOrReplaceTempView("reads")

val targets = spark.read.parquet("/data/granges/tgp_exome_hg18.adam")
targets.createOrReplaceTempView("targets")
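// Count reads per target interval. Two closed intervals overlap iff
// reads.start <= targets.end AND reads.end >= targets.start, which is
// exactly the join predicate in the query below.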


val query=""" SELECT targets.contigName,targets.start,targets.end,count(*) FROM reads JOIN targets
| ON (targets.contigName=reads.contigName
| AND
| CAST(reads.end AS INTEGER)>=CAST(targets.start AS INTEGER)
| AND
| CAST(reads.start AS INTEGER)<=CAST(targets.end AS INTEGER)
| )
| GROUP BY targets.contigName,targets.start,targets.end""".stripMargin


time(spark.sql(query).show)
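
// The time(...) calls in these scripts and in MetricsCollector assume a
// `time` helper already defined in the shell session that returns the
// expression's result paired with its elapsed time (consumed via `._2`).
// A minimal sketch of such a helper (the repo's actual definition may differ):
def time[R](block: => R): (R, Double) = {
  val t0 = System.nanoTime()
  val result = block // run the timed expression exactly once
  val t1 = System.nanoTime()
  (result, (t1 - t0) / 1e9) // (result, elapsed seconds)
}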


val metricsTable = "granges.metrics"

import htsjdk.samtools.ValidationStringency

import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.NewHadoopRDD
import org.seqdoop.hadoop_bam.{BAMInputFormat, FileVirtualSplit, SAMRecordWritable}
import org.seqdoop.hadoop_bam.util.SAMHeaderReader
sc.hadoopConfiguration.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, ValidationStringency.SILENT.toString)
case class PosRecord(contigName:String,start:Int,end:Int)
val alignments = sc.newAPIHadoopFile[LongWritable, SAMRecordWritable, BAMInputFormat]("file:///data/samples//NA12878/NA12878.ga2.exome.maq.recal.bam").map(_._2.get).map(r=>PosRecord(r.getContig,r.getStart,r.getEnd))

val reads=alignments.toDF
reads.createOrReplaceTempView("reads")

val targets = spark.read.parquet("file:///data/samples/NA12878/tgp_exome_hg18.adam")
targets.createOrReplaceTempView("targets")


val query=""" SELECT targets.contigName,targets.start,targets.end,count(*) FROM reads JOIN targets
| ON (targets.contigName=reads.contigName
| AND
| CAST(reads.end AS INTEGER)>=CAST(targets.start AS INTEGER)
| AND
| CAST(reads.start AS INTEGER)<=CAST(targets.end AS INTEGER)
| )
| GROUP BY targets.contigName,targets.start,targets.end""".stripMargin

val query2="""SELECT targets.contigName,targets.start,targets.end,count(*) FROM reads JOIN targets
ON (targets.contigName=reads.contigName
AND
CAST(reads.end AS INTEGER)>=CAST(targets.start AS INTEGER)
AND
CAST(reads.start AS INTEGER)<=CAST(targets.end AS INTEGER)
)
GROUP BY targets.contigName,targets.start,targets.end
HAVING targets.contigName='chr1' AND targets.start=20138 AND targets.end=20294"""
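// Presumably a spot check: restricting the aggregate to the single interval
// chr1:20138-20294 makes the count easy to verify by hand.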


time(spark.sql(query2).show)

import htsjdk.samtools.ValidationStringency
import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.NewHadoopRDD
import org.seqdoop.hadoop_bam.{BAMInputFormat, FileVirtualSplit, SAMRecordWritable}
import org.seqdoop.hadoop_bam.util.SAMHeaderReader
import org.biodatageeks.rangejoins.IntervalTree.IntervalTreeJoinStrategyOptim
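// Registering the interval-tree strategy with Catalyst makes Spark plan the
// range join below with an interval tree instead of the default join plus
// range filter.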
spark.experimental.extraStrategies = new IntervalTreeJoinStrategyOptim(spark) :: Nil

sc.hadoopConfiguration.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, ValidationStringency.SILENT.toString)
case class PosRecord(contigName:String,start:Int,end:Int)
val alignments = sc.newAPIHadoopFile[LongWritable, SAMRecordWritable, BAMInputFormat]("/data/granges/NA12878.ga2.exome.maq.recal.bam").map(_._2.get).map(r=>PosRecord(r.getContig,r.getStart,r.getEnd))

val reads=alignments.toDF
@@ -95,7 +22,6 @@ reads.createOrReplaceTempView("reads")
val targets = spark.read.parquet("/data/granges/tgp_exome_hg18.adam")
targets.createOrReplaceTempView("targets")


val query=""" SELECT targets.contigName,targets.start,targets.end,count(*) FROM reads JOIN targets
| ON (targets.contigName=reads.contigName
| AND
@@ -106,62 +32,32 @@ val query=""" SELECT targets.contigName,targets.start,targets.end,count(*) FROM reads JOIN targets
| GROUP BY targets.contigName,targets.start,targets.end""".stripMargin

val query2="""SELECT targets.contigName,targets.start,targets.end,count(*) FROM reads JOIN targets
ON (targets.contigName=reads.contigName
AND
CAST(reads.end AS INTEGER)>=CAST(targets.start AS INTEGER)
AND
CAST(reads.start AS INTEGER)<=CAST(targets.end AS INTEGER)
)
GROUP BY targets.contigName,targets.start,targets.end
HAVING targets.contigName='chr1' AND targets.start=20138 AND targets.end=20294"""

val query3="""SELECT reads.contigName,reads.start,reads.end FROM reads JOIN targets
ON (targets.contigName=reads.contigName
AND
CAST(reads.end AS INTEGER)>=CAST(targets.start AS INTEGER)
AND
CAST(reads.start AS INTEGER)<=CAST(targets.end AS INTEGER)
)
where targets.contigName='chr1' AND targets.start=20138 AND targets.end=20294"""

spark.experimental.extraStrategies = new IntervalTreeJoinStrategyOptim(spark) :: Nil
spark.sql(query3).write.csv("/data/granges/bam_chr1_20138_20294.csv")

spark.sqlContext.setConf("spark.biodatageeks.rangejoin.maxBroadcastSize", (100 * 1024 * 1024).toString)
val mc = new MetricsCollector(spark,metricsTable)
mc.initMetricsTable
mc.runAndCollectMetrics(
  "q_featurecounts_bam_wes",
  "spark_granges_it_bc_all",
  Array("reads","targets"),
  query,
  true
)
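
// runAndCollectMetrics(queryId, algoName, tables, query, saveOutput) records
// the input table sizes plus per-stage metrics for the query run under the
// given labels; saveOutput = true also writes the query result out (see the
// MetricsCollector change at the end of this commit).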






import htsjdk.samtools.ValidationStringency
import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.NewHadoopRDD
import org.seqdoop.hadoop_bam.{BAMInputFormat, FileVirtualSplit, SAMRecordWritable}
import org.seqdoop.hadoop_bam.util.SAMHeaderReader
import org.biodatageeks.rangejoins.IntervalTree.IntervalTreeJoinStrategyOptim



val reads = spark.read.parquet("/data/granges/NA12878.ga2.exome.maq.recal.adam")
reads.createOrReplaceTempView("reads")

val targets = spark.read.parquet("/data/granges/tgp_exome_hg18.adam")
targets.createOrReplaceTempView("targets")




val query3="""SELECT reads.contigName,reads.start,reads.end FROM reads JOIN targets
ON (targets.contigName=reads.contigName
AND
CAST(reads.end AS INTEGER)>=CAST(targets.start AS INTEGER)
AND
CAST(reads.start AS INTEGER)<=CAST(targets.end AS INTEGER)
)
where targets.contigName='chr1' AND targets.start=20138 AND targets.end=20294"""

spark.experimental.extraStrategies = new IntervalTreeJoinStrategyOptim(spark) :: Nil
spark.sql(query3).write.csv("/data/granges/adam_chr1_20138_20294.csv")
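
// Mirror of the BAM run above, reading the ADAM/Parquet copy of the reads,
// so the "q_featurecounts_bam_wes" and "q_featurecounts_adam_wes" metric
// rows compare the two input formats.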
val mc = new MetricsCollector(spark,metricsTable)
mc.initMetricsTable
mc.runAndCollectMetrics(
  "q_featurecounts_adam_wes",
  "spark_granges_it_bc_all",
  Array("reads","targets"),
  query,
  true
)
22 changes: 11 additions & 11 deletions performance/test_min_overlap_cluster.scala
@@ -25,14 +25,14 @@ spark.sqlContext.udf.register("overlaplength", RangeMethods.calcOverlap _)
spark.experimental.extraStrategies = new IntervalTreeJoinStrategyOptim(spark) :: Nil
val query2 =
s"""
|SELECT ref.*,snp.* FROM snp JOIN ref
|ON (ref.chr=snp.chr
|AND
|snp.end>=ref.start
|AND
|snp.start<=ref.end
|AND
|overlaplength(snp.start,snp.end,ref.start,ref.end)>=10
|SELECT * FROM reads JOIN targets
|ON (targets.contigName=reads.contigName
|AND
|CAST(reads.end AS INTEGER)>=CAST(targets.start AS INTEGER)
|AND
|CAST(reads.start AS INTEGER)<=CAST(targets.end AS INTEGER)
|AND
|overlaplength(reads.start,reads.end,targets.start,targets.end)>=10
|)
|
""".stripMargin
@@ -49,7 +49,7 @@ minOverlap.foreach(mo=> {
val mc1 = new MetricsCollector(spark, metricsTable)
mc1.initMetricsTable
mc1.runAndCollectMetrics(
s"q_overlap_reads_target_adam_wgs_min${mo}",
s"q_overlap_reads_target_adam_wgs_min${mo}_fix",
"spark_granges_it_bc_all",
Array("reads", "targets"),
query
@@ -58,10 +58,10 @@

/*bdg-spark-granges - broadcast intervals*/
spark.experimental.extraStrategies = new IntervalTreeJoinStrategyOptim(spark) :: Nil
spark.sqlContext.setConf("spark.biodatageeks.rangejoin.maxBroadcastSize", (1024 * 1024).toString)
spark.sqlContext.setConf("spark.biodatageeks.rangejoin.maxBroadcastSize", (1024).toString)
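// Dropping maxBroadcastSize from 1 MiB to 1 KiB presumably forces the
// interval-only broadcast path that the "spark_granges_it_bc_int" label
// refers to.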
val mc2 = new MetricsCollector(spark, metricsTable)
mc2.runAndCollectMetrics(
s"q_overlap_reads_target_adam_wgs_min${mo}",
s"q_overlap_reads_target_adam_wgs_min${mo}_fix",
"spark_granges_it_bc_int",
Array("reads", "targets"),
query
@@ -73,7 +73,7 @@ minOverlap.foreach(mo=> {
spark.experimental.extraStrategies = Nil
val mc3 = new MetricsCollector(spark, metricsTable)
mc3.runAndCollectMetrics(
s"q_overlap_reads_target_adam_wgs_min${mo}",
s"q_overlap_reads_target_adam_wgs_min${mo}_fix",
"spark_default",
Array("reads", "targets"),
query2
@@ -83,4 +83,4 @@ minOverlap.foreach(mo=> {
}
)

System.exit(0)
@@ -76,7 +76,7 @@ class MetricsCollector( sparkSession: SparkSession, metricsTableName: String) {



def runAndCollectMetrics(queryId:String,algoName:String,tables:Array[String],query:String) = {
def runAndCollectMetrics(queryId:String,algoName:String,tables:Array[String],query:String, saveOutput:Boolean = false) = {

val arraysCount = new Array[Long](tables.size)
for(i<- 0 to arraysCount.size - 1 ){
@@ -86,7 +86,11 @@ class MetricsCollector( sparkSession: SparkSession, metricsTableName: String) {
.getLong(0)
}
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
val executionTime = time(stageMetrics.runAndMeasure(spark.sql(query).count()))._2
val executionTime = saveOutput match {
case false => time(stageMetrics.runAndMeasure(spark.sql(query).count()))._2
case _ => time(stageMetrics.runAndMeasure(spark.sql(query).write.csv(s"/tmp/${queryId}_${scala.util.Random.nextLong()}.csv")))._2
}
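
// With saveOutput = false the query is forced with count(); with true the
// result is also written to a uniquely named CSV under /tmp, so the measured
// time includes materializing the output.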

val metrics = stageMetrics.createStageMetricsDF()
val aggMetrics = metrics
.drop(columnsToDrop: _*)
