diff --git a/bin/etl/hraven-etl.sh b/bin/etl/hraven-etl.sh
index b2e2caa..2240811 100755
--- a/bin/etl/hraven-etl.sh
+++ b/bin/etl/hraven-etl.sh
@@ -40,8 +40,12 @@ costfile=/var/lib/hraven/conf/costFile
 hadoopconfdir=${HADOOP_CONF_DIR:-$HADOOP_HOME/conf}
 hbaseconfdir=${HBASE_CONF_DIR:-$HBASE_HOME/conf}
 # HDFS directories for processing and loading job history data
-historyRawDir=/yarn/history/done/
-historyProcessingDir=/hraven/processing/
+year=2014
+month="*"
+day="*"
+historyDirPattern=/hadoop/mapred/history/done/*/$year/$month/$day/*/*
+historyBasePath=/hadoop/mapred/history/done
+historyProcessingDir=/hadoop/mapred/history/processing/
 #######################################################
 #If costfile is empty, fill it with default values
@@ -65,7 +69,7 @@ create_pidfile $HRAVEN_PID_DIR
 trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT
 # Pre-process
-$home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir $cluster $batchsize $defaultrawfilesizelimit
+$home/jobFilePreprocessor.sh $hadoopconfdir $historyBasePath $historyDirPattern $historyProcessingDir $cluster $batchsize $defaultrawfilesizelimit
 # Load
 $home/jobFileLoader.sh $hadoopconfdir $mapredmaxsplitsize $schedulerpoolname $cluster $historyProcessingDir
diff --git a/bin/etl/jobFilePreprocessor.sh b/bin/etl/jobFilePreprocessor.sh
index fd478b6..d050305 100755
--- a/bin/etl/jobFilePreprocessor.sh
+++ b/bin/etl/jobFilePreprocessor.sh
@@ -19,9 +19,9 @@
 # Usage ./jobFilePreprocessor.sh [hadoopconfdir]
 # [historyrawdir] [historyprocessingdir] [cluster] [batchsize]
-if [ $# -ne 6 ]
+if [ $# -lt 7 ]
 then
-  echo "Usage: `basename $0` [hadoopconfdir] [historyrawdir] [historyprocessingdir] [cluster] [batchsize] [defaultrawfilesizelimit]"
+  echo "Usage: `basename $0` [hadoopconfdir] [historyBasePath] [historyrawdir] [historyprocessingdir] [cluster] [batchsize] [defaultrawfilesizelimit]"
   exit 1
 fi
@@ -39,4 +39,4 @@ fi
 create_pidfile $HRAVEN_PID_DIR
 trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT
-hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFilePreprocessor -libjars=$LIBJARS -d -i $2 -o $3 -c $4 -b $5 -s $6
\ No newline at end of file
+hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFilePreprocessor -libjars=$LIBJARS -d -bi $2 -i $3 -o $4 -c $5 -b $6 -s $7
diff --git a/hraven-core/src/main/java/com/twitter/hraven/Constants.java b/hraven-core/src/main/java/com/twitter/hraven/Constants.java
index 9e8653a..8cc607f 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/Constants.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -17,6 +17,8 @@
 import java.text.SimpleDateFormat;
 import java.util.TimeZone;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.hbase.util.Bytes;
 /**
@@ -426,4 +428,6 @@ public class Constants {
   /** name of the properties file used for cluster to cluster identifier mapping */
   public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";
+
+  public static final Pattern HADOOPV1HISTORYPATTERN = Pattern.compile("(.*)/done/(.*)/([0-9]{4})/([0-9]{2})/([0-9]{2})/(.*)/(.*)");
 }
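For review, a quick sanity check of what the new HADOOPV1HISTORYPATTERN captures. The sample path below is hypothetical; it just follows the Hadoop v1 done-folder layout (`.../done/<jt-host>/<yyyy>/<mm>/<dd>/<serial>/<file>`) that the regex encodes:

```java
import java.util.regex.Matcher;

import com.twitter.hraven.Constants;

public class HistoryPatternCheck {
  public static void main(String[] args) {
    // Hypothetical Hadoop v1 job history file path
    String path = "/hadoop/mapred/history/done/jt-host_1400000000000_/2014/05/16/000123/job_201405160000_0001_conf.xml";
    Matcher m = Constants.HADOOPV1HISTORYPATTERN.matcher(path);
    if (m.matches()) {
      System.out.println("base   = " + m.group(1)); // /hadoop/mapred/history
      System.out.println("host   = " + m.group(2)); // jt-host_1400000000000_
      System.out.println("year   = " + m.group(3)); // 2014
      System.out.println("month  = " + m.group(4)); // 05
      System.out.println("day    = " + m.group(5)); // 16
      System.out.println("serial = " + m.group(6)); // 000123
      System.out.println("file   = " + m.group(7)); // job_201405160000_0001_conf.xml
    }
  }
}
```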
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java
index 91fc66a..13afaaf 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java
@@ -118,7 +118,9 @@ public static FileStatus[] getListFilesToProcess(long maxFileSize, boolean recur
     LOG.info(" in getListFilesToProcess maxFileSize=" + maxFileSize
         + " inputPath= " + inputPath.toUri());
-    FileStatus[] origList = listFiles(recurse, hdfs, inputPath, pathFilter);
+    // Instead of getting a base path and recursing, we insist on getting a path pattern
+    // and using globStatus to return all files instead, which is much faster than the recursive method call:
+    FileStatus[] origList = hdfs.globStatus(inputPath, pathFilter);
     if (origList == null) {
       LOG.info(" No files found, orig list returning 0");
       return new FileStatus[0];
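A minimal sketch of the listing behavior this hunk relies on (paths hypothetical): `FileSystem.globStatus` expands the glob in a single listing pass, so the client no longer walks the done-folder tree directory by directory.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GlobListingSketch {
  public static void main(String[] args) throws Exception {
    FileSystem hdfs = FileSystem.get(new Configuration());
    // Hypothetical pattern matching every history file for May 2014
    Path pattern = new Path("/hadoop/mapred/history/done/*/2014/05/*/*/*");
    // One call expands the glob; no per-directory recursion on the client
    FileStatus[] matches = hdfs.globStatus(pattern);
    if (matches != null) {
      for (FileStatus status : matches) {
        System.out.println(status.getPath() + " " + status.getLen());
      }
    }
  }
}
```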
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java
index c34f85a..ae30547 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java
@@ -18,6 +18,8 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -137,10 +139,29 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
         "i",
         "input",
         true,
-        "input directory in hdfs. Default is mapred.job.tracker.history.completed.location.");
+        "Path pattern for mapred.job.tracker.history.completed.location");
     o.setArgName("input-path");
     o.setRequired(false);
     options.addOption(o);
+
+    // Base input
+    o = new Option(
+        "bi",
+        "baseinput",
+        true,
+        "Base path for mapred.job.tracker.history.completed.location");
+    o.setArgName("base-path");
+    o.setRequired(false);
+    options.addOption(o);
+
+    // special parameter - specify if month in history folder pattern should start from 00
+    o = new Option("zm", "zeromonth", false,
+        "Pass this option if month in history folder pattern starts from 00");
+    o.setArgName("zeromonth");
+    o.setRequired(false);
+    options.addOption(o);
 
     // Batch
     o = new Option("b", "batchSize", true,
@@ -214,25 +235,45 @@ public int run(String[] args) throws Exception {
     // Output should be an hdfs path.
     FileSystem hdfs = FileSystem.get(hbaseConf);
 
-    // Grab the input path argument
-    String output = commandLine.getOptionValue("o");
-    LOG.info(" output=" + output);
-    Path outputPath = new Path(output);
-    FileStatus outputFileStatus = hdfs.getFileStatus(outputPath);
-
-    if (!outputFileStatus.isDir()) {
-      throw new IOException("Output is not a directory"
-          + outputFileStatus.getPath().getName());
+    // Grab the output path argument
+    String processingDirectory = commandLine.getOptionValue("o");
+    LOG.info("output: " + processingDirectory);
+    Path processingDirectoryPath = new Path(processingDirectory);
+
+    if (!hdfs.exists(processingDirectoryPath)) {
+      hdfs.mkdirs(processingDirectoryPath);
     }
 
     // Grab the input path argument
     String input;
     if (commandLine.hasOption("i")) {
       input = commandLine.getOptionValue("i");
+
+      if (commandLine.hasOption("zm")) {
+        LOG.info("Changing input path pattern for zero-month folder glitch in hadoop");
+        Matcher matcher = Constants.HADOOPV1HISTORYPATTERN.matcher(input);
+        if (matcher.matches()) {
+          // month is from 00 till 11 for some reason
+          int month = Integer.parseInt(matcher.group(4));
+          input = matcher.replaceFirst("$1/done/$2/$3/" + String.format("%02d", month - 1) + "/$5/$6/$7");
+        }
+      }
     } else {
-      input = hbaseConf.get("mapred.job.tracker.history.completed.location");
+      // input = hbaseConf.get("mapred.job.tracker.history.completed.location");
+      // User should specify the complete path pattern for the history folder;
+      // it is much more efficient to use globStatus instead
+      throw new RuntimeException("Kindly provide a path pattern for the history folder");
    }
     LOG.info("input=" + input);
+
+    // Grab the base input argument
+    String baseinput;
+    if (commandLine.hasOption("bi")) {
+      baseinput = commandLine.getOptionValue("bi");
+    } else {
+      baseinput = hbaseConf.get("mapred.job.tracker.history.completed.location");
+    }
+    LOG.info("baseinput=" + baseinput);
 
     // Grab the batch-size argument
     int batchSize;
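A worked example of the `-zm` rewrite above (input pattern hypothetical): a pattern naming May as `05` is shifted to the `04` folder that a zero-based JobTracker layout actually uses. Note that the rewrite only fires when the month is a concrete two-digit value; the default `month="*"` pattern from hraven-etl.sh will not match the regex and passes through unchanged.

```java
import java.util.regex.Matcher;

import com.twitter.hraven.Constants;

public class ZeroMonthRewrite {
  public static void main(String[] args) {
    // Hypothetical input pattern for May 16, 2014 (month given 1-based as "05")
    String input = "/hadoop/mapred/history/done/jt-host/2014/05/16/000123/*";
    Matcher matcher = Constants.HADOOPV1HISTORYPATTERN.matcher(input);
    if (matcher.matches()) {
      // the done folder stores months as 00..11, so subtract one
      int month = Integer.parseInt(matcher.group(4));
      input = matcher.replaceFirst(
          "$1/done/$2/$3/" + String.format("%02d", month - 1) + "/$5/$6/$7");
    }
    // prints /hadoop/mapred/history/done/jt-host/2014/04/16/000123/*
    System.out.println(input);
  }
}
```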
@@ -258,11 +299,12 @@ public int run(String[] args) throws Exception {
     LOG.info("forceAllFiles: " + forceAllFiles);
 
     Path inputPath = new Path(input);
-    FileStatus inputFileStatus = hdfs.getFileStatus(inputPath);
+    Path baseInputPath = new Path(baseinput);
+    FileStatus baseInputFileStatus = hdfs.getFileStatus(baseInputPath);
 
-    if (!inputFileStatus.isDir()) {
-      throw new IOException("Input is not a directory"
-          + inputFileStatus.getPath().getName());
+    if (!baseInputFileStatus.isDir()) {
+      throw new IOException("Base input is not a directory"
+          + baseInputFileStatus.getPath().getName());
     }
 
     // Grab the cluster argument
@@ -323,10 +365,9 @@ public int run(String[] args) throws Exception {
     String timestamp = Constants.TIMESTAMP_FORMAT.format(new Date(
         minModificationTimeMillis));
 
-    ContentSummary contentSummary = hdfs.getContentSummary(inputPath);
-    LOG.info("Listing / filtering ("
-        + contentSummary.getFileCount() + ") files in: " + inputPath
-        + " that are modified since " + timestamp);
+    ContentSummary contentSummary = hdfs.getContentSummary(baseInputPath);
+    LOG.info("Listing / filtering " + contentSummary.getFileCount() + " files in: " + inputPath
+        + " (" + baseInputPath + ") that are modified since " + timestamp + " (" + minModificationTimeMillis + ")");
 
     // get the files in the done folder,
     // need to traverse dirs under done recursively for versions
@@ -344,14 +385,14 @@ public int run(String[] args) throws Exception {
       LOG.info("Batch count: " + batchCount);
       for (int b = 0; b < batchCount; b++) {
         processBatch(jobFileStatusses, b, batchSize, processRecordService,
-            cluster, outputPath);
+            cluster, processingDirectoryPath);
       }
     } finally {
       processRecordService.close();
     }
 
-    Statistics statistics = FileSystem.getStatistics(inputPath.toUri()
-        .getScheme(), hdfs.getClass());
+    Statistics statistics = FileSystem.getStatistics(baseInputPath.toUri()
+        .getScheme(), hdfs.getClass());
     if (statistics != null) {
       LOG.info("HDFS bytes read: " + statistics.getBytesRead());
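Putting the new flags together, a sketch of a driver invocation equivalent to the updated jobFilePreprocessor.sh call. The cluster name and paths are hypothetical, and this assumes JobFilePreprocessor is runnable through ToolRunner the way its shell wrapper invokes it:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

import com.twitter.hraven.etl.JobFilePreprocessor;

public class PreprocessorDriver {
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new JobFilePreprocessor(),
        new String[] {
            "-d",                                               // debug logging
            "-bi", "/hadoop/mapred/history/done",               // base path: existence-checked and summarized
            "-i", "/hadoop/mapred/history/done/*/2014/*/*/*/*", // glob pattern actually listed via globStatus
            "-o", "/hadoop/mapred/history/processing/",         // processing directory (created if missing)
            "-c", "cluster1@dc1",                               // hypothetical cluster identifier
            "-b", "1000",                                       // batch size
            "-s", "524288000"                                   // raw file size limit
        });
    System.exit(exitCode);
  }
}
```

The split between `-bi` and `-i` is the heart of the change: the base path is still a real directory that can be stat'ed and summarized, while the glob pattern drives the fast, non-recursive file listing.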