From 47dda8f37e985170192c9407a35a1364d782b96d Mon Sep 17 00:00:00 2001 From: "Xu, Qinying (H&B, Herston)" Date: Fri, 5 Apr 2024 10:56:50 +1000 Subject: [PATCH] add comments as reviewer suggested --- .../csiro/variantspark/utils/FileUtils.java | 28 +------------------ .../variantspark/cli/args/SparkArgs.scala | 6 +++- 2 files changed, 6 insertions(+), 28 deletions(-) diff --git a/src/main/java/au/csiro/variantspark/utils/FileUtils.java b/src/main/java/au/csiro/variantspark/utils/FileUtils.java index 582db5f5..1288bcad 100644 --- a/src/main/java/au/csiro/variantspark/utils/FileUtils.java +++ b/src/main/java/au/csiro/variantspark/utils/FileUtils.java @@ -20,37 +20,11 @@ public static boolean isBGZFile(String filePath) { * .vcf is not GZP file and get htsjdk.samtools.SAMFormatException: at header from java.io.BufferedReader.readLine(BufferedReader.java:389) */ try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(filePath))) { - //bufferedInputStream.mark(100); // mark the current position boolean isValid = BlockCompressedInputStream.isValidFile(bufferedInputStream); - //bufferedInputStream.reset(); // reset back to the marked position return isValid; } catch (IOException e) { - // Handle the exception + // An I/O failure while opening or reading the file is reported as "not a BGZF file" return false; } } - - /** - * - * @param file: an input file - * @return true if input file is Gzip by check the first two byte of input file - * @throws IOException - */ - public static boolean isInputGZip(final File file) throws IOException { - //final PushbackInputStream pb = new PushbackInputStream(input, 2); - - try(final InputStream input = new FileInputStream(file)){ - int header = input.read(); //read ID1 - if(header == -1) return false; - - int b = input.read(); //read ID2 - if(b == -1) return false; - - //ID2 * 256 + ID1 = 35615 - if( ( (b << 8) | header) == GZIPInputStream.GZIP_MAGIC) - return true; - } - - return false; - } } diff --git 
a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala index 6e72a20a..1eac4ecc 100644 --- a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala +++ b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala @@ -2,7 +2,7 @@ package au.csiro.variantspark.cli.args import org.kohsuke.args4j.Option import au.csiro.pbdava.ssparkle.spark.SparkApp -import au.csiro.variantspark.utils._ +import au.csiro.variantspark.utils.FileUtils import org.apache.spark.rdd.RDD import htsjdk.samtools.util.BlockCompressedInputStream import org.apache.hadoop.fs.Path @@ -18,11 +18,15 @@ trait SparkArgs extends SparkApp { val isBGZ = FileUtils.isBGZFile(inputFile) println(inputFile + " is loading to spark RDD, isBGZFile: " + isBGZ) if (isBGZ) { + // A BGZF (bgzip) file is compressed in blocks; it is read here with htsjdk's BlockCompressedInputStream val path = new Path(inputFile) val fs = path.getFileSystem(sc.hadoopConfiguration) val bgzInputStream = new BlockCompressedInputStream(fs.open(path)) + // NOTE: all lines are read sequentially into an in-memory List here and only then distributed with sc.parallelize sc.parallelize(Stream.continually(bgzInputStream.readLine()).takeWhile(_ != null).toList) } else { + // Inputs that are not BGZF are handed to sc.textFile + // this loads .vcf, .vcf.gz or .vcf.bz2 into an RDD sc.textFile(inputFile, if (sparkPar > 0) sparkPar else sc.defaultParallelism) } }