This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

glob list instead of simple hdfs list and pattern support for input #103

Open: wants to merge 1 commit into master
10 changes: 7 additions & 3 deletions bin/etl/hraven-etl.sh
@@ -40,8 +40,12 @@ costfile=/var/lib/hraven/conf/costFile
hadoopconfdir=${HADOOP_CONF_DIR:-$HADOOP_HOME/conf}
hbaseconfdir=${HBASE_CONF_DIR:-$HBASE_HOME/conf}
# HDFS directories for processing and loading job history data
historyRawDir=/yarn/history/done/
historyProcessingDir=/hraven/processing/
year=2014
Contributor

So each year you would have to manually adjust this to the new year?
I'm pretty sure we'd forget to make this change on January first in a New Year's daze, and collection would be broken.

month="*"
day="*"
historyDirPattern=/hadoop/mapred/history/done/*/$year/$month/$day/*/*
Contributor

So you are saying that we should no longer support the original format, and everybody must use this new format? We won't support the older format anymore?

historyBasePath=/hadoop/mapred/history/done
historyProcessingDir=/hadoop/mapred/history/processing/
#######################################################

#If costfile is empty, fill it with default values
@@ -65,7 +69,7 @@ create_pidfile $HRAVEN_PID_DIR
trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT

# Pre-process
$home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir $cluster $batchsize $defaultrawfilesizelimit
$home/jobFilePreprocessor.sh $hadoopconfdir $historyBasePath $historyDirPattern $historyProcessingDir $cluster $batchsize $defaultrawfilesizelimit

# Load
$home/jobFileLoader.sh $hadoopconfdir $mapredmaxsplitsize $schedulerpoolname $cluster $historyProcessingDir
6 changes: 3 additions & 3 deletions bin/etl/jobFilePreprocessor.sh
@@ -19,9 +19,9 @@
# Usage ./jobFilePreprocessor.sh [hadoopconfdir]
# [historyrawdir] [historyprocessingdir] [cluster] [batchsize]

if [ $# -ne 6 ]
if [ $# -lt 7 ]
then
echo "Usage: `basename $0` [hadoopconfdir] [historyrawdir] [historyprocessingdir] [cluster] [batchsize] [defaultrawfilesizelimit]"
echo "Usage: `basename $0` [hadoopconfdir] [historyBasePath] [historyrawdir] [historyprocessingdir] [cluster] [batchsize] [defaultrawfilesizelimit]"
exit 1
fi

@@ -39,4 +39,4 @@ fi
create_pidfile $HRAVEN_PID_DIR
trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT

hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFilePreprocessor -libjars=$LIBJARS -d -i $2 -o $3 -c $4 -b $5 -s $6
hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFilePreprocessor -libjars=$LIBJARS -d -bi $2 -i $3 -o $4 -c $5 -b $6 -s $7
4 changes: 4 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -17,6 +17,8 @@

import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.regex.Pattern;

import org.apache.hadoop.hbase.util.Bytes;

/**
@@ -426,4 +428,6 @@ public class Constants {

/** name of the properties file used for cluster to cluster identifier mapping */
public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";

public static final Pattern HADOOPV1HISTORYPATTERN = Pattern.compile("(.*)/done/(.*)/([0-9]{4})/([0-9]{2})/([0-9]{2})/(.*)/(.*)");
}
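
For context, a minimal standalone sketch (not part of this patch; the class name and sample path are hypothetical) of how HADOOPV1HISTORYPATTERN decomposes a Hadoop 1.x done-directory path; group 4 is the month field that the zeromonth option in the preprocessor rewrites:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HistoryPatternSketch {
  // same regex as Constants.HADOOPV1HISTORYPATTERN
  private static final Pattern DONE_PATH = Pattern.compile(
      "(.*)/done/(.*)/([0-9]{4})/([0-9]{2})/([0-9]{2})/(.*)/(.*)");

  public static void main(String[] args) {
    // hypothetical Hadoop 1.x job history file path
    String path = "/hadoop/mapred/history/done/jt.example.com_1388534400000_"
        + "/2014/01/15/000123/job_201401150000_0123_alice_wordcount";
    Matcher m = DONE_PATH.matcher(path);
    if (m.matches()) {
      System.out.println("year  = " + m.group(3));  // 2014
      System.out.println("month = " + m.group(4));  // 01
      System.out.println("day   = " + m.group(5));  // 15
    }
  }
}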
@@ -118,7 +118,9 @@ public static FileStatus[] getListFilesToProcess(long maxFileSize, boolean recurse

LOG.info(" in getListFilesToProcess maxFileSize=" + maxFileSize
+ " inputPath= " + inputPath.toUri());
FileStatus[] origList = listFiles(recurse, hdfs, inputPath, pathFilter);
//Instead of taking a base path and recursing, we insist on getting a path pattern
//and use globStatus to return all matching files, which is much faster than the recursive listing:
FileStatus[] origList = hdfs.globStatus(inputPath, pathFilter);
if (origList == null) {
LOG.info(" No files found, orig list returning 0");
return new FileStatus[0];
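To illustrate the globStatus change above, a minimal standalone sketch (the pattern, class name, and filter below are assumptions, not hRaven code) of listing job history files with a single glob expansion plus a PathFilter instead of a recursive listing:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class GlobListSketch {
  public static void main(String[] args) throws Exception {
    FileSystem hdfs = FileSystem.get(new Configuration());

    // hypothetical pattern: jobtracker-id/year/month/day/serial/file
    Path pattern = new Path("/hadoop/mapred/history/done/*/2014/*/*/*/*");

    // keep only job history and conf files, mirroring the kind of filter passed in above
    PathFilter jobFilesOnly = new PathFilter() {
      @Override
      public boolean accept(Path p) {
        return p.getName().startsWith("job_");
      }
    };

    // one glob expansion instead of recursing directory by directory
    FileStatus[] matches = hdfs.globStatus(pattern, jobFilesOnly);
    System.out.println("matched files: " + (matches == null ? 0 : matches.length));
  }
}

Note that globStatus returns null rather than an empty array when nothing matches, which is why the calling code above keeps its null check.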
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -137,10 +139,29 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
"i",
"input",
true,
"input directory in hdfs. Default is mapred.job.tracker.history.completed.location.");
"Path pattern for mapred.job.tracker.history.completed.location");
o.setArgName("input-path");
o.setRequired(false);
options.addOption(o);

// Base input
o = new Option(
"bi",
"baseinput",
true,
"Base path for mapred.job.tracker.history.completed.location");
o.setArgName("base-path");
o.setRequired(false);
options.addOption(o);

// special parameter - specify if month in history folder pattern should start from 00

o =
new Option("zm", "zeromonth", false,
"Pass this option if month in history folder pattern starts from 00");
o.setArgName("zeromonth");
o.setRequired(false);
options.addOption(o);

// Batch
o = new Option("b", "batchSize", true,
@@ -214,25 +235,45 @@ public int run(String[] args) throws Exception {
// Output should be an hdfs path.
FileSystem hdfs = FileSystem.get(hbaseConf);

// Grab the input path argument
String output = commandLine.getOptionValue("o");
LOG.info(" output=" + output);
Path outputPath = new Path(output);
FileStatus outputFileStatus = hdfs.getFileStatus(outputPath);

if (!outputFileStatus.isDir()) {
throw new IOException("Output is not a directory"
+ outputFileStatus.getPath().getName());
// Grab the output path argument
String processingDirectory = commandLine.getOptionValue("o");
LOG.info("output: " + processingDirectory);
Path processingDirectoryPath = new Path(processingDirectory);

if (!hdfs.exists(processingDirectoryPath)) {
hdfs.mkdirs(processingDirectoryPath);
}

// Grab the input path argument
String input;
if (commandLine.hasOption("i")) {
input = commandLine.getOptionValue("i");

if (commandLine.hasOption("zm")) {
LOG.info("Changing input path pattern for zero-month folder glitch in hadoop");
Matcher matcher = Constants.HADOOPV1HISTORYPATTERN.matcher(input);
if (matcher.matches()) {
//month is from 00 till 11 for some reason
int month = Integer.parseInt(matcher.group(4));
input = matcher.replaceFirst("$1/done/$2/$3/" + String.format("%02d", month-1) + "/$5/$6/$7");
}
}
} else {
input = hbaseConf.get("mapred.job.tracker.history.completed.location");
//input = hbaseConf.get("mapred.job.tracker.history.completed.location");
//User should specify the complete path pattern for the history folder;
//it is much more efficient to use globStatus on that pattern
throw new RuntimeException("Kindly provide a path pattern for the history folder");
}
LOG.info("input=" + input);

// Grab the base input argument
String baseinput;
if (commandLine.hasOption("bi")) {
baseinput = commandLine.getOptionValue("bi");
} else {
baseinput = hbaseConf.get("mapred.job.tracker.history.completed.location");
}
LOG.info("baseinput=" + baseinput);

// Grab the batch-size argument
int batchSize;
@@ -258,11 +299,12 @@ public int run(String[] args) throws Exception {
LOG.info("forceAllFiles: " + forceAllFiles);

Path inputPath = new Path(input);
FileStatus inputFileStatus = hdfs.getFileStatus(inputPath);
Path baseInputPath = new Path(baseinput);
FileStatus baseInputFileStatus = hdfs.getFileStatus(baseInputPath);

if (!inputFileStatus.isDir()) {
throw new IOException("Input is not a directory"
+ inputFileStatus.getPath().getName());
if (!baseInputFileStatus.isDir()) {
throw new IOException("Base input is not a directory"
+ baseInputFileStatus.getPath().getName());
}

// Grab the cluster argument
@@ -323,10 +365,9 @@ public int run(String[] args) throws Exception {
String timestamp = Constants.TIMESTAMP_FORMAT.format(new Date(
minModificationTimeMillis));

ContentSummary contentSummary = hdfs.getContentSummary(inputPath);
LOG.info("Listing / filtering ("
+ contentSummary.getFileCount() + ") files in: " + inputPath
+ " that are modified since " + timestamp);
ContentSummary contentSummary = hdfs.getContentSummary(baseInputPath);
LOG.info("Listing / filtering " + contentSummary.getFileCount() + " files in: " + inputPath
+ " (" + baseInputPath + ") that are modified since " + timestamp + "(" + minModificationTimeMillis + ")");

// get the files in the done folder,
// need to traverse dirs under done recursively for versions
@@ -344,14 +385,14 @@ public int run(String[] args) throws Exception {
LOG.info("Batch count: " + batchCount);
for (int b = 0; b < batchCount; b++) {
processBatch(jobFileStatusses, b, batchSize, processRecordService,
cluster, outputPath);
cluster, processingDirectoryPath);
}

} finally {
processRecordService.close();
}

Statistics statistics = FileSystem.getStatistics(inputPath.toUri()
Statistics statistics = FileSystem.getStatistics(baseInputPath.toUri()
.getScheme(), hdfs.getClass());
if (statistics != null) {
LOG.info("HDFS bytes read: " + statistics.getBytesRead());
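
To round out the zeromonth (-zm) handling added in the preprocessor above, a minimal standalone sketch (hypothetical class name and input path) of the same regex-and-replaceFirst rewrite; it only applies when the month and day fields of the supplied pattern are concrete two-digit values, since the regex will not match a * wildcard in those positions:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZeroMonthSketch {
  public static void main(String[] args) {
    // same regex as Constants.HADOOPV1HISTORYPATTERN
    Pattern p = Pattern.compile("(.*)/done/(.*)/([0-9]{4})/([0-9]{2})/([0-9]{2})/(.*)/(.*)");

    // hypothetical -i value with a calendar month (01 = January)
    String input = "/hadoop/mapred/history/done/jt.example.com_1388534400000_/2014/01/15/000123/job_x";

    Matcher matcher = p.matcher(input);
    if (matcher.matches()) {
      // month folders run 00..11 on the affected clusters, so shift down by one
      int month = Integer.parseInt(matcher.group(4));
      input = matcher.replaceFirst(
          "$1/done/$2/$3/" + String.format("%02d", month - 1) + "/$5/$6/$7");
    }
    System.out.println(input); // .../2014/00/15/000123/job_x
  }
}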