From df99df103752c0127969f7c0ecdc3edeeb4d10d4 Mon Sep 17 00:00:00 2001 From: Angad Singh Date: Sat, 28 Jun 2014 17:08:46 +0530 Subject: [PATCH] Refactoring hraven for multiple sink support + generic object model and abstraction for output records of JobFileProcessor's mapper instead of directly emitting Hbase puts at the lowest level of code hierarchy. Added graphite sink and refactored hbase storage to work as a sink. Changes no hraven behavior. --- bin/etl/hraven-etl.sh | 4 +- bin/etl/jobFileProcessor.sh | 6 +- .../main/java/com/twitter/hraven/AppKey.java | 28 +- .../java/com/twitter/hraven/Constants.java | 27 ++ .../main/java/com/twitter/hraven/FlowKey.java | 22 +- .../java/com/twitter/hraven/HravenRecord.java | 172 ++++++++ .../com/twitter/hraven/HravenService.java | 32 ++ .../twitter/hraven/JobHistoryRawRecord.java | 40 ++ .../com/twitter/hraven/JobHistoryRecord.java | 51 +++ .../hraven/JobHistoryRecordCollection.java | 245 ++++++++++++ .../twitter/hraven/JobHistoryTaskRecord.java | 49 +++ .../main/java/com/twitter/hraven/JobId.java | 23 +- .../main/java/com/twitter/hraven/JobKey.java | 19 +- .../com/twitter/hraven/QualifiedJobId.java | 22 +- .../com/twitter/hraven/RecordCategory.java | 6 + .../com/twitter/hraven/RecordDataKey.java | 82 ++++ .../main/java/com/twitter/hraven/TaskKey.java | 21 +- .../datasource/JobHistoryRawService.java | 4 + .../hraven/datasource/JobHistoryService.java | 40 +- .../com/twitter/hraven/util/EnumWritable.java | 32 ++ .../com/twitter/hraven/util/Serializer.java | 22 ++ .../com/twitter/hraven/util/StringUtil.java | 1 + .../datasource/TestJobHistoryService.java | 81 ++-- .../hraven/etl/JobFilePreprocessor.java | 28 +- .../twitter/hraven/etl/JobFileProcessor.java | 79 +++- .../hraven/etl/JobHistoryFileParser.java | 11 +- .../hraven/etl/JobHistoryFileParserBase.java | 18 +- .../etl/JobHistoryFileParserHadoop1.java | 17 +- .../etl/JobHistoryFileParserHadoop2.java | 351 ++++++++--------- .../hraven/etl/ProcessRecordService.java | 4 +- .../java/com/twitter/hraven/etl/Sink.java | 33 ++ .../mapreduce/GraphiteHistoryWriter.java | 366 ++++++++++++++++++ .../mapreduce/GraphiteOutputFormat.java | 198 ++++++++++ .../hraven/mapreduce/HbaseHistoryWriter.java | 105 +++++ .../hraven/mapreduce/HbaseOutputFormat.java | 130 +++++++ .../hraven/mapreduce/JobFileTableMapper.java | 239 ++++++++---- .../hraven/mapreduce/JobHistoryListener.java | 179 +++++---- .../etl/TestJobHistoryFileParserHadoop1.java | 6 +- .../etl/TestJobHistoryFileParserHadoop2.java | 99 ++--- .../hraven/etl/TestJobHistoryListener.java | 40 +- 40 files changed, 2353 insertions(+), 579 deletions(-) create mode 100644 hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/HravenService.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/JobHistoryRawRecord.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecord.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecordCollection.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/JobHistoryTaskRecord.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/RecordCategory.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/RecordDataKey.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/util/EnumWritable.java create mode 100644 hraven-core/src/main/java/com/twitter/hraven/util/Serializer.java create mode 100644 
hraven-etl/src/main/java/com/twitter/hraven/etl/Sink.java create mode 100644 hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteHistoryWriter.java create mode 100644 hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteOutputFormat.java create mode 100644 hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseHistoryWriter.java create mode 100644 hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseOutputFormat.java diff --git a/bin/etl/hraven-etl.sh b/bin/etl/hraven-etl.sh index b2e2caa..f6f38db 100755 --- a/bin/etl/hraven-etl.sh +++ b/bin/etl/hraven-etl.sh @@ -42,6 +42,8 @@ hbaseconfdir=${HBASE_CONF_DIR:-$HBASE_HOME/conf} # HDFS directories for processing and loading job history data historyRawDir=/yarn/history/done/ historyProcessingDir=/hraven/processing/ +sinks=GRAPHITE,HBASE +jobFileProcessorConfOpts=-Dhraven.sink.graphite.userfilter=rmcuser -Dhraven.sink.graphite.queuefilter=userplatform -Dhraven.sink.graphite.excludedcomponents=MultiInputCounters ####################################################### #If costfile is empty, fill it with default values @@ -71,4 +73,4 @@ $home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir $home/jobFileLoader.sh $hadoopconfdir $mapredmaxsplitsize $schedulerpoolname $cluster $historyProcessingDir # Process -$home/jobFileProcessor.sh $hbaseconfdir $schedulerpoolname $historyProcessingDir $cluster $threads $batchsize $machinetype $costfile +$home/jobFileProcessor.sh $hbaseconfdir $schedulerpoolname $historyProcessingDir $cluster $threads $batchsize $machinetype $costfile $sinks $jobFileProcessorConfOpts diff --git a/bin/etl/jobFileProcessor.sh b/bin/etl/jobFileProcessor.sh index a657a5d..76e23dc 100755 --- a/bin/etl/jobFileProcessor.sh +++ b/bin/etl/jobFileProcessor.sh @@ -21,9 +21,9 @@ # [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile] # a sample cost file can be found in the conf dir as sampleCostDetails.properties -if [ $# -ne 8 ] +if [ $# -ne 10 ] then - echo "Usage: `basename $0` [hbaseconfdir] [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile]" + echo "Usage: `basename $0` [hbaseconfdir] [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile] [sinks] [jobfileprocessorconf]" exit 1 fi @@ -40,5 +40,5 @@ fi create_pidfile $HRAVEN_PID_DIR trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT -hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFileProcessor -libjars=$LIBJARS -Dmapred.fairscheduler.pool=$2 -d -p $3 -c $4 -t $5 -b $6 -m $7 -z $8 +hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFileProcessor -libjars=$LIBJARS -Dmapred.fairscheduler.pool=$2 ${10} -d -p $3 -c $4 -t $5 -b $6 -m $7 -z $8 -s $9 diff --git a/hraven-core/src/main/java/com/twitter/hraven/AppKey.java b/hraven-core/src/main/java/com/twitter/hraven/AppKey.java index 53e8fce..bda6606 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/AppKey.java +++ b/hraven-core/src/main/java/com/twitter/hraven/AppKey.java @@ -16,27 +16,33 @@ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; -public class 
AppKey implements Comparable { +public class AppKey implements WritableComparable { /** * The cluster on which the application ran */ - protected final String cluster; + protected String cluster; /** * Who ran the application on Hadoop */ - protected final String userName; + protected String userName; /** * The thing that identifies an application, * such as Pig script identifier, or Scalding identifier. */ - protected final String appId; + protected String appId; @JsonCreator public AppKey(@JsonProperty("cluster") String cluster, @JsonProperty("userName") String userName, @@ -111,4 +117,18 @@ public int hashCode() { .toHashCode(); } + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, this.cluster); + Text.writeString(out, this.userName); + Text.writeString(out, this.appId); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.cluster = Text.readString(in); + this.userName = Text.readString(in); + this.appId = Text.readString(in); + } + } diff --git a/hraven-core/src/main/java/com/twitter/hraven/Constants.java b/hraven-core/src/main/java/com/twitter/hraven/Constants.java index 9e8653a..6e22f61 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/Constants.java +++ b/hraven-core/src/main/java/com/twitter/hraven/Constants.java @@ -31,6 +31,8 @@ public class Constants { public static final char SEP_CHAR = '!'; public static final String SEP = "" + SEP_CHAR; public static final byte[] SEP_BYTES = Bytes.toBytes(SEP); + + public static final String PERIOD_SEP_CHAR = "."; // common default values public static final byte[] EMPTY_BYTES = new byte[0]; @@ -426,4 +428,29 @@ public class Constants { /** name of the properties file used for cluster to cluster identifier mapping */ public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties"; + + public static final String JOBCONF_SINKS = "hraven.sinks"; + + public static final String JOBCONF_GRAPHITE_HOST_KEY = "hraven.sink.graphite.host"; + public static final String JOBCONF_GRAPHITE_PORT_KEY = "hraven.sink.graphite.port"; + public static final String JOBCONF_GRAPHITE_PREFIX = "hraven.sink.graphite.prefix"; + public static final String JOBCONF_GRAPHITE_USER_FILTER = "hraven.sink.graphite.userfilter"; + public static final String JOBCONF_GRAPHITE_QUEUE_FILTER = "hraven.sink.graphite.queuefilter"; + public static final String JOBCONF_GRAPHITE_EXCLUDED_COMPONENTS = "hraven.sink.graphite.excludedcomponents"; + public static final String JOBCONF_GRAPHITE_DONOTEXCLUDE_APPS = "hraven.sink.graphite.donotexcludeapps"; + + public static final int GRAPHITE_DEFAULT_PORT = 2003; + public static final String GRAPHITE_DEFAULT_HOST = "localhost"; + public static final String GRAPHITE_DEFAULT_PREFIX = "DEFAULT"; + + public static final String GRAPHITE_KEY_MAPPING_TABLE = PREFIX + "graphite_key_mapping"; + public static final byte[] GRAPHITE_KEY_MAPPING_TABLE_BYTES = Bytes.toBytes(GRAPHITE_KEY_MAPPING_TABLE); + + public static final String GRAPHITE_REVERSE_KEY_MAPPING_TABLE = PREFIX + "graphite_key_mapping_r"; + public static final byte[] GRAPHITE_REVERSE_KEY_MAPPING_TABLE_BYTES = Bytes.toBytes(GRAPHITE_REVERSE_KEY_MAPPING_TABLE); + + public static final String GRAPHITE_KEY_MAPPING_COLUMN = "k"; + public static final byte[] GRAPHITE_KEY_MAPPING_COLUMN_BYTES = Bytes.toBytes(GRAPHITE_KEY_MAPPING_COLUMN); + + } diff --git a/hraven-core/src/main/java/com/twitter/hraven/FlowKey.java b/hraven-core/src/main/java/com/twitter/hraven/FlowKey.java index ad87e04..1322dc1 
100644 --- a/hraven-core/src/main/java/com/twitter/hraven/FlowKey.java +++ b/hraven-core/src/main/java/com/twitter/hraven/FlowKey.java @@ -15,12 +15,19 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; -public class FlowKey extends AppKey implements Comparable { +public class FlowKey extends AppKey implements WritableComparable { /** * Identifying one single run of a version of an app. Smaller values indicate @@ -111,4 +118,17 @@ public int hashCode(){ .toHashCode(); } + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + new LongWritable(this.runId).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + LongWritable lw = new LongWritable(); + lw.readFields(in); + this.runId = lw.get(); + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java b/hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java new file mode 100644 index 0000000..b338571 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java @@ -0,0 +1,172 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import com.twitter.hraven.util.EnumWritable; +import com.twitter.hraven.util.Serializer; + +/** + * + * @author angad.singh + * + * {@link JobFileTableMapper outputs this as value. It corresponds to the + * Put record which was earlier emitted + * + * @param key type + * @param type of dataValue object to be stored + */ + +public abstract class HravenRecord implements Writable{ + private K key; + private RecordCategory dataCategory; + private RecordDataKey dataKey; + private V dataValue; + private Long submitTime; + + public HravenRecord() { + + } + + public K getKey() { + return key; + } + + public void setKey(K key) { + this.key = key; + } + + public RecordCategory getDataCategory() { + return dataCategory; + } + + public void setDataCategory(RecordCategory dataCategory) { + this.dataCategory = dataCategory; + } + + public RecordDataKey getDataKey() { + return dataKey; + } + + public void setDataKey(RecordDataKey dataKey) { + this.dataKey = dataKey; + } + + public V getDataValue() { + return dataValue; + } + + public void setDataValue(V dataValue) { + this.dataValue = dataValue; + } + + public Long getSubmitTime() { + return submitTime; + } + + public void setSubmitTime(Long submitTime) { + this.submitTime = submitTime; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((dataCategory == null) ? 0 : dataCategory.hashCode()); + result = prime * result + ((dataKey == null) ? 0 : dataKey.hashCode()); + result = prime * result + ((dataValue == null) ? 0 : dataValue.hashCode()); + result = prime * result + ((key == null) ? 
0 : key.hashCode()); + result = prime * result + (int) (submitTime ^ (submitTime >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + HravenRecord other = (HravenRecord) obj; + if (dataCategory != other.dataCategory) { + return false; + } + if (dataKey == null) { + if (other.dataKey != null) { + return false; + } + } else if (!dataKey.equals(other.dataKey)) { + return false; + } + if (dataValue == null) { + if (other.dataValue != null) { + return false; + } + } else if (!dataValue.equals(other.dataValue)) { + return false; + } + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + if (submitTime != other.submitTime) { + return false; + } + return true; + } + + @Override + public String toString() { + return "HravenRecord [key=" + key + ", dataCategory=" + dataCategory + ", dataKey=" + dataKey + + ", dataValue=" + dataValue + ", submitTime=" + submitTime + "]"; + } + + @Override + public void write(DataOutput out) throws IOException { + //key + this.key.write(out); + //dataCategory + new EnumWritable(this.dataCategory).write(out); + //dataKey + this.dataKey.write(out); + //dataValue + byte[] b = Serializer.serialize(this.dataValue); + out.writeInt(b.length); + out.write(b); + //submitTime + new LongWritable(this.submitTime).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + //key + this.key.readFields(in); + //dataCategory + new EnumWritable(this.dataCategory).readFields(in); + //dataKey + this.dataKey.readFields(in); + //dataValue + byte[] b = new byte[in.readInt()]; + in.readFully(b); + try { + this.dataValue = (V) Serializer.deserialize(b); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failure in deserializing HravenRecord.dataValue"); + } + //submitTime + LongWritable lw = new LongWritable(); + lw.readFields(in); + this.submitTime = lw.get(); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/HravenService.java b/hraven-core/src/main/java/com/twitter/hraven/HravenService.java new file mode 100644 index 0000000..886aed9 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/HravenService.java @@ -0,0 +1,32 @@ +package com.twitter.hraven; + +/** + * + * @author angad.singh + * + * {@link JobFileTableMapper} outputs this as key. 
It corresponds to the + * Hbase table which was earlier emitted + */ + +public enum HravenService { + JOB_HISTORY_RAW { + @Override + public HravenRecord getNewRecord() { + return new JobHistoryRawRecord(); + } + }, + JOB_HISTORY { + @Override + public HravenRecord getNewRecord() { + return new JobHistoryRecord(); + } + }, + JOB_HISTORY_TASK { + @Override + public HravenRecord getNewRecord() { + return new JobHistoryTaskRecord(); + } + }; + + public abstract HravenRecord getNewRecord(); +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRawRecord.java b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRawRecord.java new file mode 100644 index 0000000..ab65a3f --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRawRecord.java @@ -0,0 +1,40 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; + +public class JobHistoryRawRecord extends HravenRecord { + + public JobHistoryRawRecord(RecordCategory dataCategory, String key, RecordDataKey dataKey, + Object dataValue) { + this.setKey(new Text(key)); + this.setDataCategory(dataCategory); + this.setDataKey(dataKey); + this.setDataValue(dataValue); + } + + public JobHistoryRawRecord() { + + } + + public JobHistoryRawRecord(String rawKey) { + this.setKey(new Text(rawKey)); + } + + public void set(RecordCategory category, RecordDataKey key, String value) { + this.setDataCategory(category); + this.setDataKey(key); + this.setDataValue(value); + } + + public void write(DataOutput out) throws IOException { + super.write(out); + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecord.java b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecord.java new file mode 100644 index 0000000..7507c6f --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecord.java @@ -0,0 +1,51 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * @author angad.singh Abstraction of a record to be stored in the {@link HravenService#JOB_HISTORY} + * service. 
Was earlier directly written as an Hbase put + */ + +public class JobHistoryRecord extends HravenRecord { + + public JobHistoryRecord(RecordCategory dataCategory, JobKey key, RecordDataKey dataKey, + Object dataValue) { + this.setKey(key); + this.setDataCategory(dataCategory); + this.setDataKey(dataKey); + this.setDataValue(dataValue); + } + + public JobHistoryRecord(RecordCategory dataCategory, JobKey key, RecordDataKey dataKey, + Object dataValue, Long submitTime) { + this(dataCategory, key, dataKey, dataValue); + setSubmitTime(submitTime); + } + + public JobHistoryRecord() { + + } + + public JobHistoryRecord(JobKey jobKey) { + this.setKey(jobKey); + } + + public void set(RecordCategory category, RecordDataKey key, String value) { + this.setDataCategory(category); + this.setDataKey(key); + this.setDataValue(value); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecordCollection.java b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecordCollection.java new file mode 100644 index 0000000..b926e9e --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecordCollection.java @@ -0,0 +1,245 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +/** + * @author angad.singh Store multiple {@link JobHistoryRecord}s in a 2 level HashMap Supports + * iteration to get individual {@link JobHistoryRecord}s + */ + +public class JobHistoryRecordCollection extends HravenRecord implements + Collection { + + private Map> valueMap; + + public JobHistoryRecordCollection() { + valueMap = new HashMap>(); + } + + public JobHistoryRecordCollection(JobKey jobKey) { + setKey(jobKey); + valueMap = new HashMap>(); + } + + public JobHistoryRecordCollection(JobHistoryRecord record) { + valueMap = new HashMap>(); + setKey(record.getKey()); + setSubmitTime(record.getSubmitTime()); + add(record); + } + + public boolean add(RecordCategory category, RecordDataKey key, Object value) { + if (valueMap.containsKey(category)) { + valueMap.get(category).put(key, value); + } else { + HashMap categoryMap = new HashMap(); + valueMap.put(category, categoryMap); + categoryMap.put(key, value); + } + + return true; + } + + public boolean add(JobHistoryRecord record) { + return add(record.getDataCategory(), record.getDataKey(), record.getDataValue()); + } + + public Map> getValueMap() { + return valueMap; + } + + public Object getValue(RecordCategory category, RecordDataKey key) { + return valueMap.containsKey(category) ? 
valueMap.get(category).get(key) : null; + } + + public int size() { + int size = 0; + for (Entry> catMap : valueMap.entrySet()) { + size += catMap.getValue().size(); + } + + return size; + } + + /** + * Be able to iterate easily to get individual {@link JobHistoryRecord}s + */ + + @Override + public Iterator iterator() { + + return new Iterator() { + + private Iterator>> catIterator; + private Iterator> dataIterator; + Entry> nextCat; + Entry nextData; + + { + initIterators(); + } + + private void initIterators() { + if (catIterator == null) { + catIterator = valueMap.entrySet().iterator(); + } + + if (catIterator.hasNext()) { + nextCat = catIterator.next(); + dataIterator = nextCat.getValue().entrySet().iterator(); + } + } + + @Override + public boolean hasNext() { + if (dataIterator == null) { + initIterators(); + } + return dataIterator == null ? false : dataIterator.hasNext() || catIterator.hasNext(); + } + + @Override + public JobHistoryRecord next() { + if (!dataIterator.hasNext()) { + nextCat = catIterator.next(); + dataIterator = nextCat.getValue().entrySet().iterator(); + } + + nextData = dataIterator.next(); + + return new JobHistoryRecord(nextCat.getKey(), getKey(), nextData.getKey(), + nextData.getValue(), getSubmitTime()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + public void mergeWith(JobHistoryRecordCollection confRecord) { + for (JobHistoryRecord record : confRecord) { + add(record); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((valueMap == null) ? 0 : valueMap.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + JobHistoryRecordCollection other = (JobHistoryRecordCollection) obj; + if (valueMap == null) { + if (other.valueMap != null) { + return false; + } + } else if (!valueMap.equals(other.valueMap)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "JobHistoryRecordCollection [key=" + getKey() + ", dataCategory=" + getDataCategory() + ", dataKey=" + getDataKey() + + ", dataValue=" + getDataValue() + ", submitTime=" + getSubmitTime() + ", valueMap=" + valueMap + "]"; + } + + @Override + public boolean isEmpty() { + return valueMap.isEmpty(); + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection recordCol) { + for (JobHistoryRecord record : recordCol) { + add(record); + } + return true; + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + this.valueMap = null; + } + + public void write(DataOutput out) throws IOException { + super.write(out); + + out.writeInt(this.size()); + for (JobHistoryRecord r: this) { + 
r.write(out); + } + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + int size = in.readInt(); + int i = 0; + while (i < size) { + JobHistoryRecord r = new JobHistoryRecord(); + r.readFields(in); + this.add(r); + i++; + } + } +} \ No newline at end of file diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobHistoryTaskRecord.java b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryTaskRecord.java new file mode 100644 index 0000000..24f5bc7 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryTaskRecord.java @@ -0,0 +1,49 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * @author angad.singh Abstraction of a record to be stored in the + * {@link HravenService#JOB_HISTORY_TASK} service. + */ + +public class JobHistoryTaskRecord extends HravenRecord { + + public JobHistoryTaskRecord(RecordCategory dataCategory, TaskKey key, RecordDataKey dataKey, + Object dataValue) { + this.setKey(key); + this.setDataCategory(dataCategory); + this.setDataKey(dataKey); + this.setDataValue(dataValue); + } + + public JobHistoryTaskRecord(RecordCategory dataCategory, TaskKey key, RecordDataKey dataKey, + Object dataValue, Long submitTime) { + this(dataCategory, key, dataKey, dataValue); + setSubmitTime(submitTime); + } + + public JobHistoryTaskRecord() { + + } + + public JobHistoryTaskRecord(TaskKey jobKey) { + this.setKey(jobKey); + } + + public void set(RecordCategory category, RecordDataKey key, String value) { + this.setDataCategory(category); + this.setDataKey(key); + this.setDataValue(value); + } + + public void write(DataOutput out) throws IOException { + super.write(out); + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobId.java b/hraven-core/src/main/java/com/twitter/hraven/JobId.java index 887a0a5..0457796 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/JobId.java +++ b/hraven-core/src/main/java/com/twitter/hraven/JobId.java @@ -15,8 +15,14 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; @@ -25,7 +31,7 @@ * apart. 
The jobtracker ID is parsed as: job_[epoch]_[sequence] * */ -public class JobId implements Comparable { +public class JobId implements WritableComparable { protected static final String JOB_ID_SEP = "_"; /** * The jobtracker start time from the job ID, obtained from parsing the @@ -136,4 +142,19 @@ public int hashCode(){ .append(this.jobSequence) .toHashCode(); } + + @Override + public void write(DataOutput out) throws IOException { + new LongWritable(this.jobEpoch).write(out); + new LongWritable(this.jobSequence).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + LongWritable lw = new LongWritable(); + lw.readFields(in); + this.jobEpoch = lw.get(); + lw.readFields(in); + this.jobSequence = lw.get(); + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobKey.java b/hraven-core/src/main/java/com/twitter/hraven/JobKey.java index 8a9d427..7f52734 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/JobKey.java +++ b/hraven-core/src/main/java/com/twitter/hraven/JobKey.java @@ -15,8 +15,13 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; @@ -24,7 +29,7 @@ * Represents the row key for a given job. Row keys are stored as: username ! * appid ! version ! runid ! jobid */ -public class JobKey extends FlowKey implements Comparable{ +public class JobKey extends FlowKey implements WritableComparable{ /** * Fully qualified cluster + parsed job identifier @@ -142,4 +147,16 @@ public int hashCode(){ .toHashCode(); } + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + this.jobId.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.jobId.readFields(in); + } + } diff --git a/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java b/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java index 36f762f..4374e98 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java +++ b/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java @@ -15,6 +15,12 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; @@ -25,12 +31,12 @@ * current one). This class represents the fully qualified job identifier. * */ -public class QualifiedJobId extends JobId { +public class QualifiedJobId extends JobId implements Writable { /** * The Hadoop cluster on which the job ran. */ - private final String cluster; + private String cluster; /** * Constructor. 
@@ -57,4 +63,16 @@ public String getCluster() { return cluster; } + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, this.cluster); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.cluster = Text.readString(in); + } + } diff --git a/hraven-core/src/main/java/com/twitter/hraven/RecordCategory.java b/hraven-core/src/main/java/com/twitter/hraven/RecordCategory.java new file mode 100644 index 0000000..31e3380 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/RecordCategory.java @@ -0,0 +1,6 @@ +package com.twitter.hraven; + +public enum RecordCategory { + HISTORY_COUNTER, HISTORY_META, HISTORY_TASK_COUNTER, HISTORY_TASK_META, CONF, CONF_META, META, + INFERRED +} \ No newline at end of file diff --git a/hraven-core/src/main/java/com/twitter/hraven/RecordDataKey.java b/hraven-core/src/main/java/com/twitter/hraven/RecordDataKey.java new file mode 100644 index 0000000..e940f14 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/RecordDataKey.java @@ -0,0 +1,82 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; + +public class RecordDataKey implements Writable { + private List components; + + public RecordDataKey(String... components) { + this.components = Arrays.asList(components); + } + + public RecordDataKey(String firstComponent) { + this.components = new ArrayList(); + this.components.add(firstComponent); + } + + public void add(String component) { + this.components.add(component); + } + + public String get(int index) { + return components.get(index); + } + + public List getComponents() { + return components; + } + + @Override + public int hashCode() { + return 1 + ((components == null) ? 
0 : components.hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RecordDataKey other = (RecordDataKey) obj; + if (components == null) { + if (other.components != null) { + return false; + } + } else if (!components.equals(other.components)) { + return false; + } + return true; + } + + @Override + public String toString() { + return StringUtils.join(Constants.PERIOD_SEP_CHAR, components); + } + + @Override + public void write(DataOutput out) throws IOException { + new ArrayWritable((String[])this.components.toArray()).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + ArrayWritable a = new ArrayWritable(Text.class); + a.readFields(in); + this.components = Arrays.asList(a.toStrings()); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/TaskKey.java b/hraven-core/src/main/java/com/twitter/hraven/TaskKey.java index 479500d..16a47f2 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/TaskKey.java +++ b/hraven-core/src/main/java/com/twitter/hraven/TaskKey.java @@ -15,13 +15,18 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.annotate.JsonSerialize; - /** * Represents the row key for an individual job task. This key shares all the * same components from the job key, with the additional of the task ID: @@ -32,7 +37,7 @@ @JsonSerialize( include=JsonSerialize.Inclusion.NON_NULL ) -public class TaskKey extends JobKey implements Comparable { +public class TaskKey extends JobKey implements WritableComparable { private String taskId; @JsonCreator @@ -83,4 +88,16 @@ public int hashCode(){ .append(this.taskId) .toHashCode(); } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, this.taskId); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.taskId = Text.readString(in); + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java index 4c6fef7..539dd7f 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java +++ b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java @@ -551,4 +551,8 @@ public byte[] getJobHistoryRawFromResult(Result value) throws MissingColumnInRes byte[] jobHistoryRaw = keyValue.getValue(); return jobHistoryRaw; } + + public HTable getTable() { + return rawTable; + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java index 12e83bd..d46525c 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java +++ b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java @@ -680,8 +680,7 @@ public static CounterMap parseCounters(byte[] prefix, * * @throws IllegalArgumentException if neither config param is 
found */ - static void setHravenQueueNamePut(Configuration jobConf, Put jobPut, - JobKey jobKey, byte[] jobConfColumnPrefix) { + static void setHravenQueueNameRecord(Configuration jobConf, JobHistoryRecordCollection recordCollection, JobKey jobKey) { String hRavenQueueName = HadoopConfUtil.getQueueName(jobConf); if (hRavenQueueName.equalsIgnoreCase(Constants.DEFAULT_VALUE_QUEUENAME)){ @@ -692,9 +691,8 @@ static void setHravenQueueNamePut(Configuration jobConf, Put jobPut, // set the "queue" property defined by hRaven // this makes it independent of hadoop version config parameters - byte[] column = Bytes.add(jobConfColumnPrefix, Constants.HRAVEN_QUEUE_BYTES); - jobPut.add(Constants.INFO_FAM_BYTES, column, - Bytes.toBytes(hRavenQueueName)); + recordCollection.add(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE), hRavenQueueName); } /** @@ -708,40 +706,28 @@ static void setHravenQueueNamePut(Configuration jobConf, Put jobPut, * the job configuration * @return puts for the given job configuration */ - public static List getHbasePuts(JobDesc jobDesc, Configuration jobConf) { - List puts = new LinkedList(); - + public static JobHistoryRecordCollection getConfRecord(JobDesc jobDesc, Configuration jobConf) { JobKey jobKey = new JobKey(jobDesc); - byte[] jobKeyBytes = new JobKeyConverter().toBytes(jobKey); // Add all columns to one put - Put jobPut = new Put(jobKeyBytes); - jobPut.add(Constants.INFO_FAM_BYTES, Constants.VERSION_COLUMN_BYTES, - Bytes.toBytes(jobDesc.getVersion())); - jobPut.add(Constants.INFO_FAM_BYTES, Constants.FRAMEWORK_COLUMN_BYTES, - Bytes.toBytes(jobDesc.getFramework().toString())); + JobHistoryRecordCollection recordCollection = new JobHistoryRecordCollection(jobKey); - // Avoid doing string to byte conversion inside loop. 
- byte[] jobConfColumnPrefix = Bytes.toBytes(Constants.JOB_CONF_COLUMN_PREFIX - + Constants.SEP); + recordCollection.add(RecordCategory.CONF_META, new RecordDataKey(Constants.VERSION_COLUMN), + jobDesc.getVersion()); + recordCollection.add(RecordCategory.CONF_META, new RecordDataKey(Constants.FRAMEWORK_COLUMN), jobDesc + .getFramework().toString()); - // Create puts for all the parameters in the job configuration + // Create records for all the parameters in the job configuration Iterator> jobConfIterator = jobConf.iterator(); while (jobConfIterator.hasNext()) { Entry entry = jobConfIterator.next(); - // Prefix the job conf entry column with an indicator to - byte[] column = Bytes.add(jobConfColumnPrefix, - Bytes.toBytes(entry.getKey())); - jobPut.add(Constants.INFO_FAM_BYTES, column, - Bytes.toBytes(entry.getValue())); + recordCollection.add(RecordCategory.CONF, new RecordDataKey(entry.getKey()), entry.getValue()); } // ensure pool/queuename is set correctly - setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - - puts.add(jobPut); + setHravenQueueNameRecord(jobConf, recordCollection, jobKey); - return puts; + return recordCollection; } /** diff --git a/hraven-core/src/main/java/com/twitter/hraven/util/EnumWritable.java b/hraven-core/src/main/java/com/twitter/hraven/util/EnumWritable.java new file mode 100644 index 0000000..08ea12f --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/util/EnumWritable.java @@ -0,0 +1,32 @@ +package com.twitter.hraven.util; + +import java.io.*; +import org.apache.hadoop.io.*; + +public class EnumWritable> implements WritableComparable { + private Class cls; + private E value; + + @SuppressWarnings("unchecked") + public EnumWritable(E value) { + this.cls = (Class) value.getClass(); + this.value = value; + } + + public E getValue() { + return value; + } + + public void readFields(DataInput input) throws IOException { + value = WritableUtils.readEnum(input, cls); + } + + public void write(DataOutput output) throws IOException { + WritableUtils.writeEnum(output, value); + } + + @Override + public int compareTo(E o) { + return this.value.compareTo(o); + } +} \ No newline at end of file diff --git a/hraven-core/src/main/java/com/twitter/hraven/util/Serializer.java b/hraven-core/src/main/java/com/twitter/hraven/util/Serializer.java new file mode 100644 index 0000000..0e878f6 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/util/Serializer.java @@ -0,0 +1,22 @@ +package com.twitter.hraven.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +public class Serializer { + public static byte[] serialize(Object obj) throws IOException { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + ObjectOutputStream o = new ObjectOutputStream(b); + o.writeObject(obj); + return b.toByteArray(); + } + + public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException { + ByteArrayInputStream b = new ByteArrayInputStream(bytes); + ObjectInputStream o = new ObjectInputStream(b); + return o.readObject(); + } +} \ No newline at end of file diff --git a/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java b/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java index aaf1bd1..0839326 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java +++ b/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java @@ -36,6 +36,7 @@ 
public class StringUtil { public static String cleanseToken(String token) { if (token == null || token.length() == 0) { return token; }; + token = token.trim(); String cleansed = token.replaceAll(SPACE, UNDERSCORE); cleansed = cleansed.replaceAll(Constants.SEP, UNDERSCORE); diff --git a/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java b/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java index 1c8905e..71d9a3c 100644 --- a/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java +++ b/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java @@ -40,7 +40,11 @@ import com.twitter.hraven.GenerateFlowTestData; import com.twitter.hraven.HadoopVersion; import com.twitter.hraven.JobDetails; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.datasource.JobHistoryByIdService; import com.twitter.hraven.datasource.JobHistoryService; import com.twitter.hraven.datasource.HRavenTestUtil; @@ -311,22 +315,6 @@ public void testRemoveJob() throws Exception { } } - private void assertFoundOnce(byte[] column, Put jobPut, int expectedSize, - String expectedValue) { - boolean foundUserName = false; - List kv1 = jobPut.get(Constants.INFO_FAM_BYTES, column); - assertEquals(expectedSize, kv1.size()); - for (KeyValue kv : kv1) { - assertEquals(Bytes.toString(kv.getValue()), expectedValue); - // ensure we don't see the same put twice - assertFalse(foundUserName); - // now set this to true - foundUserName = true; - } - // ensure that we got the user name - assertTrue(foundUserName); - } - @Test public void testSetHravenQueueName() throws FileNotFoundException { @@ -339,19 +327,17 @@ public void testSetHravenQueueName() throws FileNotFoundException { String USERNAME = "user"; JobKey jobKey = new JobKey("cluster1", USERNAME, "Sleep", 1, "job_1329348432655_0001"); - byte[] jobKeyBytes = new JobKeyConverter().toBytes(jobKey); - Put jobPut = new Put(jobKeyBytes); - byte[] jobConfColumnPrefix = Bytes.toBytes(Constants.JOB_CONF_COLUMN_PREFIX - + Constants.SEP); + + JobHistoryRecordCollection recordCollection = new JobHistoryRecordCollection(jobKey); - assertEquals(jobPut.size(), 0); + assertEquals(recordCollection.size(), 0); // check queuename matches user name since the conf has // value "default" as the queuename - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - byte[] column = Bytes.add(jobConfColumnPrefix, Constants.HRAVEN_QUEUE_BYTES); - assertFoundOnce(column, jobPut, 1, USERNAME); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), USERNAME); // populate the jobConf with all types of queue name parameters String expH2QName = "hadoop2queue"; @@ -363,36 +349,39 @@ public void testSetHravenQueueName() throws FileNotFoundException { // now check queuename is correctly set as hadoop2 queue name // even when the fairscheduler and capacity scheduler are set - jobPut = new Put(jobKeyBytes); - assertEquals(jobPut.size(), 0); - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - 
assertFoundOnce(column, jobPut, 1, expH2QName); + recordCollection = new JobHistoryRecordCollection(jobKey); + assertEquals(recordCollection.size(), 0); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), expH2QName); // now unset hadoop2 queuename, expect fairscheduler name to be used as queuename jobConf.set(Constants.QUEUENAME_HADOOP2, ""); - jobPut = new Put(jobKeyBytes); - assertEquals(jobPut.size(), 0); - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - assertFoundOnce(column, jobPut, 1, expH1PoolName); + recordCollection = new JobHistoryRecordCollection(jobKey); + assertEquals(recordCollection.size(), 0); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), expH1PoolName); // now unset fairscheduler name, expect capacity scheduler to be used as queuename jobConf.set(Constants.FAIR_SCHEDULER_POOLNAME_HADOOP1, ""); - jobPut = new Put(jobKeyBytes); - assertEquals(jobPut.size(), 0); - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - assertFoundOnce(column, jobPut, 1, capacityH1QName); + recordCollection = new JobHistoryRecordCollection(jobKey); + assertEquals(recordCollection.size(), 0); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), capacityH1QName); // now unset capacity scheduler, expect default_queue to be used as queuename jobConf.set(Constants.CAPACITY_SCHEDULER_QUEUENAME_HADOOP1, ""); - jobPut = new Put(jobKeyBytes); - assertEquals(jobPut.size(), 0); - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - assertFoundOnce(column, jobPut, 1, Constants.DEFAULT_QUEUENAME); - + recordCollection = new JobHistoryRecordCollection(jobKey); + assertEquals(recordCollection.size(), 0); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), Constants.DEFAULT_QUEUENAME); } private void assertJob(JobDetails expected, JobDetails actual) { diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java index c34f85a..96e8161 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -214,15 +216,13 @@ public int run(String[] args) throws Exception { // Output should be an hdfs path. 
FileSystem hdfs = FileSystem.get(hbaseConf); - // Grab the input path argument - String output = commandLine.getOptionValue("o"); - LOG.info(" output=" + output); - Path outputPath = new Path(output); - FileStatus outputFileStatus = hdfs.getFileStatus(outputPath); - - if (!outputFileStatus.isDir()) { - throw new IOException("Output is not a directory" - + outputFileStatus.getPath().getName()); + // Grab the output path argument + String processingDirectory = commandLine.getOptionValue("o"); + LOG.info("output: " + processingDirectory); + Path processingDirectoryPath = new Path(processingDirectory); + + if (!hdfs.exists(processingDirectoryPath)) { + hdfs.mkdirs(processingDirectoryPath); } // Grab the input path argument @@ -253,6 +253,8 @@ public int run(String[] args) throws Exception { } else { batchSize = DEFAULT_BATCH_SIZE; } + + LOG.info("Batch size: " + batchSize); boolean forceAllFiles = commandLine.hasOption("f"); LOG.info("forceAllFiles: " + forceAllFiles); @@ -267,7 +269,7 @@ public int run(String[] args) throws Exception { // Grab the cluster argument String cluster = commandLine.getOptionValue("c"); - LOG.info("cluster=" + cluster); + LOG.info("cluster: " + cluster); /** * Grab the size of huge files to be moved argument @@ -277,7 +279,7 @@ public int run(String[] args) throws Exception { * {@link https://github.com/twitter/hraven/issues/59} */ String maxFileSizeStr = commandLine.getOptionValue("s"); - LOG.info("maxFileSize=" + maxFileSizeStr); + LOG.info("maxFileSize: " + maxFileSizeStr); long maxFileSize = DEFAULT_RAW_FILE_SIZE_LIMIT; try { maxFileSize = Long.parseLong(maxFileSizeStr); @@ -297,7 +299,7 @@ public int run(String[] args) throws Exception { if (!forceAllFiles) { lastProcessRecord = processRecordService - .getLastSuccessfulProcessRecord(cluster); + .getLastSuccessfulProcessRecord(cluster, processingDirectory); } long minModificationTimeMillis = 0; @@ -344,7 +346,7 @@ public int run(String[] args) throws Exception { LOG.info("Batch count: " + batchCount); for (int b = 0; b < batchCount; b++) { processBatch(jobFileStatusses, b, batchSize, processRecordService, - cluster, outputPath); + cluster, processingDirectoryPath); } } finally { diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java index 6ace3d9..67b2bc9 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java @@ -18,7 +18,11 @@ import static com.twitter.hraven.etl.ProcessState.LOADED; import static com.twitter.hraven.etl.ProcessState.PROCESSED; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.LinkedList; import java.util.List; @@ -28,6 +32,8 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -35,6 +41,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -45,7 +52,9 @@ import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; @@ -53,6 +62,8 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.twitter.hraven.Constants; import com.twitter.hraven.datasource.JobHistoryRawService; import com.twitter.hraven.etl.ProcessRecordService; @@ -73,6 +84,7 @@ public class JobFileProcessor extends Configured implements Tool { .format(new Date(System.currentTimeMillis())); private final AtomicInteger jobCounter = new AtomicInteger(0); + private List sinks; /** * Maximum number of files to process in one batch. @@ -119,7 +131,7 @@ private static CommandLine parseArgs(String[] args) throws ParseException { "r", "reprocess", false, - "Reprocess only those records that have been marked to be reprocessed. Otherwise process all rows indicated in the processing records, but successfully processed job files are skipped."); + "Process only those records that have been marked to be reprocessed. Otherwise process all rows indicated in the processing records, but successfully processed job files are skipped."); o.setRequired(false); options.addOption(o); @@ -165,6 +177,12 @@ private static CommandLine parseArgs(String[] args) throws ParseException { o.setArgName("machinetype"); o.setRequired(true); options.addOption(o); + + // Sinks + o = new Option("s", "sinks", true, "Comma seperated list of sinks (currently supported sinks: hbase, graphite)"); + o.setArgName("sinks"); + o.setRequired(true); + options.addOption(o); CommandLineParser parser = new PosixParser(); CommandLine commandLine = null; @@ -203,6 +221,33 @@ public int run(String[] args) throws Exception { // Grab the arguments we're looking for. CommandLine commandLine = parseArgs(otherArgs); + if (StringUtils.isNotBlank(commandLine.getOptionValue("s"))) { + String[] splits = commandLine.getOptionValue("s").split(","); + + if (splits.length > 0) { + sinks = + new ArrayList(Collections2.transform( + Arrays.asList(splits), new Function() { + + @Override + @Nullable + public Sink apply(@Nullable String input) { + try { + return Sink.valueOf(input); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Sink '" + input + "' is incorrect."); + } + } + })); + } + } + + if (sinks == null) { + throw new IllegalArgumentException("Incorrect value for sinks. 
Provide a comma-seperated list of sinks"); + } + + LOG.info("send data to sink=" + this.sinks.toString()); + // Grab the cluster argument String cluster = commandLine.getOptionValue("c"); LOG.info("cluster=" + cluster); @@ -456,7 +501,7 @@ private List getProcessRecords(Configuration conf, processRecords = processRecordService.getProcessRecords(cluster, LOADED, Integer.MAX_VALUE, processFileSubstring); - LOG.info("Processing " + processRecords.size() + " for: " + cluster); + LOG.info("Processing " + processRecords.size() + " processRecords for: " + cluster); } catch (IOException ioe) { caught = ioe; } finally { @@ -581,6 +626,13 @@ private List getJobRunners(Configuration conf, String cluster, } + static String convertScanToString(Scan scan) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out); + scan.write(dos); + return Base64.encodeBytes(out.toByteArray()); + } + /** * @param conf * to use to create and run the job @@ -609,12 +661,25 @@ private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount) // This is a map-only class, skip reduce step job.setNumReduceTasks(0); job.setJarByClass(JobFileProcessor.class); + job.setMapperClass(JobFileTableMapper.class); + job.setInputFormatClass(TableInputFormat.class); + job.setMapOutputKeyClass(JobFileTableMapper.getOutputKeyClass()); + job.setMapOutputValueClass(JobFileTableMapper.getOutputValueClass()); + job.getConfiguration().set(TableInputFormat.INPUT_TABLE, Constants.HISTORY_RAW_TABLE); + job.getConfiguration().set(TableInputFormat.SCAN, + convertScanToString(scan)); + TableMapReduceUtil.addDependencyJars(job); + HBaseConfiguration.addHbaseResources(job.getConfiguration()); + + //TODO: find a better way. reason: just so that it doesn't default to TextOutputFormat job.setOutputFormatClass(MultiTableOutputFormat.class); - - TableMapReduceUtil.initTableMapperJob(Constants.HISTORY_RAW_TABLE, scan, - JobFileTableMapper.class, JobFileTableMapper.getOutputKeyClass(), - JobFileTableMapper.getOutputValueClass(), job); - + + for (Sink sink : sinks) { + sink.configureJob(job); + } + + job.getConfiguration().set(Constants.JOBCONF_SINKS, StringUtils.join(sinks, ",")); + return job; } diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParser.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParser.java index 044acc0..0ccc040 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParser.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParser.java @@ -15,8 +15,7 @@ */ package com.twitter.hraven.etl; -import java.util.List; -import org.apache.hadoop.hbase.client.Put; +import java.util.Collection; import com.twitter.hraven.JobKey; import com.twitter.hraven.datasource.ProcessingException; @@ -48,16 +47,16 @@ public interface JobHistoryFileParser { * Return the generated list of job puts assembled when history file is * parsed * - * @return a list of jobPuts + * @return a collection of JobHistoryRecords */ - public List getJobPuts(); + public Collection getJobRecords(); /** * Return the generated list of task puts assembled when history file is * parsed * - * @return a list of taskPuts + * @return a list of JobHistoryTaskRecords */ - public List getTaskPuts(); + public Collection getTaskRecords(); } diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java index 
83e3f42..c958b1f 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java @@ -19,12 +19,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import com.twitter.hraven.Constants; import com.twitter.hraven.HadoopVersion; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.datasource.ProcessingException; import com.twitter.hraven.util.ByteUtil; @@ -54,15 +57,12 @@ protected JobHistoryFileParserBase(Configuration conf) { * * @return Put */ - public Put getHadoopVersionPut(HadoopVersion historyFileVersion, byte[] jobKeyBytes) { - Put pVersion = new Put(jobKeyBytes); - byte[] valueBytes = null; - valueBytes = Bytes.toBytes(historyFileVersion.toString()); - byte[] qualifier = Bytes.toBytes(JobHistoryKeys.hadoopversion.toString().toLowerCase()); - pVersion.add(Constants.INFO_FAM_BYTES, qualifier, valueBytes); - return pVersion; + public JobHistoryRecord getHadoopVersionRecord(HadoopVersion historyFileVersion, JobKey jobKey) { + return new JobHistoryRecord(RecordCategory.HISTORY_META, jobKey, + new RecordDataKey(JobHistoryKeys.hadoopversion.toString() + .toLowerCase()), historyFileVersion.toString()); } - + /** * extract the string around Xmx in the java child opts " -Xmx1024m -verbose:gc" * @param javaChildOptsStr diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java index 8955175..529c88e 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java @@ -17,6 +17,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; @@ -25,6 +26,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.mapred.JobHistoryCopy; import com.twitter.hraven.Constants; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; import com.twitter.hraven.JobKey; import com.twitter.hraven.datasource.ProcessingException; import com.twitter.hraven.mapreduce.JobHistoryListener; @@ -56,9 +59,9 @@ public void parse(byte[] historyFile, JobKey jobKey) throws ProcessingException jobHistoryListener = new JobHistoryListener(jobKey); JobHistoryCopy.parseHistoryFromIS(new ByteArrayInputStream(historyFile), jobHistoryListener); // set the hadoop version for this record - Put versionPut = getHadoopVersionPut(JobHistoryFileParserFactory.getHistoryFileVersion1(), - jobHistoryListener.getJobKeyBytes()); - jobHistoryListener.includeHadoopVersionPut(versionPut); + JobHistoryRecord versionRecord = getHadoopVersionRecord(JobHistoryFileParserFactory.getHistoryFileVersion1(), + jobHistoryListener.getJobKey()); + jobHistoryListener.includeHadoopVersionRecord(versionRecord); } catch (IOException ioe) { LOG.error(" Exception during parsing hadoop 1.0 file ", ioe); throw new ProcessingException( @@ -72,9 +75,9 @@ public void parse(byte[] historyFile, JobKey jobKey) throws ProcessingException * 
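To make the shape of the new object model concrete: instead of an HBase Put keyed by job key bytes, the parser now emits a storage-agnostic tuple of category, row key, data key and value. The sketch below is a simplified model of that tuple; the field and class names here are illustrative stand-ins, since the real JobHistoryRecord and RecordDataKey classes are added elsewhere in this patch.

import java.util.Arrays;
import java.util.List;

public class RecordModelSketch {

  enum Category { HISTORY_META, HISTORY_COUNTER }

  // Storage-agnostic record: what used to be a Put(family, qualifier, value)
  // becomes (category, row key, data key components, value).
  static class Record {
    final Category category;
    final String jobKey;        // stands in for the real JobKey
    final List<String> dataKey; // stands in for RecordDataKey components
    final Object value;

    Record(Category category, String jobKey, List<String> dataKey, Object value) {
      this.category = category;
      this.jobKey = jobKey;
      this.dataKey = dataKey;
      this.value = value;
    }

    public String toString() {
      return category + " " + jobKey + " " + dataKey + " = " + value;
    }
  }

  public static void main(String[] args) {
    // Analogue of getHadoopVersionRecord(): the old "hadoopversion" qualifier
    // becomes a single-component data key; "hadoop2" is a placeholder value.
    Record version = new Record(Category.HISTORY_META, "example-job-key",
        Arrays.asList("hadoopversion"), "hadoop2");
    System.out.println(version);
  }
}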
{@inheritDoc} */ @Override - public List getJobPuts() { + public Collection getJobRecords() { if (jobHistoryListener != null) { - return jobHistoryListener.getJobPuts(); + return jobHistoryListener.getJobRecords(); } else { return null; } @@ -84,9 +87,9 @@ public List getJobPuts() { * {@inheritDoc} */ @Override - public List getTaskPuts() { + public Collection getTaskRecords() { if (jobHistoryListener != null) { - return jobHistoryListener.getTaskPuts(); + return jobHistoryListener.getTaskRecords(); } else { return null; } diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java index 34bfb06..4e9c1fb 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java @@ -10,12 +10,12 @@ import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Set; @@ -26,14 +26,10 @@ import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.commons.configuration.ConversionException; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobHistoryCopy.RecordTypes; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; @@ -42,7 +38,12 @@ import com.google.common.collect.Maps; import com.twitter.hraven.Constants; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryTaskRecord; import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.TaskKey; import com.twitter.hraven.util.ByteArrayWrapper; import com.twitter.hraven.datasource.JobKeyConverter; @@ -59,8 +60,8 @@ public class JobHistoryFileParserHadoop2 extends JobHistoryFileParserBase { /** Job ID, minus the leading "job_" */ private String jobNumber = ""; private byte[] jobKeyBytes; - private List jobPuts = new LinkedList(); - private List taskPuts = new LinkedList(); + private Collection jobRecords; + private Collection taskRecords = new ArrayList(); boolean uberized = false; /** @@ -223,6 +224,11 @@ public void parse(byte[] historyFileContents, JobKey jobKey) throws ProcessingException { this.jobKey = jobKey; + + if (this.jobRecords == null) { + this.jobRecords = new JobHistoryRecordCollection(jobKey); + } + this.jobKeyBytes = jobKeyConv.toBytes(jobKey); setJobId(jobKey.getJobId().getJobIdString()); @@ -293,27 +299,24 @@ public void parse(byte[] historyFileContents, JobKey jobKey) * events are not so we need to look through the whole file to confirm * the job status and then generate the put */ - Put jobStatusPut = getJobStatusPut(); - this.jobPuts.add(jobStatusPut); + this.jobRecords.add(getJobStatusRecord()); // set the hadoop version for this record - Put versionPut 
= getHadoopVersionPut(JobHistoryFileParserFactory.getHistoryFileVersion2(), this.jobKeyBytes); - this.jobPuts.add(versionPut); + JobHistoryRecord versionRecord = + getHadoopVersionRecord(JobHistoryFileParserFactory.getHistoryFileVersion2(), this.jobKey); + this.jobRecords.add(versionRecord); - LOG.info("For " + this.jobKey + " #jobPuts " + jobPuts.size() + " #taskPuts: " - + taskPuts.size()); + LOG.info("For " + this.jobKey + " #jobPuts " + jobRecords.size() + " #taskPuts: " + + taskRecords.size()); } /** * generates a put for job status * @return Put that contains Job Status */ - private Put getJobStatusPut() { - Put pStatus = new Put(jobKeyBytes); - byte[] valueBytes = Bytes.toBytes(this.jobStatus); - byte[] qualifier = Bytes.toBytes(JobHistoryKeys.JOB_STATUS.toString().toLowerCase()); - pStatus.add(Constants.INFO_FAM_BYTES, qualifier, valueBytes); - return pStatus; + private JobHistoryRecord getJobStatusRecord() { + return new JobHistoryRecord(RecordCategory.HISTORY_META, this.jobKey, new RecordDataKey( + JobHistoryKeys.JOB_STATUS.toString().toLowerCase()), this.jobStatus); } /** @@ -377,7 +380,7 @@ private void understandSchema(String schema) throws JSONException { * "counts":[ { "name":"MAP_INPUT_RECORDS", "displayName":"Map input records", "value":10 }, { * "name":"MAP_OUTPUT_RECORDS", "displayName":"Map output records", "value":10 } ] } ] } */ - private void processCounters(Put p, JSONObject eventDetails, String key) { + private void processCounters(JSONObject eventDetails, String key) { try { JSONObject jsonCounters = eventDetails.getJSONObject(key); @@ -388,8 +391,21 @@ private void processCounters(Put p, JSONObject eventDetails, String key) { JSONArray counts = aCounter.getJSONArray(COUNTS); for (int j = 0; j < counts.length(); j++) { JSONObject countDetails = counts.getJSONObject(j); - populatePut(p, Constants.INFO_FAM_BYTES, counterMetaGroupName, aCounter.get(NAME) - .toString(), countDetails.get(NAME).toString(), countDetails.getLong(VALUE)); + //counterMetaGroupName; + String groupName = aCounter.get(NAME).toString(); + String counterName = countDetails.get(NAME).toString(); + Long counterValue = countDetails.getLong(VALUE); + + /** + * correct and populate map and reduce slot millis + */ + if ((Constants.SLOTS_MILLIS_MAPS.equals(counterName)) || + (Constants.SLOTS_MILLIS_REDUCES.equals(counterName))) { + counterValue = getStandardizedCounterValue(counterName, counterValue); + } + + this.jobRecords.add(new JobHistoryRecord(RecordCategory.HISTORY_COUNTER, this.jobKey, + new RecordDataKey(counterMetaGroupName, groupName, counterName), counterValue)); } } } catch (JSONException e) { @@ -403,57 +419,57 @@ private void processCounters(Put p, JSONObject eventDetails, String key) { * @throws JSONException */ private void - processAllTypes(Put p, Hadoop2RecordType recType, JSONObject eventDetails, String key) + processAllTypes(Hadoop2RecordType recType, JSONObject eventDetails, String dataKey, RecordAdder adder) throws JSONException { - if (COUNTER_NAMES.contains(key)) { - processCounters(p, eventDetails, key); + if (COUNTER_NAMES.contains(dataKey)) { + processCounters(eventDetails, dataKey); } else { - String type = fieldTypes.get(recType).get(key); + String type = fieldTypes.get(recType).get(dataKey); if (type.equalsIgnoreCase(TYPE_STRING)) { // look for job status if (JobHistoryKeys.JOB_STATUS.toString().equals( - JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key))) { + JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(dataKey))) { // store it only if it's one of the terminal state 
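The counter hunk above flattens what used to be a prefixed HBase qualifier into a three-component data key of counter type, group and counter name. A small sketch of that flattening follows; joining the parts with dots is only for readability, and the counter names are typical examples rather than values taken from a real history file.

public class CounterKeySketch {

  // Illustration only: the parser keeps the three parts as separate
  // RecordDataKey components; the dotted join just makes the shape visible.
  static String counterKey(String counterType, String group, String counter) {
    return counterType + "." + group + "." + counter;
  }

  public static void main(String[] args) {
    // A map-side file system counter as it appears in a "counters" JSON block.
    String key = counterKey("MAP_COUNTERS",
        "org.apache.hadoop.mapreduce.FileSystemCounter", "FILE_BYTES_READ");
    long value = 1048576L;
    System.out.println("HISTORY_COUNTER " + key + " = " + value);
  }
}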
events if ((recType.equals(Hadoop2RecordType.JobFinished)) || (recType.equals(Hadoop2RecordType.JobUnsuccessfulCompletion))) { - this.jobStatus = eventDetails.getString(key); + this.jobStatus = eventDetails.getString(dataKey); } } else { - String value = eventDetails.getString(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + String value = eventDetails.getString(dataKey); + populateRecord(dataKey, value, adder); } } else if (type.equalsIgnoreCase(TYPE_LONG)) { - long value = eventDetails.getLong(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + long value = eventDetails.getLong(dataKey); + populateRecord(dataKey, value, adder); // populate start time of the job for megabytemillis calculations if ((recType.equals(Hadoop2RecordType.JobInited)) && - LAUNCH_TIME_KEY_STR.equals(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key))) { + LAUNCH_TIME_KEY_STR.equals(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(dataKey))) { this.startTime = value; } // populate end time of the job for megabytemillis calculations if ((recType.equals(Hadoop2RecordType.JobFinished)) || (recType.equals(Hadoop2RecordType.JobUnsuccessfulCompletion))) { - if (FINISH_TIME_KEY_STR.equals(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key))) { + if (FINISH_TIME_KEY_STR.equals(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(dataKey))) { this.endTime = value; } } } else if (type.equalsIgnoreCase(TYPE_INT)) { - int value = eventDetails.getInt(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + int value = eventDetails.getInt(dataKey); + populateRecord(dataKey, value, adder); } else if (type.equalsIgnoreCase(TYPE_BOOLEAN)) { - boolean value = eventDetails.getBoolean(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, Boolean.toString(value)); + boolean value = eventDetails.getBoolean(dataKey); + populateRecord(dataKey, Boolean.toString(value), adder); } else if (type.equalsIgnoreCase(TYPE_ARRAY_INTS)) { - String value = eventDetails.getString(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + String value = eventDetails.getString(dataKey); + populateRecord(dataKey, value, adder); } else if (type.equalsIgnoreCase(NULL_STRING)) { // usually seen in FAILED tasks - String value = eventDetails.getString(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + String value = eventDetails.getString(dataKey); + populateRecord(dataKey, value, adder); } else if (type.equalsIgnoreCase(TYPE_MAP_STRINGS)) { - JSONObject ms = new JSONObject(eventDetails.get(key).toString()); - populatePut(p, Constants.INFO_FAM_BYTES, key, ms.toString()); + JSONObject ms = new JSONObject(eventDetails.get(dataKey).toString()); + populateRecord(dataKey, ms.toString(), adder); } else { throw new ProcessingException("Encountered a new type " + type + " unable to complete processing " + this.jobKey); @@ -465,15 +481,19 @@ private void processCounters(Put p, JSONObject eventDetails, String key) { * iterate over the event details and prepare puts * @throws JSONException */ - private void iterateAndPreparePuts(JSONObject eventDetails, Put p, Hadoop2RecordType recType) + private void iterateAndAddRecords(JSONObject eventDetails, Hadoop2RecordType recType, RecordAdder adder) throws JSONException { Iterator keys = eventDetails.keys(); while (keys.hasNext()) { - String key = (String) keys.next(); - processAllTypes(p, recType, eventDetails, key); + String dataKey = (String) keys.next(); + processAllTypes(recType, eventDetails, dataKey, adder); } } + private interface RecordAdder { + public void 
addRecord(RecordDataKey key, Object value, boolean isNumeric); + } + /** * process individual records * @throws JSONException @@ -492,65 +512,95 @@ private void processRecords(Hadoop2RecordType recType, JSONObject eventDetails) case JobStatusChanged: case JobSubmitted: case JobUnsuccessfulCompletion: - Put pJob = new Put(this.jobKeyBytes); - iterateAndPreparePuts(eventDetails, pJob, recType); - this.jobPuts.add(pJob); + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + jobRecords.add(new JobHistoryRecord(isNumeric ? RecordCategory.HISTORY_COUNTER + : RecordCategory.HISTORY_META, jobKey, key, value)); + } + }); break; case AMStarted: - byte[] amAttemptIdKeyBytes = - getAMKey(AM_ATTEMPT_PREFIX, eventDetails.getString(APPLICATION_ATTEMPTID)); - // generate a new put per AM Attempt - Put pAM = new Put(amAttemptIdKeyBytes); - pAM.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.Task.toString())); - iterateAndPreparePuts(eventDetails, pAM, recType); - taskPuts.add(pAM); + final TaskKey amAttemptIdKey = + getAMKey(AM_ATTEMPT_PREFIX, eventDetails.getString(APPLICATION_ATTEMPTID)); + // generate a new record per AM Attempt + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, amAttemptIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.Task.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, amAttemptIdKey, key, value)); + } + }); break; case MapAttemptFinished: - byte[] taskMAttemptIdKeyBytes = - getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); - Put pMTaskAttempt = new Put(taskMAttemptIdKeyBytes); - pMTaskAttempt.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.MapAttempt.toString())); - iterateAndPreparePuts(eventDetails, pMTaskAttempt, recType); - this.taskPuts.add(pMTaskAttempt); + final TaskKey taskMAttemptIdKey = + getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); + + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskMAttemptIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.MapAttempt.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? 
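The RecordAdder callback introduced above is what lets a single type-dispatch routine feed either the job-level or the task-level collection without knowing which one it is writing to. A compact, dependency-free sketch of the same pattern, with simplified key and record types, is given here to make the control flow explicit:

import java.util.ArrayList;
import java.util.List;

public class RecordAdderSketch {

  // Mirrors the package-private RecordAdder in the parser: the caller decides
  // what the value is, the adder decides where it goes and under which category.
  interface RecordAdder {
    void addRecord(String key, Object value, boolean isNumeric);
  }

  public static void main(String[] args) {
    final List<String> jobRecords = new ArrayList<String>();
    final List<String> taskRecords = new ArrayList<String>();

    // Adder used for job-level events (JobFinished, JobSubmitted, ...)
    RecordAdder jobAdder = new RecordAdder() {
      public void addRecord(String key, Object value, boolean isNumeric) {
        jobRecords.add((isNumeric ? "COUNTER " : "META ") + key + "=" + value);
      }
    };

    // Adder used for task-level events (TaskFinished, MapAttemptFinished, ...)
    RecordAdder taskAdder = new RecordAdder() {
      public void addRecord(String key, Object value, boolean isNumeric) {
        taskRecords.add((isNumeric ? "COUNTER " : "META ") + key + "=" + value);
      }
    };

    // A type-dispatch routine only ever sees the callback, never the lists.
    jobAdder.addRecord("finishtime", 1403958000000L, true);
    taskAdder.addRecord("task_status", "SUCCEEDED", false);

    System.out.println(jobRecords);
    System.out.println(taskRecords);
  }
}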
RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskMAttemptIdKey, key, value)); + } + }); break; case ReduceAttemptFinished: - byte[] taskRAttemptIdKeyBytes = - getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); - Put pRTaskAttempt = new Put(taskRAttemptIdKeyBytes); - pRTaskAttempt.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.ReduceAttempt.toString())); - iterateAndPreparePuts(eventDetails, pRTaskAttempt, recType); - this.taskPuts.add(pRTaskAttempt); + final TaskKey taskRAttemptIdKey = + getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskRAttemptIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.ReduceAttempt.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskRAttemptIdKey, key, value)); + } + }); break; case TaskAttemptFinished: case TaskAttemptStarted: case TaskAttemptUnsuccessfulCompletion: - byte[] taskAttemptIdKeyBytes = - getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); - Put pTaskAttempt = new Put(taskAttemptIdKeyBytes); - pTaskAttempt.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.Task.toString())); - iterateAndPreparePuts(eventDetails, pTaskAttempt, recType); - taskPuts.add(pTaskAttempt); + final TaskKey taskAttemptIdKey = + getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskAttemptIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.Task.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskAttemptIdKey, key, value)); + } + }); break; case TaskFailed: case TaskStarted: case TaskUpdated: case TaskFinished: - byte[] taskIdKeyBytes = - getTaskKey(TASK_PREFIX, this.jobNumber, eventDetails.getString(TASKID)); - Put pTask = new Put(taskIdKeyBytes); - pTask.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.Task.toString())); - iterateAndPreparePuts(eventDetails, pTask, recType); - taskPuts.add(pTask); + final TaskKey taskIdKey = + getTaskKey(TASK_PREFIX, this.jobNumber, eventDetails.getString(TASKID)); + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.Task.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? 
RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskIdKey, key, value)); + } + }); break; default: LOG.error("Check if recType was modified and has new members?"); @@ -587,12 +637,8 @@ private String getKey(String key) throws IllegalArgumentException { * @param String key * @param long value */ - private void populatePut(Put p, byte[] family, String key, long value) { - - byte[] valueBytes = null; - valueBytes = (value != 0L) ? Bytes.toBytes(value) : Constants.ZERO_LONG_BYTES; - byte[] qualifier = Bytes.toBytes(getKey(key).toLowerCase()); - p.add(family, qualifier, valueBytes); + private void populateRecord(String key, long value, RecordAdder adder) { + adder.addRecord(new RecordDataKey(getKey(key).toLowerCase()), value, true); } /** @@ -602,19 +648,19 @@ private void populatePut(Put p, byte[] family, String key, long value) { * keeping this function package level visible (unit testing) * @throws IllegalArgumentException if new key is encountered */ - byte[] getValue(String key, int value) { - byte[] valueBytes = null; + Object getValue(String key, int value) { + Object valueObject = null; Class clazz = JobHistoryKeys.KEY_TYPES.get(JobHistoryKeys.valueOf(key)); if (clazz == null) { throw new IllegalArgumentException(" unknown key " + key + " encountered while parsing " + this.jobKey); } if (Long.class.equals(clazz)) { - valueBytes = (value != 0L) ? Bytes.toBytes(new Long(value)) : Constants.ZERO_LONG_BYTES; + valueObject = (value != 0L) ? new Long(value) : 0L; } else { - valueBytes = (value != 0) ? Bytes.toBytes(value) : Constants.ZERO_INT_BYTES; + valueObject = (int)value; } - return valueBytes; + return valueObject; } /** @@ -624,14 +670,11 @@ byte[] getValue(String key, int value) { * @param String key * @param int value */ - private void populatePut(Put p, byte[] family, String key, int value) { - + private void populateRecord(String key, int value, RecordAdder adder) { String jobHistoryKey = getKey(key); - byte[] valueBytes = getValue(jobHistoryKey, value); - byte[] qualifier = Bytes.toBytes(jobHistoryKey.toLowerCase()); - p.add(family, qualifier, valueBytes); + adder.addRecord(new RecordDataKey(jobHistoryKey), getValue(jobHistoryKey, value), true); } - + /** * populates a put for string values * @param {@link Put} p @@ -639,72 +682,13 @@ private void populatePut(Put p, byte[] family, String key, int value) { * @param {@link String} key * @param String value */ - private void populatePut(Put p, byte[] family, String key, String value) { - byte[] valueBytes = null; - valueBytes = Bytes.toBytes(value); - byte[] qualifier = Bytes.toBytes(getKey(key).toLowerCase()); - p.add(family, qualifier, valueBytes); - } - - /** - * populates a put for {@link Counters} - * @param {@link Put} p - * @param {@link Constants} family - * @param String key - * @param String groupName - * @param String counterName - * @param long counterValue - */ - private void populatePut(Put p, byte[] family, String key, String groupName, String counterName, - Long counterValue) { - byte[] counterPrefix = null; - - try { - switch (JobHistoryKeys.valueOf(JobHistoryKeys.class, key)) { - case COUNTERS: - case TOTAL_COUNTERS: - case TASK_COUNTERS: - case TASK_ATTEMPT_COUNTERS: - counterPrefix = Bytes.add(Constants.COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); - break; - case MAP_COUNTERS: - counterPrefix = Bytes.add(Constants.MAP_COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); - break; - case REDUCE_COUNTERS: - counterPrefix = - Bytes.add(Constants.REDUCE_COUNTER_COLUMN_PREFIX_BYTES, 
Constants.SEP_BYTES); - break; - default: - throw new IllegalArgumentException("Unknown counter type " + key.toString()); - } - } catch (IllegalArgumentException iae) { - throw new ProcessingException("Unknown counter type " + key, iae); - } catch (NullPointerException npe) { - throw new ProcessingException("Null counter type " + key, npe); - } - - byte[] groupPrefix = Bytes.add(counterPrefix, Bytes.toBytes(groupName), Constants.SEP_BYTES); - byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName)); - - /** - * correct and populate map and reduce slot millis - */ - if ((Constants.SLOTS_MILLIS_MAPS.equals(counterName)) || - (Constants.SLOTS_MILLIS_REDUCES.equals(counterName))) { - counterValue = getStandardizedCounterValue(counterName, counterValue); - } - - p.add(family, qualifier, Bytes.toBytes(counterValue)); - + private void populateRecord(String key, String value, RecordAdder adder) { + adder.addRecord(new RecordDataKey(getKey(key).toLowerCase()), value, false); } private long getMemoryMb(String key) { long memoryMb = 0L; - if (Constants.MAP_MEMORY_MB_CONF_KEY.equals(key)){ - memoryMb = this.jobConf.getLong(key, Constants.DEFAULT_MAP_MEMORY_MB); - }else if (Constants.REDUCE_MEMORY_MB_CONF_KEY.equals(key)){ - memoryMb = this.jobConf.getLong(key, Constants.DEFAULT_REDUCE_MEMORY_MB); - } + memoryMb = this.jobConf.getLong(key, 0L); if (memoryMb == 0L) { throw new ProcessingException( "While correcting slot millis, " + key + " was found to be 0 "); @@ -755,7 +739,7 @@ private Long getStandardizedCounterValue(String counterName, Long counterValue) * Returns the Task ID or Task Attempt ID, stripped of the leading job ID, appended to the job row * key. */ - public byte[] getTaskKey(String prefix, String jobNumber, String fullId) { + public TaskKey getTaskKey(String prefix, String jobNumber, String fullId) { String taskComponent = fullId; if (fullId == null) { taskComponent = ""; @@ -765,50 +749,31 @@ public byte[] getTaskKey(String prefix, String jobNumber, String fullId) { taskComponent = fullId.substring(expectedPrefix.length()); } } - return taskKeyConv.toBytes(new TaskKey(this.jobKey, taskComponent)); + return new TaskKey(this.jobKey, taskComponent); } /** * Returns the AM Attempt id stripped of the leading job ID, appended to the job row key. 
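As an illustration of what getTaskKey() now returns, its prefix stripping can be reproduced in isolation. The sketch assumes the expected prefix is the task prefix plus the job number plus an underscore, as in the surrounding parser code, and the IDs used are made-up examples.

public class TaskKeySketch {

  // Strip "<prefix><jobNumber>_" from a full task or attempt id, keeping only
  // the per-task component that gets appended to the job row key.
  static String taskComponent(String prefix, String jobNumber, String fullId) {
    if (fullId == null) {
      return "";
    }
    String expectedPrefix = prefix + jobNumber + "_"; // assumed id layout
    if (fullId.startsWith(expectedPrefix) && fullId.length() > expectedPrefix.length()) {
      return fullId.substring(expectedPrefix.length());
    }
    return fullId;
  }

  public static void main(String[] args) {
    // task_201406280001_0042_m_000007 -> m_000007 (example ids only)
    System.out.println(taskComponent("task_", "201406280001_0042",
        "task_201406280001_0042_m_000007"));
  }
}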
*/ - public byte[] getAMKey(String prefix, String fullId) { - + public TaskKey getAMKey(String prefix, String fullId) { String taskComponent = prefix + fullId; - return taskKeyConv.toBytes(new TaskKey(this.jobKey, taskComponent)); + return new TaskKey(this.jobKey, taskComponent); } /** * {@inheritDoc} */ @Override - public List getJobPuts() { - return jobPuts; + public Collection getJobRecords() { + return jobRecords; } /** * {@inheritDoc} */ @Override - public List getTaskPuts() { - return taskPuts; - } - - /** - * utitlity function for printing all puts - */ - public void printAllPuts(List p) { - for (Put p1 : p) { - Map> d = p1.getFamilyMap(); - for (byte[] k : d.keySet()) { - System.out.println(" k " + Bytes.toString(k)); - } - for (List lkv : d.values()) { - for (KeyValue kv : lkv) { - System.out.println("\n row: " + taskKeyConv.fromBytes(kv.getRow()) - + "\n " + Bytes.toString(kv.getQualifier()) + ": " + Bytes.toString(kv.getValue())); - } - } - } + public Collection getTaskRecords() { + return taskRecords; } /** diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/ProcessRecordService.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/ProcessRecordService.java index fc067b2..85bdfc9 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/ProcessRecordService.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/ProcessRecordService.java @@ -148,10 +148,10 @@ public void writeJobRecord(ProcessRecord processRecord) throws IOException { * state. * @throws IOException */ - public ProcessRecord getLastSuccessfulProcessRecord(String cluster) + public ProcessRecord getLastSuccessfulProcessRecord(String cluster, String processFileSubstring) throws IOException { List processRecords = getProcessRecords(cluster, NOT_EQUAL, - CREATED, 1, null); + CREATED, 1, processFileSubstring); if (processRecords.size() > 0) { return processRecords.get(0); } diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/Sink.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/Sink.java new file mode 100644 index 0000000..ec70048 --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/Sink.java @@ -0,0 +1,33 @@ +package com.twitter.hraven.etl; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import com.twitter.hraven.mapreduce.GraphiteOutputFormat; +import com.twitter.hraven.mapreduce.HbaseOutputFormat; +import com.twitter.hraven.mapreduce.JobFileTableMapper; + +public enum Sink { + + HBASE { + + @Override + public void configureJob(Job job) { + MultipleOutputs.addNamedOutput(job, name(), HbaseOutputFormat.class, + JobFileTableMapper.getOutputKeyClass(), JobFileTableMapper.getOutputValueClass()); + } + + }, + + GRAPHITE { + + @Override + public void configureJob(Job job) { + MultipleOutputs.addNamedOutput(job, name(), GraphiteOutputFormat.class, + JobFileTableMapper.getOutputKeyClass(), JobFileTableMapper.getOutputValueClass()); + } + + }; + + public abstract void configureJob(Job job); + +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteHistoryWriter.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteHistoryWriter.java new file mode 100644 index 0000000..7e0b294 --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteHistoryWriter.java @@ -0,0 +1,366 @@ +package com.twitter.hraven.mapreduce; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import 
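The Sink enum above uses the constant-specific-method pattern so the driver can loop over whatever was passed to -s without knowing anything sink-specific; each constant registers its own named output. A dependency-free sketch of that pattern follows, with a StringBuilder standing in for the Hadoop Job that the real configureJob() receives.

public class SinkPatternSketch {

  // Same shape as the new Sink enum: each constant carries its own
  // configuration behaviour, so the driver stays sink-agnostic.
  enum Sink {
    HBASE {
      void configure(StringBuilder job) { job.append("namedOutput:HBASE "); }
    },
    GRAPHITE {
      void configure(StringBuilder job) { job.append("namedOutput:GRAPHITE "); }
    };

    abstract void configure(StringBuilder job);
  }

  public static void main(String[] args) {
    // Stand-in for the Hadoop Job object; the real code calls
    // MultipleOutputs.addNamedOutput(job, name(), <OutputFormat>, keyClass, valueClass).
    StringBuilder job = new StringBuilder();
    for (Sink sink : Sink.values()) {
      sink.configure(job);
    }
    System.out.println(job.toString().trim());
  }
}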
java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; + +import com.kenai.jffi.Array; +import com.twitter.hraven.Constants; +import com.twitter.hraven.Framework; +import com.twitter.hraven.HravenService; +import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; +import com.twitter.hraven.datasource.JobKeyConverter; +import com.twitter.hraven.util.ByteUtil; + +public class GraphiteHistoryWriter { + + private static Log LOG = LogFactory.getLog(GraphiteHistoryWriter.class); + + private final Pattern APPID_PATTERN_OOZIE_LAUNCHER = Pattern.compile("oozie:launcher:T=(.*):W=(.*):A=(.*):ID=(.*)"); + private final Pattern APPID_PATTERN_OOZIE_ACTION = Pattern.compile("oozie:action:T=(.*):W=(.*):A=(.*):ID=[0-9]{7}-[0-9]{15}-oozie-oozi-W(.*)"); + private final Pattern APPID_PATTERN_PIGJOB = Pattern.compile("PigLatin:(.*).pig"); + private static final String GRAPHITE_KEY_FILTER = "[./\\\\\\s,]"; + private static final int PIG_ALIAS_FINGERPRINT_LENGTH = 100; + + private static final String submitTimeKey = JobHistoryKeys.SUBMIT_TIME.toString(); + private static final String launchTimeKey = JobHistoryKeys.LAUNCH_TIME.toString(); + private static final String finishTimeKey = JobHistoryKeys.FINISH_TIME.toString(); + private static final String totalTimeKey = "total_time"; + private static final String runTimeKey = "run_time"; + + private HravenService service; + private JobHistoryRecordCollection recordCollection; + private String prefix; + private StringBuilder lines; + private List userFilter; + private List queueFilter; + private List excludedComponents; + private List doNotExcludeApps; + + private HTable keyMappingTable; + private HTable reverseKeyMappingTable; + + /** + * Writes a single {@link JobHistoryRecord} to the specified {@link HravenService} Passes the + * large multi record of which this record is a part of, so that we can get other contextual + * attributes to use in the graphite metric naming scheme + * @param graphiteKeyMappingTable + * @param serviceKey + * @param userFilter + * @param doNotExcludeApps + * @param jobRecord + * @param multiRecord + * @throws IOException + * @throws InterruptedException + */ + + public GraphiteHistoryWriter(HTable keyMappingTable, HTable reverseKeyMappingTable, String prefix, HravenService serviceKey, + JobHistoryRecordCollection recordCollection, StringBuilder sb, String userFilter, String queueFilter, String excludedComponents, String doNotExcludeApps) { + this.keyMappingTable = keyMappingTable; + this.reverseKeyMappingTable = reverseKeyMappingTable; + this.service = serviceKey; + this.recordCollection = recordCollection; + this.prefix = prefix; + this.lines = sb; + if (StringUtils.isNotEmpty(userFilter)) + this.userFilter = Arrays.asList(userFilter.split(",")); + if (StringUtils.isNotEmpty(queueFilter)) + this.queueFilter = Arrays.asList(queueFilter.split(",")); + if (StringUtils.isNotEmpty(excludedComponents)) + this.excludedComponents = 
Arrays.asList(excludedComponents.split(",")); + if (StringUtils.isNotEmpty(doNotExcludeApps)) + this.doNotExcludeApps = Arrays.asList(doNotExcludeApps.split(",")); + } + + public int write() throws IOException { + /* + * Send metrics in the format {PREFIX}.{cluster}.{user}.{appId}.{subAppId} {value} + * {submit_time} subAppId is formed differently for each framework. For pig, its the alias + * names and feature used in the job. appId will be parsed with a bunch of known patterns + * (oozie launcher jobs, pig jobs, etc.) + */ + + int lineCount = 0; + + boolean inDoNotExcludeApps = false; + + if (doNotExcludeApps != null) { + for (String appStr: this.doNotExcludeApps) { + if (recordCollection.getKey().getAppId().indexOf(appStr) != -1) { + inDoNotExcludeApps = true; + break; + } + } + } + + if ( + // exclude from further filters if appId matches list of doNotExcludeApps substrings + inDoNotExcludeApps || + + // or it must pass the user and queue filters, if not null + ( (userFilter == null || userFilter.contains(recordCollection.getKey().getUserName())) && + (queueFilter == null || queueFilter.contains(recordCollection.getValue(RecordCategory.CONF_META, + new RecordDataKey(Constants.HRAVEN_QUEUE)))) ) + ) { + + Framework framework = getFramework(recordCollection); + String metricsPathPrefix; + + String pigAliasFp = getPigAliasFingerprint(recordCollection); + String genAppId = genAppId(recordCollection, recordCollection.getKey().getAppId()); + + if (genAppId == null) { + genAppId = recordCollection.getKey().getAppId(); + LOG.error("Generated appId is null for app " + recordCollection.getKey().toString()); + } + + if (framework == Framework.PIG && pigAliasFp != null) { + // TODO: should ideally include app version too, but PIG-2587's pig.logical.plan.signature + // which hraven uses was available only from pig 0.11 + + metricsPathPrefix = + generatePathPrefix(prefix, + recordCollection.getKey().getCluster(), + recordCollection.getKey().getUserName(), + genAppId, + pigAliasFp) + .toString(); + } else { + metricsPathPrefix = + generatePathPrefix(prefix, + recordCollection.getKey().getCluster(), + recordCollection.getKey().getUserName(), + genAppId) + .toString(); + } + + try { + storeAppIdMapping(metricsPathPrefix); + } catch (IOException e) { + LOG.error("Failed to store mapping for app " + recordCollection.getKey().getAppId() + + " to '" + metricsPathPrefix + "'"); + } + + // Round the timestamp to second as Graphite accepts it in such + // a format. 
+ int timestamp = Math.round(recordCollection.getSubmitTime() / 1000); + + // For now, relies on receiving job history and job conf as part of the same + // JobHistoryMultiRecord + for (JobHistoryRecord jobRecord : recordCollection) { + if (service == HravenService.JOB_HISTORY + && (jobRecord.getDataCategory() == RecordCategory.HISTORY_COUNTER || jobRecord + .getDataCategory() == RecordCategory.INFERRED) + && !(jobRecord.getDataKey().get(0).equalsIgnoreCase(submitTimeKey) + || jobRecord.getDataKey().get(0).equalsIgnoreCase(launchTimeKey) || jobRecord.getDataKey() + .get(0).equalsIgnoreCase(finishTimeKey))) { + + StringBuilder line = new StringBuilder(); + line.append(metricsPathPrefix); + + boolean ignoreRecord = false; + for (String comp : jobRecord.getDataKey().getComponents()) { + if (excludedComponents != null && excludedComponents.contains(comp)) { + ignoreRecord = true; + LOG.info("Excluding component '" + jobRecord.getDataKey().toString() + "' of app " + jobRecord.getKey().toString()); + break; + } + line.append(".").append(sanitize(comp)); + } + + if (ignoreRecord) + continue; + else + lineCount++; + + line.append(" ").append(jobRecord.getDataValue()).append(" ") + .append(timestamp).append("\n"); + lines.append(line); + } + } + + //handle run times + Long finishTime = (Long)recordCollection.getValue(RecordCategory.HISTORY_COUNTER, new RecordDataKey(finishTimeKey)); + if (finishTime == null) + finishTime = (Long)recordCollection.getValue(RecordCategory.HISTORY_COUNTER, new RecordDataKey(finishTimeKey.toLowerCase())); + Long launchTime = (Long)recordCollection.getValue(RecordCategory.HISTORY_COUNTER, new RecordDataKey(launchTimeKey)); + if (launchTime == null) + launchTime = (Long)recordCollection.getValue(RecordCategory.HISTORY_COUNTER, new RecordDataKey(launchTimeKey.toLowerCase())); + + if (finishTime != null && recordCollection.getSubmitTime() != null) { + lines.append(metricsPathPrefix + ".").append(totalTimeKey + " " + (finishTime-recordCollection.getSubmitTime()) + " " + timestamp + "\n"); + lineCount++; + } + + if (finishTime != null && launchTime != null) { + lines.append(metricsPathPrefix + ".").append(runTimeKey + " " + (finishTime-launchTime) + " " + timestamp + "\n"); + lineCount++; + } + } + + return lineCount; + } + + private void storeAppIdMapping(String metricsPathPrefix) throws IOException { + Put put = new Put(new JobKeyConverter().toBytes(recordCollection.getKey())); + put.add(Constants.INFO_FAM_BYTES, Constants.GRAPHITE_KEY_MAPPING_COLUMN_BYTES, Bytes.toBytes(metricsPathPrefix)); + keyMappingTable.put(put); + + put = new Put(Bytes.toBytes(metricsPathPrefix)); + + byte[] appIdBytes = ByteUtil.join(Constants.SEP_BYTES, + Bytes.toBytes(recordCollection.getKey().getCluster()), + Bytes.toBytes(recordCollection.getKey().getUserName()), + Bytes.toBytes(recordCollection.getKey().getAppId())); + + put.add(Constants.INFO_FAM_BYTES, Constants.GRAPHITE_KEY_MAPPING_COLUMN_BYTES, appIdBytes); + reverseKeyMappingTable.put(put); + } + + private String getPigAliasFingerprint(JobHistoryRecordCollection recordCollection) { + Object aliasRec = recordCollection.getValue(RecordCategory.CONF, new RecordDataKey("pig.alias")); + Object featureRec = recordCollection.getValue(RecordCategory.CONF, new RecordDataKey("pig.job.feature")); + + String alias = null; + String feature = null; + + if (aliasRec != null) { + alias = (String) aliasRec; + } + + if (featureRec != null) { + feature = (String) featureRec; + } + + if (alias != null) { + return (feature != null ? 
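The lines accumulated above follow Graphite's plaintext protocol: one "path value epoch-seconds" triple per line, newline terminated. A small sketch of producing such a line for the naming scheme described in write(); the path and values are invented samples.

public class GraphiteLineSketch {

  // Build one Graphite plaintext-protocol line: "path value timestampSeconds\n".
  static String line(String path, Object value, long submitTimeMillis) {
    long timestampSeconds = submitTimeMillis / 1000L; // Graphite expects epoch seconds
    return path + " " + value + " " + timestampSeconds + "\n";
  }

  public static void main(String[] args) {
    // {PREFIX}.{cluster}.{user}.{appId}.{metric} -- sample values only
    String path = "hraven.cluster1.alice.pig_myscript.total_time";
    System.out.print(line(path, 812345L, 1403958000000L));
  }
}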
feature + ":" : "") + + StringUtils.abbreviate(alias, PIG_ALIAS_FINGERPRINT_LENGTH); + } + + return null; + } + + private String genAppId(JobHistoryRecordCollection recordCollection, String appId) { + String oozieActionName = getOozieActionName(recordCollection); + + if (getFramework(recordCollection) == Framework.PIG && APPID_PATTERN_PIGJOB.matcher(appId).matches()) { + // pig:{oozie-action-name}:{pigscript} + if (oozieActionName != null) { + appId = APPID_PATTERN_PIGJOB.matcher(appId).replaceAll("pig:" + oozieActionName + ":$1"); + } else { + appId = APPID_PATTERN_PIGJOB.matcher(appId).replaceAll("pig:$1"); + } + } else if (APPID_PATTERN_OOZIE_LAUNCHER.matcher(appId).matches()) { + // ozl:{oozie-workflow-name} + appId = APPID_PATTERN_OOZIE_LAUNCHER.matcher(appId).replaceAll("ozl:$1:$2:$3"); + } else if (APPID_PATTERN_OOZIE_ACTION.matcher(appId).matches()) { + // oza:{oozie-workflow-name}:{oozie-action-name} + appId = APPID_PATTERN_OOZIE_ACTION.matcher(appId).replaceAll("oza:$1:$2:$3:$4"); + } + + return appId; + } + + private Framework getFramework(JobHistoryRecordCollection recordCollection) { + Object rec = + recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey(Constants.FRAMEWORK_COLUMN)); + + if (rec != null) { + return Framework.valueOf((String) rec); + } + + return null; + } + + private String getOozieActionName(JobHistoryRecordCollection recordCollection) { + Object rec = recordCollection.getValue(RecordCategory.CONF, new RecordDataKey("oozie.action.id")); + + if (rec != null) { + String actionId = ((String) rec); + return actionId.substring(actionId.indexOf("@") + 1, actionId.length()); + } + + return null; + } + + /** + * Util method to generate metrix path prefix + * @return + * @throws UnsupportedEncodingException + */ + + private StringBuilder generatePathPrefix(String... args) { + StringBuilder prefix = new StringBuilder(); + boolean first = true; + for (String arg : args) { + if (!first) { + prefix.append("."); + } + + prefix.append(sanitize(arg)); + first = false; + } + return prefix; + } + + /** + * Util method to sanitize metrics for sending to graphite E.g. remove periods ("."), etc. 
+ * @throws UnsupportedEncodingException + */ + public static String sanitize(String s) { + return s.replaceAll(GRAPHITE_KEY_FILTER, "_"); + } + + /** + * Output the {@link JobHistoryRecord} received in debug log + * @param serviceKey + * @param jobRecord + */ + + public static void logRecord(HravenService serviceKey, JobHistoryRecord jobRecord) { + StringBuilder line = new StringBuilder(); + String seperator = ", "; + String seperator2 = "|"; + + line.append("Service: " + serviceKey.name()); + + JobKey key = jobRecord.getKey(); + line.append(seperator).append("Cluster: " + key.getCluster()); + line.append(seperator).append("User: " + key.getUserName()); + line.append(seperator).append("AppId: " + key.getAppId()); + line.append(seperator).append("RunId: " + key.getRunId()); + line.append(seperator).append("JobId: " + key.getJobId()); + + line.append(seperator2); + line.append(seperator).append("Category: " + jobRecord.getDataCategory().name()); + + line.append(seperator).append("Key: "); + for (String comp : jobRecord.getDataKey().getComponents()) { + line.append(comp).append("."); + } + + line.append(seperator).append("Value: " + jobRecord.getDataValue()); + line.append(seperator).append("SubmitTime: " + jobRecord.getSubmitTime()); + + LOG.debug(line); + } +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteOutputFormat.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteOutputFormat.java new file mode 100644 index 0000000..7fbf31c --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteOutputFormat.java @@ -0,0 +1,198 @@ +package com.twitter.hraven.mapreduce; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.io.Writer; +import java.net.Socket; +import java.net.URLEncoder; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.HravenService; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.util.EnumWritable; + +/** + * @author angad.singh {@link OutputFormat} for sending metrics to graphite + */ + +public class GraphiteOutputFormat extends OutputFormat, HravenRecord> { + + private static Log LOG = LogFactory.getLog(GraphiteOutputFormat.class); + private static Writer writer; + + /** + * {@link OutputCommitter} which does nothing + */ + protected static class GraphiteOutputCommitter extends OutputCommitter { + + @Override + public void setupJob(JobContext jobContext) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) throws 
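sanitize() exists because dots, slashes, whitespace and commas would either split a Graphite metric path or produce unusable segments. A stand-alone version using the same character class as GRAPHITE_KEY_FILTER, applied to a typical counter group name:

public class SanitizeSketch {

  // Same character class as GRAPHITE_KEY_FILTER: dot, slash, backslash,
  // whitespace and comma all become underscores so they cannot split the path.
  static String sanitize(String s) {
    return s.replaceAll("[./\\\\\\s,]", "_");
  }

  public static void main(String[] args) {
    // example input only
    System.out.println(sanitize("org.apache.hadoop.mapreduce.FileSystemCounter"));
    // -> org_apache_hadoop_mapreduce_FileSystemCounter
  }
}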
IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + } + + } + + protected static class GraphiteRecordWriter extends RecordWriter, HravenRecord> { + + // prepend this prefix to all metrics + private String METRIC_PREFIX; + + // filter jobs not submitted by this user + private String userFilter; + + // filter jobs not submitted in this queue + private String queueFilter; + + // exclude these metric path components (e.g MultiInputCounters - create a lot of redundant tree + // paths, and you wouldn't want to send them to graphite) + private String excludedComponents; + + // comma seperated list of app substrings to prevent from being excluded after above filters + private String doNotExcludeApps; + + private HTable keyMappingTable; + private HTable reverseKeyMappingTable; + + + public GraphiteRecordWriter(Configuration hbaseconfig, String host, int port, String prefix, String userFilter, String queueFilter, String excludedComponents, String doNotExcludeApps) throws IOException { + this.METRIC_PREFIX = prefix; + this.userFilter = userFilter; + this.queueFilter = queueFilter; + this.excludedComponents = excludedComponents; + this.doNotExcludeApps = doNotExcludeApps; + + keyMappingTable = new HTable(hbaseconfig, Constants.GRAPHITE_KEY_MAPPING_TABLE_BYTES); + keyMappingTable.setAutoFlush(false); + + reverseKeyMappingTable = new HTable(hbaseconfig, Constants.GRAPHITE_REVERSE_KEY_MAPPING_TABLE_BYTES); + reverseKeyMappingTable.setAutoFlush(false); + + try { + // Open an connection to Graphite server. + Socket socket = new Socket(host, port); + writer = new OutputStreamWriter(socket.getOutputStream()); + } catch (Exception e) { + throw new IOException("Error connecting to graphite, " + host + ":" + port, e); + } + } + + /** + * Split a {@link JobHistoryRecordCollection} into {@link JobHistoryRecord}s and call the + * {@link #writeRecord(HravenService, JobHistoryRecord)} method + */ + + @Override + public void write(EnumWritable serviceKey, HravenRecord value) throws IOException, + InterruptedException { + HravenService service = serviceKey.getValue(); + JobHistoryRecordCollection recordCollection; + + if (value instanceof JobHistoryRecordCollection) { + recordCollection = (JobHistoryRecordCollection) value; + } else { + recordCollection = new JobHistoryRecordCollection((JobHistoryRecord) value); + } + + StringBuilder output = new StringBuilder(); + int lines = 0; + + try { + GraphiteHistoryWriter graphiteWriter = + new GraphiteHistoryWriter(keyMappingTable, reverseKeyMappingTable, METRIC_PREFIX, service, recordCollection, output, userFilter, queueFilter, excludedComponents, doNotExcludeApps); + lines = graphiteWriter.write(); + } catch (Exception e) { + LOG.error("Error generating metrics for graphite", e); + } + + if (output.length() > 0) { + + try { + LOG.info("SendToGraphite: " + recordCollection.getKey().toString() + " : " + lines + " metrics"); + writer.write(output.toString()); + } catch (Exception e) { + LOG.error("Error sending metrics to graphite", e); + throw new IOException("Error sending metrics", e); + } + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + try { + LOG.info("flushing records and closing writer"); + writer.close(); + } catch (Exception e) { + throw new IOException("Error flush metrics to graphite", e); + } + keyMappingTable.close(); + 
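Under the hood the record writer above is just a TCP client for Graphite's plaintext port. A minimal, dependency-free sketch of that transport follows; the host, port and metric line are placeholders, and the real writer gets them from the job configuration.

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;

public class GraphiteSocketSketch {

  public static void main(String[] args) throws Exception {
    String host = "graphite.example.com"; // placeholder, normally from job conf
    int port = 2003;                      // Graphite's usual plaintext port

    Socket socket = new Socket(host, port);
    Writer writer = new OutputStreamWriter(socket.getOutputStream());
    try {
      // One "path value epochSeconds" line per metric, newline terminated.
      writer.write("hraven.cluster1.alice.example_app.run_time 812 1403958000\n");
      writer.flush();
    } finally {
      writer.close(); // closing the writer also closes the socket stream
      socket.close();
    }
  }
}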
reverseKeyMappingTable.close(); + } + } + + @Override + public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException { + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, + InterruptedException { + return new GraphiteOutputCommitter(); + } + + /** + * Output a custom {@link GraphiteRecordWriter} to send metrics to graphite + */ + @Override + public RecordWriter, HravenRecord> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + return new GraphiteRecordWriter(HBaseConfiguration.create(conf), + conf.get(Constants.JOBCONF_GRAPHITE_HOST_KEY, Constants.GRAPHITE_DEFAULT_HOST), + conf.getInt(Constants.JOBCONF_GRAPHITE_PORT_KEY, Constants.GRAPHITE_DEFAULT_PORT), + conf.get(Constants.JOBCONF_GRAPHITE_PREFIX, Constants.GRAPHITE_DEFAULT_PREFIX), + conf.get(Constants.JOBCONF_GRAPHITE_USER_FILTER), + conf.get(Constants.JOBCONF_GRAPHITE_QUEUE_FILTER), + conf.get(Constants.JOBCONF_GRAPHITE_EXCLUDED_COMPONENTS), + conf.get(Constants.JOBCONF_GRAPHITE_DONOTEXCLUDE_APPS) + ); + } + +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseHistoryWriter.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseHistoryWriter.java new file mode 100644 index 0000000..bd105b7 --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseHistoryWriter.java @@ -0,0 +1,105 @@ +package com.twitter.hraven.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.datasource.ProcessingException; + +public class HbaseHistoryWriter { + private static Log LOG = LogFactory.getLog(HbaseHistoryWriter.class); + + public static void addHistoryPuts(HravenRecord record, Put p) { + byte[] family = Constants.INFO_FAM_BYTES; + + JobHistoryKeys dataKey = null; + if (record.getDataKey() != null && record.getDataKey().get(0) != null) + try { + dataKey = JobHistoryKeys.valueOf(record.getDataKey().get(0)); + } catch (IllegalArgumentException e) { + // some keys other than JobHistoryKeys were added by + // JobHistoryListener. Ignore this exception. 
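Everything GraphiteRecordWriter needs arrives through the job configuration, which is why the ETL driver can pass -D options straight through. A sketch of seeding those keys is shown below; the values are placeholders, and the key constants are assumed to be the same ones getRecordWriter() reads back above.

import org.apache.hadoop.conf.Configuration;
import com.twitter.hraven.Constants;

public class GraphiteConfSketch {

  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Placeholder values; keys are the constants GraphiteOutputFormat reads back.
    conf.set(Constants.JOBCONF_GRAPHITE_HOST_KEY, "graphite.example.com");
    conf.setInt(Constants.JOBCONF_GRAPHITE_PORT_KEY, 2003);
    conf.set(Constants.JOBCONF_GRAPHITE_PREFIX, "hraven");
    conf.set(Constants.JOBCONF_GRAPHITE_USER_FILTER, "alice,bob");
    conf.set(Constants.JOBCONF_GRAPHITE_EXCLUDED_COMPONENTS, "MultiInputCounters");

    System.out.println("graphite host = " + conf.get(Constants.JOBCONF_GRAPHITE_HOST_KEY));
  }
}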
+ } + + if (dataKey == null) { + byte[] qualifier = null; + if (record.getDataCategory() == RecordCategory.CONF) { + byte[] jobConfColumnPrefix = + Bytes.add(Constants.JOB_CONF_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); + qualifier = Bytes.add(jobConfColumnPrefix, Bytes.toBytes(record.getDataKey().toString())); + } else { + qualifier = Bytes.toBytes(record.getDataKey().toString().toLowerCase()); + } + + byte[] valueBytes = null; + + if (record.getDataValue() instanceof Long) { + valueBytes = Bytes.toBytes((Long)record.getDataValue()); + } else if (record.getDataValue() instanceof Double) { + valueBytes = Bytes.toBytes((Double)record.getDataValue()); + } else { + valueBytes = Bytes.toBytes(record.getDataValue().toString()); + } + + Bytes.toBytes(record.getDataValue().toString()); + p.add(family, qualifier, valueBytes); + } else if (dataKey == JobHistoryKeys.COUNTERS || dataKey == JobHistoryKeys.MAP_COUNTERS + || dataKey == JobHistoryKeys.REDUCE_COUNTERS) { + + String group = record.getDataKey().get(1); + String counterName = record.getDataKey().get(2); + byte[] counterPrefix = null; + + try { + switch (dataKey) { + case COUNTERS: + case TOTAL_COUNTERS: + case TASK_COUNTERS: + case TASK_ATTEMPT_COUNTERS: + counterPrefix = Bytes.add(Constants.COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); + break; + case MAP_COUNTERS: + counterPrefix = Bytes.add(Constants.MAP_COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); + case REDUCE_COUNTERS: + counterPrefix = + Bytes.add(Constants.REDUCE_COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); + default: + throw new IllegalArgumentException("Unknown counter type " + dataKey.toString()); + } + } catch (IllegalArgumentException iae) { + throw new ProcessingException("Unknown counter type " + dataKey, iae); + } catch (NullPointerException npe) { + throw new ProcessingException("Null counter type " + dataKey, npe); + } + + byte[] groupPrefix = Bytes.add(counterPrefix, Bytes.toBytes(group), Constants.SEP_BYTES); + byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName)); + + p.add(family, qualifier, Bytes.toBytes((Long) record.getDataValue())); + } else { + @SuppressWarnings("rawtypes") + Class clazz = JobHistoryKeys.KEY_TYPES.get(dataKey); + byte[] valueBytes = null; + + if (Integer.class.equals(clazz)) { + valueBytes = + (Integer) record.getDataValue() == 0 ? Constants.ZERO_INT_BYTES : Bytes + .toBytes((Integer) record.getDataValue()); + } else if (Long.class.equals(clazz)) { + valueBytes = + (Long) record.getDataValue() == 0 ? 
Constants.ZERO_LONG_BYTES : Bytes + .toBytes((Long) record.getDataValue()); + } else { + // keep the string representation by default + valueBytes = Bytes.toBytes((String) record.getDataValue()); + } + byte[] qualifier = Bytes.toBytes(dataKey.toString().toLowerCase()); + p.add(family, qualifier, valueBytes); + } + } +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseOutputFormat.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseOutputFormat.java new file mode 100644 index 0000000..2511c53 --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseOutputFormat.java @@ -0,0 +1,130 @@ +package com.twitter.hraven.mapreduce; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.HravenService; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryTaskRecord; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; +import com.twitter.hraven.TaskKey; +import com.twitter.hraven.datasource.JobKeyConverter; +import com.twitter.hraven.datasource.TaskKeyConverter; +import com.twitter.hraven.util.EnumWritable; + +/** + * @author angad.singh Wrapper around Hbase's {@link MultiTableOutputFormat} Converts + * {@link HravenRecords} to Hbase {@link Put}s and writes them to {@link HTable}s + * corresponding to {@link HravenService} + */ + +public class HbaseOutputFormat extends OutputFormat, HravenRecord> { + + protected static class HravenHbaseRecordWriter extends RecordWriter, HravenRecord> { + + private RecordWriter recordWriter; + + public HravenHbaseRecordWriter(RecordWriter recordWriter) { + this.recordWriter = recordWriter; + } + + /** + * Writes a single {@link HravenRecord} to the specified {@link HravenService} + * @param serviceKey + * @param value + * @throws IOException + * @throws InterruptedException + */ + private void writeRecord(HravenService serviceKey, HravenRecord value) + throws IOException, InterruptedException { + ImmutableBytesWritable table = null; + Put put = null; + + switch (serviceKey) { + case JOB_HISTORY: + JobHistoryRecord rec = (JobHistoryRecord) value; + put = new Put(new JobKeyConverter().toBytes(rec.getKey())); + table = new ImmutableBytesWritable(Constants.HISTORY_TABLE_BYTES); + HbaseHistoryWriter.addHistoryPuts(rec, put); + break; + case JOB_HISTORY_TASK: + JobHistoryTaskRecord taskRec = (JobHistoryTaskRecord) value; + put = new Put(new TaskKeyConverter().toBytes((TaskKey) taskRec.getKey())); + table = new ImmutableBytesWritable(Constants.HISTORY_TASK_TABLE_BYTES); + HbaseHistoryWriter.addHistoryPuts(taskRec, put); + break; + } + + recordWriter.write(table, put); + } + + /** + * Split a {@link JobHistoryRecordCollection} into {@link JobHistoryRecord}s and call the + * {@link #writeRecord(HravenService, 
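The HBase sink's only real decision is this routing: pick a table and a key converter from the HravenService, then hand the Put to MultiTableOutputFormat. A dependency-free sketch of that routing is shown here; the string table names are illustrative stand-ins for the byte[] constants used in writeRecord().

public class ServiceRoutingSketch {

  // Reduced model of HravenService: which logical store a record belongs to.
  enum Service { JOB_HISTORY, JOB_HISTORY_TASK }

  // Pick the destination table the way HravenHbaseRecordWriter.writeRecord() does.
  static String tableFor(Service service) {
    switch (service) {
      case JOB_HISTORY:
        return "job_history";       // stand-in for Constants.HISTORY_TABLE_BYTES
      case JOB_HISTORY_TASK:
        return "job_history_task";  // stand-in for Constants.HISTORY_TASK_TABLE_BYTES
      default:
        throw new IllegalArgumentException("Unknown service " + service);
    }
  }

  public static void main(String[] args) {
    System.out.println(tableFor(Service.JOB_HISTORY));
    System.out.println(tableFor(Service.JOB_HISTORY_TASK));
  }
}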
JobHistoryRecord)} method + */ + + @Override + public void write(EnumWritable serviceKey, HravenRecord value) throws IOException, + InterruptedException { + HravenService service = serviceKey.getValue(); + if (value instanceof JobHistoryRecordCollection) { + for (JobHistoryRecord record : (JobHistoryRecordCollection) value) { + writeRecord(service, record); + } + } else { + writeRecord(service, value); + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + recordWriter.close(context); + } + } + + private MultiTableOutputFormat outputFormat; + + /** + * Wrap around Hbase's {@link MultiTableOutputFormat} + */ + public HbaseOutputFormat() { + this.outputFormat = new MultiTableOutputFormat(); + } + + /** + * Wrap around {@link MultiTableOutputFormat}'s {@link MultiTableRecordWriter} + */ + @Override + public RecordWriter, HravenRecord> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new HravenHbaseRecordWriter(outputFormat.getRecordWriter(context)); + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + outputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, + InterruptedException { + return outputFormat.getOutputCommitter(context); + } +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java index b6f7112..6804c32 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java @@ -19,9 +19,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; +import javax.annotation.Nullable; + +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -33,14 +38,23 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.HravenService; import com.twitter.hraven.JobDesc; import com.twitter.hraven.JobDescFactory; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryTaskRecord; import com.twitter.hraven.JobKey; import com.twitter.hraven.QualifiedJobId; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.datasource.AppVersionService; import com.twitter.hraven.datasource.JobHistoryByIdService; import com.twitter.hraven.datasource.JobHistoryRawService; @@ -53,18 +67,21 @@ import com.twitter.hraven.etl.JobHistoryFileParserBase; import com.twitter.hraven.etl.JobHistoryFileParserFactory; import com.twitter.hraven.etl.ProcessRecordService; 
+import com.twitter.hraven.etl.Sink;
+import com.twitter.hraven.util.EnumWritable;
 
 /**
  * Takes in results from a scan from {@link ProcessRecordService
  * @getHistoryRawTableScan}, process both job file and history file and emit out
- * as puts for the {@link Constants#HISTORY_TABLE}
+ * as ({@link HravenService}, {@link HravenRecord}) pairs. {@link Sink}s will process these
+ * and convert them to their respective storage formats
 *

* As a side-affect we'll load an index record into the * {@link Constants#HISTORY_BY_JOBID_TABLE} as well. * */ public class JobFileTableMapper extends - TableMapper { + TableMapper { private static Log LOG = LogFactory.getLog(JobFileTableMapper.class); @@ -76,6 +93,14 @@ public class JobFileTableMapper extends Constants.HISTORY_RAW_TABLE_BYTES); private static JobKeyConverter jobKeyConv = new JobKeyConverter(); + private MultipleOutputs mos; + + /** + * Define the sinks to output to + */ + + private ArrayList sinks; + /** * Used to create secondary index. */ @@ -96,46 +121,68 @@ public class JobFileTableMapper extends /** * @return the key class for the job output data. */ - public static Class> getOutputKeyClass() { - return ImmutableBytesWritable.class; + public static Class getOutputKeyClass() { + return EnumWritable.class; } /** * @return the value class for the job output data. */ - public static Class getOutputValueClass() { - return Put.class; + public static Class getOutputValueClass() { + return HravenRecord.class; } @Override protected void setup( - Mapper.Context context) + Mapper.Context context) throws java.io.IOException, InterruptedException { Configuration myConf = context.getConfiguration(); jobHistoryByIdService = new JobHistoryByIdService(myConf); appVersionService = new AppVersionService(myConf); rawService = new JobHistoryRawService(myConf); - keyCount = 0; + + sinks = + new ArrayList(Collections2.transform(Arrays.asList(StringUtils.split(context + .getConfiguration().get(Constants.JOBCONF_SINKS), ',')), new Function() { + + @Override + @Nullable + public Sink apply(@Nullable String input) { + return Sink.valueOf(input); + } + })); + + mos = new MultipleOutputs(context); } @Override protected void map( ImmutableBytesWritable key, Result value, - Mapper.Context context) + Mapper.Context context) throws java.io.IOException, InterruptedException { keyCount++; boolean success = true; QualifiedJobId qualifiedJobId = null; try { + + + /** + * STEP 0 + * + * init and extract meaningful data out of raw value + * + * **/ + qualifiedJobId = rawService.getQualifiedJobIdFromResult(value); context.progress(); Configuration jobConf = rawService.createConfigurationFromResult(value); context.progress(); + //figure out submit time byte[] jobhistoryraw = rawService.getJobHistoryRawFromResult(value); long submitTimeMillis = JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory( jobhistoryraw); @@ -151,22 +198,29 @@ protected void map( Put submitTimePut = rawService.getJobSubmitTimePut(value.getRow(), submitTimeMillis); - context.write(RAW_TABLE, submitTimePut); - + + rawService.getTable().put(submitTimePut); + + /** + * STEP 1 + * + * process and extract job xml/conf + * + * **/ JobDesc jobDesc = JobDescFactory.createJobDesc(qualifiedJobId, submitTimeMillis, jobConf); JobKey jobKey = new JobKey(jobDesc); context.progress(); - + // TODO: remove sysout String msg = "JobDesc (" + keyCount + "): " + jobDesc + " submitTimeMillis: " + submitTimeMillis; LOG.info(msg); - List puts = JobHistoryService.getHbasePuts(jobDesc, jobConf); + JobHistoryRecordCollection confRecordCollection = JobHistoryService.getConfRecord(jobDesc, jobConf); - LOG.info("Writing " + puts.size() + " JobConf puts to " - + Constants.HISTORY_TABLE); + LOG.info("Sending JobConf records to " + + HravenService.JOB_HISTORY + " service"); // TODO: // For Scalding just convert the flowID as a Hex number. 
Use that for the @@ -176,13 +230,18 @@ protected void map( // Scan should get the first (lowest job-id) then grab the start-time from // the Job. - // Emit the puts - for (Put put : puts) { - context.write(JOB_TABLE, put); - context.progress(); - } + //1.1 Emit the records for job xml/conf/"JobDesc" + //Don't sink config seperately - merge with all other records and then sink + //sink(HravenService.JOB_HISTORY, confRecord); + context.progress(); - // Write secondary index(es) + /** + * STEP 2 + * + * Write secondary index(es) + * + * **/ + LOG.info("Writing secondary indexes"); jobHistoryByIdService.writeIndexes(jobKey); context.progress(); @@ -190,6 +249,14 @@ protected void map( jobDesc.getAppId(), jobDesc.getVersion(), jobDesc.getRunId()); context.progress(); + /** + * STEP 3 + * + * Process and extact actual job history + * + * **/ + + //3.1: get job history KeyValue keyValue = value.getColumnLatest(Constants.RAW_FAM_BYTES, Constants.JOBHISTORY_COL_BYTES); @@ -200,44 +267,25 @@ protected void map( } else { historyFileContents = keyValue.getValue(); } + + //3.2: parse job history JobHistoryFileParser historyFileParser = JobHistoryFileParserFactory .createJobHistoryFileParser(historyFileContents, jobConf); historyFileParser.parse(historyFileContents, jobKey); context.progress(); - puts = historyFileParser.getJobPuts(); - if (puts == null) { - throw new ProcessingException( - " Unable to get job puts for this record!" + jobKey); - } - LOG.info("Writing " + puts.size() + " Job puts to " - + Constants.HISTORY_TABLE); - - // Emit the puts - for (Put put : puts) { - context.write(JOB_TABLE, put); - // TODO: we should not have to do this, but need to confirm that - // TableRecordWriter does this for us. - context.progress(); - } - - puts = historyFileParser.getTaskPuts(); - if (puts == null) { - throw new ProcessingException( - " Unable to get task puts for this record!" + jobKey); - } - LOG.info("Writing " + puts.size() + " Job puts to " - + Constants.HISTORY_TASK_TABLE); - - for (Put put : puts) { - context.write(TASK_TABLE, put); - // TODO: we should not have to do this, but need to confirm that - // TableRecordWriter does this for us. 
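The record fan-out in the next hunk goes through a sink(...) helper driven by the Sink enum that setup() resolves from Constants.JOBCONF_SINKS. Sink.java is added elsewhere in this patch and is not shown in this excerpt, so the snippet below is only a minimal sketch of the shape the mapper relies on (valueOf() and name()); the constant names and any extra wiring the real enum carries are assumptions.

    package com.twitter.hraven.etl;

    /**
     * Minimal sketch (assumption): the mapper only needs valueOf() and name().
     * The constants must match both the comma-separated value configured under
     * Constants.JOBCONF_SINKS and the MultipleOutputs named outputs registered
     * by the job driver.
     */
    public enum Sink {
      HBASE,
      GRAPHITE;
    }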
- context.progress(); - } + //3.3: get and write job related data + JobHistoryRecordCollection jobHistoryRecords = (JobHistoryRecordCollection) historyFileParser.getJobRecords(); + jobHistoryRecords.setSubmitTime(submitTimeMillis); + jobHistoryRecords.mergeWith(confRecordCollection); + + LOG.info("Sending " + jobHistoryRecords.size() + " Job history records to " + + HravenService.JOB_HISTORY + " service"); + + context.progress(); - /** post processing steps on job puts and job conf puts */ + //3.4: post processing steps on job records and job conf records Long mbMillis = historyFileParser.getMegaByteMillis(); context.progress(); if (mbMillis == null) { @@ -245,21 +293,37 @@ protected void map( + jobKey); } - Put mbPut = getMegaByteMillisPut(mbMillis, jobKey); - LOG.info("Writing mega byte millis puts to " + Constants.HISTORY_TABLE); - context.write(JOB_TABLE, mbPut); + JobHistoryRecord mbRecord = getMegaByteMillisRecord(mbMillis, jobKey); + LOG.info("Send mega byte millis records to " + HravenService.JOB_HISTORY + " service"); + + jobHistoryRecords.add(mbRecord); context.progress(); - /** post processing steps to get cost of the job */ + //3.5: post processing steps to get cost of the job */ Double jobCost = getJobCost(mbMillis, context.getConfiguration()); context.progress(); if (jobCost == null) { throw new ProcessingException(" Unable to get job cost calculation for this record!" + jobKey); } - Put jobCostPut = getJobCostPut(jobCost, jobKey); - LOG.info("Writing jobCost puts to " + Constants.HISTORY_TABLE); - context.write(JOB_TABLE, jobCostPut); + JobHistoryRecord jobCostRecord = getJobCostRecord(jobCost, jobKey); + LOG.info("Send jobCost records to " + HravenService.JOB_HISTORY + " service"); + jobHistoryRecords.add(jobCostRecord); + + //Sink the merged record + sink(HravenService.JOB_HISTORY, jobHistoryRecords); + context.progress(); + + //3.6: get and write task related data + ArrayList taskHistoryRecords = (ArrayList) historyFileParser.getTaskRecords(); + + LOG.info("Sending " + taskHistoryRecords.size() + " Task history records to " + + HravenService.JOB_HISTORY_TASK + " service"); + + for (JobHistoryTaskRecord taskRecord: taskHistoryRecords) { + sink(HravenService.JOB_HISTORY_TASK, taskRecord); + } + context.progress(); } catch (RowKeyParseException rkpe) { @@ -280,6 +344,12 @@ protected void map( success = false; } + /** + * STEP 4 + * + * Finalization + * + * **/ if (success) { // Update counter to indicate failure. HadoopCompat.incrementCounter(context.getCounter(ProcessingCounter.RAW_ROW_SUCCESS_COUNT), 1); @@ -296,21 +366,28 @@ protected void map( // row, with one succeeding and one failing, there could be a race where the // raw does not properly indicate the true status (which is questionable in // any case with multiple simultaneous runs with different outcome). 
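The sink() helper in the next hunk writes each record as mos.write(sink.name(), new EnumWritable(service), record), so the mapper's output key is a Writable wrapper around the HravenService enum. Only the one-argument constructor and getValue() are visible here; the sketch below is one plausible implementation of such a wrapper, offered as an assumption rather than the patch's EnumWritable verbatim.

    package com.twitter.hraven.util;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    /**
     * Sketch of a Writable wrapper around an enum value (assumption: the real
     * class may also implement WritableComparable or serialize differently).
     */
    public class EnumWritable<E extends Enum<E>> implements Writable {

      private Class<E> clazz;
      private E value;

      public EnumWritable() {
        // no-arg constructor required by Hadoop serialization
      }

      public EnumWritable(E value) {
        this.value = value;
        this.clazz = value.getDeclaringClass();
      }

      public E getValue() {
        return value;
      }

      @Override
      public void write(DataOutput out) throws IOException {
        // store the enum class name plus the constant name
        out.writeUTF(clazz.getName());
        out.writeUTF(value.name());
      }

      @SuppressWarnings("unchecked")
      @Override
      public void readFields(DataInput in) throws IOException {
        try {
          clazz = (Class<E>) Class.forName(in.readUTF());
        } catch (ClassNotFoundException e) {
          throw new IOException(e);
        }
        value = Enum.valueOf(clazz, in.readUTF());
      }
    }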
- context.write(RAW_TABLE, successPut); - + rawService.getTable().put(successPut); } - /** - * generates a put for the megabytemillis + private void sink(HravenService service, HravenRecord record) + throws IOException, InterruptedException { + + for (Sink sink: sinks) { + mos.write(sink.name(), new EnumWritable(service), record); + } + } + +/** + * generates a record for the megabytemillis * @param mbMillis * @param jobKey - * @return the put with megabytemillis + * @return the record with megabytemillis */ - private Put getMegaByteMillisPut(Long mbMillis, JobKey jobKey) { - Put pMb = new Put(jobKeyConv.toBytes(jobKey)); - pMb.add(Constants.INFO_FAM_BYTES, Constants.MEGABYTEMILLIS_BYTES, Bytes.toBytes(mbMillis)); - return pMb; - } + private JobHistoryRecord getMegaByteMillisRecord(Long mbMillis, + JobKey jobKey) { + return new JobHistoryRecord(RecordCategory.INFERRED, jobKey, + new RecordDataKey(Constants.MEGABYTEMILLIS), mbMillis); + } /** * looks for cost file in distributed cache @@ -400,24 +477,28 @@ private Double getJobCost(Long mbMillis, Configuration currentConf) { } /** - * generates a put for the job cost + * generates a record for the job cost * @param jobCost * @param jobKey - * @return the put with job cost + * @return the record with job cost */ - private Put getJobCostPut(Double jobCost, JobKey jobKey) { - Put pJobCost = new Put(jobKeyConv.toBytes(jobKey)); - pJobCost.add(Constants.INFO_FAM_BYTES, Constants.JOBCOST_BYTES, Bytes.toBytes(jobCost)); - return pJobCost; + + private JobHistoryRecord getJobCostRecord(Double jobCost, JobKey jobKey) { + return new JobHistoryRecord(RecordCategory.INFERRED, jobKey, + new RecordDataKey(Constants.JOBCOST), jobCost); } @Override protected void cleanup( - Mapper.Context context) + Mapper.Context context) throws java.io.IOException, InterruptedException { IOException caught = null; - + + //Close the MultipleOutputs instance + //to call close on all wrapped OutputFormats's RecordWriters + mos.close(); + if (jobHistoryByIdService != null) { try { jobHistoryByIdService.close(); diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java index 320e05e..4e5e7cd 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java @@ -17,10 +17,14 @@ import java.io.IOException; import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Put; @@ -30,9 +34,17 @@ import org.apache.hadoop.mapred.JobHistoryCopy.Listener; import org.apache.hadoop.mapred.JobHistoryCopy.RecordTypes; +import com.google.common.base.Function; import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.JobHistoryRecordCollection; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRawRecord; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryTaskRecord; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; import com.twitter.hraven.TaskKey; import com.twitter.hraven.datasource.JobKeyConverter; import 
com.twitter.hraven.datasource.TaskKeyConverter; @@ -47,17 +59,14 @@ public class JobHistoryListener implements Listener { private String jobId; /** Job ID, minus the leading "job_" */ private String jobNumber = ""; - private final byte[] jobKeyBytes; /** explicitly initializing map millis and * reduce millis in case it's not found */ private long mapSlotMillis = 0L; private long reduceSlotMillis = 0L; - private List jobPuts = new LinkedList(); - private List taskPuts = new LinkedList(); - private JobKeyConverter jobKeyConv = new JobKeyConverter(); - private TaskKeyConverter taskKeyConv = new TaskKeyConverter(); + private Collection jobRecords; + private Collection taskRecords; /** * Constructor for listener to be used to read in a Job History File. While @@ -72,7 +81,8 @@ public JobHistoryListener(JobKey jobKey) { throw new IllegalArgumentException(msg); } this.jobKey = jobKey; - this.jobKeyBytes = jobKeyConv.toBytes(jobKey); + this.jobRecords = new JobHistoryRecordCollection(jobKey); + this.taskRecords = new ArrayList(); setJobId(jobKey.getJobId().getJobIdString()); } @@ -96,9 +106,12 @@ public void handle(RecordTypes recType, Map values) // skip other types ; } - //System.out.println("Reading: " + recType.toString()); } + private interface RecordGenerator { + public HravenRecord getRecord(RecordDataKey key, Object value, boolean isNumeric); + } + private void handleJob(Map values) { String id = values.get(JobHistoryKeys.JOBID); @@ -110,13 +123,15 @@ private void handleJob(Map values) { LOG.error(msg); throw new ImportException(msg); } - // add job ID to values to put - Put p = new Put(this.jobKeyBytes); + for (Map.Entry e : values.entrySet()) { - addKeyValues(p, Constants.INFO_FAM_BYTES, e.getKey(), e.getValue()); + addRecords(jobRecords, e.getKey(), e.getValue(), new RecordGenerator() { + @Override + public JobHistoryRecord getRecord(RecordDataKey key, Object value, boolean isNumeric) { + return new JobHistoryRecord(isNumeric ? 
RecordCategory.HISTORY_COUNTER : RecordCategory.HISTORY_META, jobKey, key, value); + } + }); } - this.jobPuts.add(p); - } /** @@ -124,10 +139,10 @@ private void handleJob(Map values) { * @param pVersion * @throws IllegalArgumentException if put is null */ - public void includeHadoopVersionPut(Put pVersion) { + public void includeHadoopVersionRecord(JobHistoryRecord pVersion) { // set the hadoop version for this record if (pVersion != null) { - this.jobPuts.add(pVersion); + this.jobRecords.add(pVersion); } else { String msg = "Hadoop Version put cannot be null"; LOG.error(msg); @@ -136,75 +151,76 @@ public void includeHadoopVersionPut(Put pVersion) { } private void handleTask(Map values) { - byte[] taskIdKeyBytes = getTaskKey("task_", this.jobNumber, values.get(JobHistoryKeys.TASKID)); - Put p = new Put(taskIdKeyBytes); + final TaskKey taskKey = getTaskKey("task_", this.jobNumber, values.get(JobHistoryKeys.TASKID)); - p.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.Task.toString())); - for (Map.Entry e : values.entrySet()) { - addKeyValues(p, Constants.INFO_FAM_BYTES, e.getKey(), e.getValue()); + this.taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.Task.toString())); + + for (Map.Entry e : values.entrySet()) { + addRecords(taskRecords, e.getKey(), e.getValue(), new RecordGenerator() { + @Override + public JobHistoryTaskRecord getRecord(RecordDataKey key, Object value, boolean isNumeric) { + return new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskKey, key, value); + } + }); } - this.taskPuts.add(p); } private void handleMapAttempt(Map values) { - byte[] taskIdKeyBytes = getTaskKey("attempt_", this.jobNumber, values.get(JobHistoryKeys.TASK_ATTEMPT_ID)); - Put p = new Put(taskIdKeyBytes); + final TaskKey taskKey = + getTaskKey("attempt_", this.jobNumber, values.get(JobHistoryKeys.TASK_ATTEMPT_ID)); - p.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.MapAttempt.toString())); - for (Map.Entry e : values.entrySet()) { - addKeyValues(p, Constants.INFO_FAM_BYTES, e.getKey(), e.getValue()); - } + this.taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.MapAttempt.toString())); - this.taskPuts.add(p); + for (Map.Entry e : values.entrySet()) { + addRecords(taskRecords, e.getKey(), e.getValue(), new RecordGenerator() { + @Override + public JobHistoryTaskRecord getRecord(RecordDataKey key, Object value, boolean isNumeric) { + return new JobHistoryTaskRecord(isNumeric ? 
RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskKey, key, value); + } + }); + } } private void handleReduceAttempt(Map values) { - byte[] taskIdKeyBytes = getTaskKey("attempt_", this.jobNumber, values.get(JobHistoryKeys.TASK_ATTEMPT_ID)); - Put p = new Put(taskIdKeyBytes); + final TaskKey taskKey = + getTaskKey("attempt_", this.jobNumber, values.get(JobHistoryKeys.TASK_ATTEMPT_ID)); - p.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.ReduceAttempt.toString())); - for (Map.Entry e : values.entrySet()) { - addKeyValues(p, Constants.INFO_FAM_BYTES, e.getKey(), e.getValue()); - } + this.taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.ReduceAttempt.toString())); - this.taskPuts.add(p); + for (Map.Entry e : values.entrySet()) { + addRecords(taskRecords, e.getKey(), e.getValue(), new RecordGenerator() { + @Override + public JobHistoryTaskRecord getRecord(RecordDataKey key, Object value, boolean isNumeric) { + return new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskKey, key, value); + } + }); + } } - private void addKeyValues(Put p, byte[] family, JobHistoryKeys key, String value) { + private void addRecords(Collection recordCollection, JobHistoryKeys key, String value, + RecordGenerator generator) { if (key == JobHistoryKeys.COUNTERS || key == JobHistoryKeys.MAP_COUNTERS || key == JobHistoryKeys.REDUCE_COUNTERS) { try { Counters counters = Counters.fromEscapedCompactString(value); - /* - * Name counter columns as: - * g!groupname!countername - */ - byte[] counterPrefix = null; - if (key == JobHistoryKeys.COUNTERS) { - counterPrefix = Bytes.add(Constants.COUNTER_COLUMN_PREFIX_BYTES, - Constants.SEP_BYTES); - } else if (key == JobHistoryKeys.MAP_COUNTERS) { - counterPrefix = Bytes.add(Constants.MAP_COUNTER_COLUMN_PREFIX_BYTES, - Constants.SEP_BYTES); - } else if (key == JobHistoryKeys.REDUCE_COUNTERS) { - counterPrefix = Bytes.add(Constants.REDUCE_COUNTER_COLUMN_PREFIX_BYTES, - Constants.SEP_BYTES); - } else { - throw new IllegalArgumentException("Unknown counter type "+key.toString()); - } for (Counters.Group group : counters) { - byte[] groupPrefix = Bytes.add( - counterPrefix, Bytes.toBytes(group.getName()), Constants.SEP_BYTES); for (Counters.Counter counter : group) { + RecordDataKey dataKey = new RecordDataKey(key.toString()); + dataKey.add(group.getName()); String counterName = counter.getName(); long counterValue = counter.getValue(); - byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName)); - p.add(family, qualifier, Bytes.toBytes(counterValue)); - // get the map and reduce slot millis for megabytemillis calculations + dataKey.add(counterName); + recordCollection.add(generator.getRecord(dataKey, counterValue, true)); + + // get the map and reduce slot millis for megabytemillis + // calculations if (Constants.SLOTS_MILLIS_MAPS.equals(counterName)) { this.mapSlotMillis = counterValue; } @@ -214,34 +230,31 @@ private void addKeyValues(Put p, byte[] family, JobHistoryKeys key, String value } } } catch (ParseException pe) { - LOG.error("Counters could not be parsed from string'"+value+"'", pe); + LOG.error("Counters could not be parsed from string'" + value + "'", pe); } } else { @SuppressWarnings("rawtypes") Class clazz = JobHistoryKeys.KEY_TYPES.get(key); - byte[] valueBytes = null; + Object valueObj = value; + boolean isNumeric = false; if 
(Integer.class.equals(clazz)) { + isNumeric = true; try { - valueBytes = (value != null && value.trim().length() > 0) ? - Bytes.toBytes(Integer.parseInt(value)) : Constants.ZERO_INT_BYTES; + valueObj = (value != null && value.trim().length() > 0) ? Integer.parseInt(value) : 0; } catch (NumberFormatException nfe) { - // us a default value - valueBytes = Constants.ZERO_INT_BYTES; + valueObj = 0; } } else if (Long.class.equals(clazz)) { + isNumeric = true; try { - valueBytes = (value != null && value.trim().length() > 0) ? - Bytes.toBytes(Long.parseLong(value)) : Constants.ZERO_LONG_BYTES; + valueObj = (value != null && value.trim().length() > 0) ? Long.parseLong(value) : 0; } catch (NumberFormatException nfe) { - // us a default value - valueBytes = Constants.ZERO_LONG_BYTES; + valueObj = 0; } - } else { - // keep the string representation by default - valueBytes = Bytes.toBytes(value); } - byte[] qualifier = Bytes.toBytes(key.toString().toLowerCase()); - p.add(family, qualifier, valueBytes); + + recordCollection.add(generator.getRecord(new RecordDataKey(key.toString()), + valueObj, isNumeric)); } } @@ -260,7 +273,7 @@ private void setJobId(String id) { * Returns the Task ID or Task Attempt ID, stripped of the leading job ID, * appended to the job row key. */ - public byte[] getTaskKey(String prefix, String jobNumber, String fullId) { + public TaskKey getTaskKey(String prefix, String jobNumber, String fullId) { String taskComponent = fullId; if (fullId == null) { taskComponent = ""; @@ -272,15 +285,15 @@ public byte[] getTaskKey(String prefix, String jobNumber, String fullId) { } } - return taskKeyConv.toBytes(new TaskKey(this.jobKey, taskComponent)); + return new TaskKey(this.jobKey, taskComponent); } /** * getter for jobKeyBytes * @return the byte array of jobKeyBytes */ - public byte[] getJobKeyBytes() { - return this.jobKeyBytes; + public JobKey getJobKey() { + return this.jobKey; } /** @@ -289,12 +302,12 @@ public byte[] getJobKeyBytes() { * is called with this listener. 
* @return a non-null (possibly empty) list of jobPuts */ - public List getJobPuts() { - return this.jobPuts; + public Collection getJobRecords() { + return this.jobRecords; } - public List getTaskPuts() { - return this.taskPuts; + public Collection getTaskRecords() { + return this.taskRecords; } public Long getMapSlotMillis() { diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java index 411b0c4..9a3357a 100644 --- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java +++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java @@ -30,6 +30,8 @@ import com.google.common.io.Files; import com.twitter.hraven.Constants; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; import com.twitter.hraven.JobKey; public class TestJobHistoryFileParserHadoop1 { @@ -64,8 +66,8 @@ public void testMegaByteMillis() throws IOException { JobKey jobKey = new JobKey("cluster1", "user1", "Sleep", 1, "job_201311192236_3583"); historyFileParser.parse(contents, jobKey); - List jobPuts = historyFileParser.getJobPuts(); - assertNotNull(jobPuts); + JobHistoryRecordCollection jobRecords = (JobHistoryRecordCollection)historyFileParser.getJobRecords(); + assertNotNull(jobRecords); Long mbMillis = historyFileParser.getMegaByteMillis(); Long expValue = 2981062L; diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java index ea9960d..1de4c38 100644 --- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java +++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java @@ -23,13 +23,19 @@ import com.twitter.hraven.Constants; import com.twitter.hraven.HadoopVersion; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryTaskRecord; import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.datasource.JobKeyConverter; import com.twitter.hraven.datasource.ProcessingException; import com.twitter.hraven.datasource.TaskKeyConverter; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,72 +70,30 @@ public void testCreateJobHistoryFileParserCorrectCreation() throws IOException { JobKey jobKey = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001"); historyFileParser.parse(contents, jobKey); - List jobPuts = historyFileParser.getJobPuts(); - assertEquals(6, jobPuts.size()); + JobHistoryRecordCollection jobRecords = (JobHistoryRecordCollection)historyFileParser.getJobRecords(); + assertEquals(156, jobRecords.size()); - JobKeyConverter jobKeyConv = new JobKeyConverter(); assertEquals("cluster1!user!Sleep!1!job_1329348432655_0001", - jobKeyConv.fromBytes(jobPuts.get(0).getRow()).toString()); + jobRecords.getKey().toString()); // check hadoop version - boolean foundVersion2 = false; - for (Put p : jobPuts) { - List kv2 = p.get(Constants.INFO_FAM_BYTES, - Bytes.toBytes(JobHistoryKeys.hadoopversion.toString())); - if (kv2.size() == 0) { - // we are interested in hadoop 
version put only - // hence continue - continue; - } - assertEquals(1, kv2.size()); - Map> d = p.getFamilyMap(); - for (List lkv : d.values()) { - for (KeyValue kv : lkv) { - // ensure we have a hadoop2 version as the value - assertEquals(Bytes.toString(kv.getValue()), - HadoopVersion.TWO.toString()); - - // ensure we don't see the same put twice - assertFalse(foundVersion2); - // now set this to true - foundVersion2 = true; - } - } - } - // ensure that we got the hadoop2 version put - assertTrue(foundVersion2); + Object versionRecord = + jobRecords.getValue(RecordCategory.HISTORY_META, new RecordDataKey( + JobHistoryKeys.hadoopversion.toString().toLowerCase())); + + assertNotNull(versionRecord); + assertEquals((String)versionRecord, HadoopVersion.TWO.toString()); // check job status - boolean foundJobStatus = false; - for (Put p : jobPuts) { - List kv2 = - p.get(Constants.INFO_FAM_BYTES, - Bytes.toBytes(JobHistoryKeys.JOB_STATUS.toString().toLowerCase())); - if (kv2.size() == 0) { - // we are interested in JobStatus put only - // hence continue - continue; - } - assertEquals(1, kv2.size()); - - for (KeyValue kv : kv2) { - // ensure we have a job status value as the value - assertEquals(Bytes.toString(kv.getValue()), - JobHistoryFileParserHadoop2.JOB_STATUS_SUCCEEDED); - - // ensure we don't see the same put twice - assertFalse(foundJobStatus); - // now set this to true - foundJobStatus = true; - } - } - // ensure that we got the JobStatus put - assertTrue(foundJobStatus); - - List taskPuts = historyFileParser.getTaskPuts(); - assertEquals(taskPuts.size(), 45); + Object statusRecord = + jobRecords.getValue(RecordCategory.HISTORY_META, new RecordDataKey( + JobHistoryKeys.JOB_STATUS.toString().toLowerCase())); + + assertNotNull(statusRecord); + assertEquals((String)statusRecord, JobHistoryFileParserHadoop2.JOB_STATUS_SUCCEEDED); - TaskKeyConverter taskKeyConv = new TaskKeyConverter(); + List taskRecords = (List) historyFileParser.getTaskRecords(); + assertEquals(382, taskRecords.size()); Set putRowKeys = new HashSet(Arrays.asList( @@ -158,9 +122,8 @@ public void testCreateJobHistoryFileParserCorrectCreation() throws IOException { "cluster1!user!Sleep!1!job_1329348432655_0001!m_000001_0")); String tKey; - for (Put p : taskPuts) { - tKey = taskKeyConv.fromBytes(p.getRow()).toString(); - assertTrue(putRowKeys.contains(tKey)); + for (JobHistoryTaskRecord p: taskRecords) { + assertTrue(putRowKeys.contains(p.getKey().toString())); } // check post processing for megabytemillis @@ -227,15 +190,15 @@ public void testLongExpGetValuesIntBytes() { String[] keysToBeChecked = {"totalMaps", "totalReduces", "finishedMaps", "finishedReduces", "failedMaps", "failedReduces"}; - byte[] byteValue = null; + Long longValue = null; int intValue10 = 10; long longValue10 = 10L; JobHistoryFileParserHadoop2 jh = new JobHistoryFileParserHadoop2(null); for(String key: keysToBeChecked) { - byteValue = jh.getValue(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key), intValue10); - assertEquals(Bytes.toLong(byteValue), longValue10); + longValue = (Long) jh.getValue(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key), intValue10); + assertEquals(longValue.longValue(), longValue10); } } @@ -246,14 +209,14 @@ public void testLongExpGetValuesIntBytes() { public void testIntExpGetValuesIntBytes() { String[] keysToBeChecked = {"httpPort"}; - byte[] byteValue = null; + Integer intValue = null; int intValue10 = 10; JobHistoryFileParserHadoop2 jh = new JobHistoryFileParserHadoop2(null); for(String key: keysToBeChecked) { - byteValue = 
jh.getValue(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key), intValue10); - assertEquals(Bytes.toInt(byteValue), intValue10); + intValue = (Integer) jh.getValue(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key), intValue10); + assertEquals(intValue.intValue(), intValue10); } } } diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java index 90af968..4dfd0bf 100644 --- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java +++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java @@ -30,6 +30,8 @@ import com.twitter.hraven.Constants; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryRecordCollection; import com.twitter.hraven.JobKey; import com.twitter.hraven.HadoopVersion; import com.twitter.hraven.mapreduce.JobHistoryListener; @@ -50,39 +52,33 @@ public void checkHadoopVersionSet() { JobKey jobKey = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001"); JobHistoryListener jobHistoryListener = new JobHistoryListener(jobKey); - assertEquals(jobHistoryListener.getJobPuts().size(), 0); + assertEquals(jobHistoryListener.getJobRecords().size(), 0); JobHistoryFileParserHadoop1 jh = new JobHistoryFileParserHadoop1(null); - Put versionPut = jh.getHadoopVersionPut( + JobHistoryRecord versionRecord = jh.getHadoopVersionRecord( HadoopVersion.ONE, - jobHistoryListener.getJobKeyBytes()); - jobHistoryListener.includeHadoopVersionPut(versionPut); - assertEquals(jobHistoryListener.getJobPuts().size(), 1); + jobHistoryListener.getJobKey()); + jobHistoryListener.includeHadoopVersionRecord(versionRecord); + assertEquals(jobHistoryListener.getJobRecords().size(), 1); // check hadoop version boolean foundVersion1 = false; - for (Put p : jobHistoryListener.getJobPuts()) { - List kv2 = p.get(Constants.INFO_FAM_BYTES, - Bytes.toBytes(JobHistoryKeys.hadoopversion.toString())); - if (kv2.size() == 0) { + for (JobHistoryRecord p : (JobHistoryRecordCollection)jobHistoryListener.getJobRecords()) { + if (!p.getDataKey().get(0).equals(JobHistoryKeys.hadoopversion.toString().toLowerCase())) { // we are interested in hadoop version put only // hence continue continue; } - assertEquals(1, kv2.size()); - Map> d = p.getFamilyMap(); - for (List lkv : d.values()) { - for (KeyValue kv : lkv) { - // ensure we have a hadoop2 version as the value - assertEquals(Bytes.toString(kv.getValue()), - HadoopVersion.ONE.toString() ); - // ensure we don't see the same put twice - assertFalse(foundVersion1); - // now set this to true - foundVersion1 = true; - } - } + assert(p.getDataValue() != null); + + // ensure we have a hadoop2 version as the value + assertEquals(p.getDataValue(), + HadoopVersion.ONE.toString()); + // ensure we don't see the same put twice + assertFalse(foundVersion1); + // now set this to true + foundVersion1 = true; } // ensure that we got the hadoop1 version put assertTrue(foundVersion1);
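Because JobFileTableMapper emits through MultipleOutputs under each Sink's name, the job driver has to register one named output per configured sink before submission. The JobFileProcessor changes are outside this excerpt; the sketch below only illustrates that wiring with the stock Hadoop MultipleOutputs API. HbaseOutputFormat, Sink, Constants.JOBCONF_SINKS and the mapper's static accessors appear in the hunks above; the Graphite-side class name and the helper itself are assumptions.

    package com.twitter.hraven.examples; // hypothetical, illustration only

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    import com.twitter.hraven.Constants;
    import com.twitter.hraven.etl.Sink;
    import com.twitter.hraven.mapreduce.GraphiteOutputFormat;
    import com.twitter.hraven.mapreduce.HbaseOutputFormat;
    import com.twitter.hraven.mapreduce.JobFileTableMapper;

    public class SinkWiringSketch {

      /** Registers one MultipleOutputs named output per configured sink. */
      @SuppressWarnings("rawtypes")
      public static void addSinkOutputs(Job job) {
        Configuration conf = job.getConfiguration();
        for (String name : conf.get(Constants.JOBCONF_SINKS).split(",")) {
          Sink sink = Sink.valueOf(name.trim());
          // GraphiteOutputFormat is assumed to be the Graphite counterpart of HbaseOutputFormat.
          Class<? extends OutputFormat> format =
              (sink == Sink.HBASE) ? HbaseOutputFormat.class : GraphiteOutputFormat.class;
          // Key/value classes come from the mapper's static accessors shown above.
          MultipleOutputs.addNamedOutput(job, sink.name(), format,
              JobFileTableMapper.getOutputKeyClass(), JobFileTableMapper.getOutputValueClass());
        }
      }
    }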