diff --git a/bin/etl/hraven-etl.sh b/bin/etl/hraven-etl.sh index b2e2caa..f6f38db 100755 --- a/bin/etl/hraven-etl.sh +++ b/bin/etl/hraven-etl.sh @@ -42,6 +42,8 @@ hbaseconfdir=${HBASE_CONF_DIR:-$HBASE_HOME/conf} # HDFS directories for processing and loading job history data historyRawDir=/yarn/history/done/ historyProcessingDir=/hraven/processing/ +sinks=GRAPHITE,HBASE +jobFileProcessorConfOpts="-Dhraven.sink.graphite.userfilter=rmcuser -Dhraven.sink.graphite.queuefilter=userplatform -Dhraven.sink.graphite.excludedcomponents=MultiInputCounters" ####################################################### #If costfile is empty, fill it with default values @@ -71,4 +73,4 @@ $home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir $home/jobFileLoader.sh $hadoopconfdir $mapredmaxsplitsize $schedulerpoolname $cluster $historyProcessingDir # Process -$home/jobFileProcessor.sh $hbaseconfdir $schedulerpoolname $historyProcessingDir $cluster $threads $batchsize $machinetype $costfile +$home/jobFileProcessor.sh $hbaseconfdir $schedulerpoolname $historyProcessingDir $cluster $threads $batchsize $machinetype $costfile $sinks $jobFileProcessorConfOpts diff --git a/bin/etl/jobFileProcessor.sh b/bin/etl/jobFileProcessor.sh index a657a5d..76e23dc 100755 --- a/bin/etl/jobFileProcessor.sh +++ b/bin/etl/jobFileProcessor.sh @@ -21,9 +21,9 @@ # [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile] # a sample cost file can be found in the conf dir as sampleCostDetails.properties -if [ $# -ne 8 ] +if [ $# -ne 10 ] then - echo "Usage: `basename $0` [hbaseconfdir] [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile]" + echo "Usage: `basename $0` [hbaseconfdir] [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile] [sinks] [jobfileprocessorconf]" exit 1 fi @@ -40,5 +40,5 @@ fi create_pidfile $HRAVEN_PID_DIR trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT -hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFileProcessor -libjars=$LIBJARS -Dmapred.fairscheduler.pool=$2 -d -p $3 -c $4 -t $5 -b $6 -m $7 -z $8 +hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFileProcessor -libjars=$LIBJARS -Dmapred.fairscheduler.pool=$2 ${10} -d -p $3 -c $4 -t $5 -b $6 -m $7 -z $8 -s $9 diff --git a/hraven-core/src/main/java/com/twitter/hraven/AppKey.java b/hraven-core/src/main/java/com/twitter/hraven/AppKey.java index 53e8fce..bda6606 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/AppKey.java +++ b/hraven-core/src/main/java/com/twitter/hraven/AppKey.java @@ -16,27 +16,33 @@ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; -public class AppKey implements Comparable { +public class AppKey implements WritableComparable { /** * The cluster on which the application ran */ - protected final String cluster; + protected String cluster; /** * Who ran the application on Hadoop */ - protected final String userName; + protected String userName; /** * The thing that identifies an application, * such as Pig script identifier, or Scalding identifier.
*/ - protected final String appId; + protected String appId; @JsonCreator public AppKey(@JsonProperty("cluster") String cluster, @JsonProperty("userName") String userName, @@ -111,4 +117,18 @@ public int hashCode() { .toHashCode(); } + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, this.cluster); + Text.writeString(out, this.userName); + Text.writeString(out, this.appId); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.cluster = Text.readString(in); + this.userName = Text.readString(in); + this.appId = Text.readString(in); + } + } diff --git a/hraven-core/src/main/java/com/twitter/hraven/Constants.java b/hraven-core/src/main/java/com/twitter/hraven/Constants.java index 9e8653a..6e22f61 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/Constants.java +++ b/hraven-core/src/main/java/com/twitter/hraven/Constants.java @@ -31,6 +31,8 @@ public class Constants { public static final char SEP_CHAR = '!'; public static final String SEP = "" + SEP_CHAR; public static final byte[] SEP_BYTES = Bytes.toBytes(SEP); + + public static final String PERIOD_SEP_CHAR = "."; // common default values public static final byte[] EMPTY_BYTES = new byte[0]; @@ -426,4 +428,29 @@ public class Constants { /** name of the properties file used for cluster to cluster identifier mapping */ public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties"; + + public static final String JOBCONF_SINKS = "hraven.sinks"; + + public static final String JOBCONF_GRAPHITE_HOST_KEY = "hraven.sink.graphite.host"; + public static final String JOBCONF_GRAPHITE_PORT_KEY = "hraven.sink.graphite.port"; + public static final String JOBCONF_GRAPHITE_PREFIX = "hraven.sink.graphite.prefix"; + public static final String JOBCONF_GRAPHITE_USER_FILTER = "hraven.sink.graphite.userfilter"; + public static final String JOBCONF_GRAPHITE_QUEUE_FILTER = "hraven.sink.graphite.queuefilter"; + public static final String JOBCONF_GRAPHITE_EXCLUDED_COMPONENTS = "hraven.sink.graphite.excludedcomponents"; + public static final String JOBCONF_GRAPHITE_DONOTEXCLUDE_APPS = "hraven.sink.graphite.donotexcludeapps"; + + public static final int GRAPHITE_DEFAULT_PORT = 2003; + public static final String GRAPHITE_DEFAULT_HOST = "localhost"; + public static final String GRAPHITE_DEFAULT_PREFIX = "DEFAULT"; + + public static final String GRAPHITE_KEY_MAPPING_TABLE = PREFIX + "graphite_key_mapping"; + public static final byte[] GRAPHITE_KEY_MAPPING_TABLE_BYTES = Bytes.toBytes(GRAPHITE_KEY_MAPPING_TABLE); + + public static final String GRAPHITE_REVERSE_KEY_MAPPING_TABLE = PREFIX + "graphite_key_mapping_r"; + public static final byte[] GRAPHITE_REVERSE_KEY_MAPPING_TABLE_BYTES = Bytes.toBytes(GRAPHITE_REVERSE_KEY_MAPPING_TABLE); + + public static final String GRAPHITE_KEY_MAPPING_COLUMN = "k"; + public static final byte[] GRAPHITE_KEY_MAPPING_COLUMN_BYTES = Bytes.toBytes(GRAPHITE_KEY_MAPPING_COLUMN); + + } diff --git a/hraven-core/src/main/java/com/twitter/hraven/FlowKey.java b/hraven-core/src/main/java/com/twitter/hraven/FlowKey.java index ad87e04..1322dc1 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/FlowKey.java +++ b/hraven-core/src/main/java/com/twitter/hraven/FlowKey.java @@ -15,12 +15,19 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import 
org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; -public class FlowKey extends AppKey implements Comparable { +public class FlowKey extends AppKey implements WritableComparable { /** * Identifying one single run of a version of an app. Smaller values indicate @@ -111,4 +118,17 @@ public int hashCode(){ .toHashCode(); } + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + new LongWritable(this.runId).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + LongWritable lw = new LongWritable(); + lw.readFields(in); + this.runId = lw.get(); + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java b/hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java new file mode 100644 index 0000000..b338571 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java @@ -0,0 +1,172 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import com.twitter.hraven.util.EnumWritable; +import com.twitter.hraven.util.Serializer; + +/** + * + * @author angad.singh + * + * {@link JobFileTableMapper outputs this as value. It corresponds to the + * Put record which was earlier emitted + * + * @param key type + * @param type of dataValue object to be stored + */ + +public abstract class HravenRecord implements Writable{ + private K key; + private RecordCategory dataCategory; + private RecordDataKey dataKey; + private V dataValue; + private Long submitTime; + + public HravenRecord() { + + } + + public K getKey() { + return key; + } + + public void setKey(K key) { + this.key = key; + } + + public RecordCategory getDataCategory() { + return dataCategory; + } + + public void setDataCategory(RecordCategory dataCategory) { + this.dataCategory = dataCategory; + } + + public RecordDataKey getDataKey() { + return dataKey; + } + + public void setDataKey(RecordDataKey dataKey) { + this.dataKey = dataKey; + } + + public V getDataValue() { + return dataValue; + } + + public void setDataValue(V dataValue) { + this.dataValue = dataValue; + } + + public Long getSubmitTime() { + return submitTime; + } + + public void setSubmitTime(Long submitTime) { + this.submitTime = submitTime; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((dataCategory == null) ? 0 : dataCategory.hashCode()); + result = prime * result + ((dataKey == null) ? 0 : dataKey.hashCode()); + result = prime * result + ((dataValue == null) ? 0 : dataValue.hashCode()); + result = prime * result + ((key == null) ? 
0 : key.hashCode()); + result = prime * result + ((submitTime == null) ? 0 : (int) (submitTime ^ (submitTime >>> 32))); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + HravenRecord other = (HravenRecord) obj; + if (dataCategory != other.dataCategory) { + return false; + } + if (dataKey == null) { + if (other.dataKey != null) { + return false; + } + } else if (!dataKey.equals(other.dataKey)) { + return false; + } + if (dataValue == null) { + if (other.dataValue != null) { + return false; + } + } else if (!dataValue.equals(other.dataValue)) { + return false; + } + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + if (submitTime == null) { + if (other.submitTime != null) { + return false; + } + } else if (!submitTime.equals(other.submitTime)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "HravenRecord [key=" + key + ", dataCategory=" + dataCategory + ", dataKey=" + dataKey + + ", dataValue=" + dataValue + ", submitTime=" + submitTime + "]"; + } + + @Override + public void write(DataOutput out) throws IOException { + //key + this.key.write(out); + //dataCategory + new EnumWritable(this.dataCategory).write(out); + //dataKey + this.dataKey.write(out); + //dataValue + byte[] b = Serializer.serialize(this.dataValue); + out.writeInt(b.length); + out.write(b); + //submitTime + new LongWritable(this.submitTime).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + //key + this.key.readFields(in); + //dataCategory + new EnumWritable(this.dataCategory).readFields(in); + //dataKey + this.dataKey.readFields(in); + //dataValue + byte[] b = new byte[in.readInt()]; + in.readFully(b); + try { + this.dataValue = (V) Serializer.deserialize(b); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failure in deserializing HravenRecord.dataValue", e); + } + //submitTime + LongWritable lw = new LongWritable(); + lw.readFields(in); + this.submitTime = lw.get(); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/HravenService.java b/hraven-core/src/main/java/com/twitter/hraven/HravenService.java new file mode 100644 index 0000000..886aed9 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/HravenService.java @@ -0,0 +1,32 @@ +package com.twitter.hraven; + +/** + * + * @author angad.singh + * + * {@link JobFileTableMapper} outputs this as key.
It corresponds to the + * Hbase table which was earlier emitted + */ + +public enum HravenService { + JOB_HISTORY_RAW { + @Override + public HravenRecord getNewRecord() { + return new JobHistoryRawRecord(); + } + }, + JOB_HISTORY { + @Override + public HravenRecord getNewRecord() { + return new JobHistoryRecord(); + } + }, + JOB_HISTORY_TASK { + @Override + public HravenRecord getNewRecord() { + return new JobHistoryTaskRecord(); + } + }; + + public abstract HravenRecord getNewRecord(); +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRawRecord.java b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRawRecord.java new file mode 100644 index 0000000..ab65a3f --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRawRecord.java @@ -0,0 +1,40 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; + +public class JobHistoryRawRecord extends HravenRecord { + + public JobHistoryRawRecord(RecordCategory dataCategory, String key, RecordDataKey dataKey, + Object dataValue) { + this.setKey(new Text(key)); + this.setDataCategory(dataCategory); + this.setDataKey(dataKey); + this.setDataValue(dataValue); + } + + public JobHistoryRawRecord() { + + } + + public JobHistoryRawRecord(String rawKey) { + this.setKey(new Text(rawKey)); + } + + public void set(RecordCategory category, RecordDataKey key, String value) { + this.setDataCategory(category); + this.setDataKey(key); + this.setDataValue(value); + } + + public void write(DataOutput out) throws IOException { + super.write(out); + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecord.java b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecord.java new file mode 100644 index 0000000..7507c6f --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecord.java @@ -0,0 +1,51 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * @author angad.singh Abstraction of a record to be stored in the {@link HravenService#JOB_HISTORY} + * service. 
Was earlier directly written as an Hbase put + */ + +public class JobHistoryRecord extends HravenRecord { + + public JobHistoryRecord(RecordCategory dataCategory, JobKey key, RecordDataKey dataKey, + Object dataValue) { + this.setKey(key); + this.setDataCategory(dataCategory); + this.setDataKey(dataKey); + this.setDataValue(dataValue); + } + + public JobHistoryRecord(RecordCategory dataCategory, JobKey key, RecordDataKey dataKey, + Object dataValue, Long submitTime) { + this(dataCategory, key, dataKey, dataValue); + setSubmitTime(submitTime); + } + + public JobHistoryRecord() { + + } + + public JobHistoryRecord(JobKey jobKey) { + this.setKey(jobKey); + } + + public void set(RecordCategory category, RecordDataKey key, String value) { + this.setDataCategory(category); + this.setDataKey(key); + this.setDataValue(value); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecordCollection.java b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecordCollection.java new file mode 100644 index 0000000..b926e9e --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryRecordCollection.java @@ -0,0 +1,245 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +/** + * @author angad.singh Store multiple {@link JobHistoryRecord}s in a 2 level HashMap Supports + * iteration to get individual {@link JobHistoryRecord}s + */ + +public class JobHistoryRecordCollection extends HravenRecord implements + Collection { + + private Map> valueMap; + + public JobHistoryRecordCollection() { + valueMap = new HashMap>(); + } + + public JobHistoryRecordCollection(JobKey jobKey) { + setKey(jobKey); + valueMap = new HashMap>(); + } + + public JobHistoryRecordCollection(JobHistoryRecord record) { + valueMap = new HashMap>(); + setKey(record.getKey()); + setSubmitTime(record.getSubmitTime()); + add(record); + } + + public boolean add(RecordCategory category, RecordDataKey key, Object value) { + if (valueMap.containsKey(category)) { + valueMap.get(category).put(key, value); + } else { + HashMap categoryMap = new HashMap(); + valueMap.put(category, categoryMap); + categoryMap.put(key, value); + } + + return true; + } + + public boolean add(JobHistoryRecord record) { + return add(record.getDataCategory(), record.getDataKey(), record.getDataValue()); + } + + public Map> getValueMap() { + return valueMap; + } + + public Object getValue(RecordCategory category, RecordDataKey key) { + return valueMap.containsKey(category) ? 
valueMap.get(category).get(key) : null; + } + + public int size() { + int size = 0; + for (Entry> catMap : valueMap.entrySet()) { + size += catMap.getValue().size(); + } + + return size; + } + + /** + * Be able to iterate easily to get individual {@link JobHistoryRecord}s + */ + + @Override + public Iterator iterator() { + + return new Iterator() { + + private Iterator>> catIterator; + private Iterator> dataIterator; + Entry> nextCat; + Entry nextData; + + { + initIterators(); + } + + private void initIterators() { + if (catIterator == null) { + catIterator = valueMap.entrySet().iterator(); + } + + if (catIterator.hasNext()) { + nextCat = catIterator.next(); + dataIterator = nextCat.getValue().entrySet().iterator(); + } + } + + @Override + public boolean hasNext() { + if (dataIterator == null) { + initIterators(); + } + return dataIterator == null ? false : dataIterator.hasNext() || catIterator.hasNext(); + } + + @Override + public JobHistoryRecord next() { + if (!dataIterator.hasNext()) { + nextCat = catIterator.next(); + dataIterator = nextCat.getValue().entrySet().iterator(); + } + + nextData = dataIterator.next(); + + return new JobHistoryRecord(nextCat.getKey(), getKey(), nextData.getKey(), + nextData.getValue(), getSubmitTime()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + public void mergeWith(JobHistoryRecordCollection confRecord) { + for (JobHistoryRecord record : confRecord) { + add(record); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((valueMap == null) ? 0 : valueMap.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + JobHistoryRecordCollection other = (JobHistoryRecordCollection) obj; + if (valueMap == null) { + if (other.valueMap != null) { + return false; + } + } else if (!valueMap.equals(other.valueMap)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "JobHistoryRecordCollection [key=" + getKey() + ", dataCategory=" + getDataCategory() + ", dataKey=" + getDataKey() + + ", dataValue=" + getDataValue() + ", submitTime=" + getSubmitTime() + ", valueMap=" + valueMap + "]"; + } + + @Override + public boolean isEmpty() { + return valueMap.isEmpty(); + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection recordCol) { + for (JobHistoryRecord record : recordCol) { + add(record); + } + return true; + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + this.valueMap = null; + } + + public void write(DataOutput out) throws IOException { + super.write(out); + + out.writeInt(this.size()); + for (JobHistoryRecord r: this) { + 
r.write(out); + } + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + int size = in.readInt(); + int i = 0; + while (i < size) { + JobHistoryRecord r = new JobHistoryRecord(); + r.readFields(in); + this.add(r); + i++; + } + } +} \ No newline at end of file diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobHistoryTaskRecord.java b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryTaskRecord.java new file mode 100644 index 0000000..24f5bc7 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/JobHistoryTaskRecord.java @@ -0,0 +1,49 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * @author angad.singh Abstraction of a record to be stored in the + * {@link HravenService#JOB_HISTORY_TASK} service. + */ + +public class JobHistoryTaskRecord extends HravenRecord { + + public JobHistoryTaskRecord(RecordCategory dataCategory, TaskKey key, RecordDataKey dataKey, + Object dataValue) { + this.setKey(key); + this.setDataCategory(dataCategory); + this.setDataKey(dataKey); + this.setDataValue(dataValue); + } + + public JobHistoryTaskRecord(RecordCategory dataCategory, TaskKey key, RecordDataKey dataKey, + Object dataValue, Long submitTime) { + this(dataCategory, key, dataKey, dataValue); + setSubmitTime(submitTime); + } + + public JobHistoryTaskRecord() { + + } + + public JobHistoryTaskRecord(TaskKey jobKey) { + this.setKey(jobKey); + } + + public void set(RecordCategory category, RecordDataKey key, String value) { + this.setDataCategory(category); + this.setDataKey(key); + this.setDataValue(value); + } + + public void write(DataOutput out) throws IOException { + super.write(out); + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobId.java b/hraven-core/src/main/java/com/twitter/hraven/JobId.java index 887a0a5..0457796 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/JobId.java +++ b/hraven-core/src/main/java/com/twitter/hraven/JobId.java @@ -15,8 +15,14 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; @@ -25,7 +31,7 @@ * apart. 
The jobtracker ID is parsed as: job_[epoch]_[sequence] * */ -public class JobId implements Comparable { +public class JobId implements WritableComparable { protected static final String JOB_ID_SEP = "_"; /** * The jobtracker start time from the job ID, obtained from parsing the @@ -136,4 +142,19 @@ public int hashCode(){ .append(this.jobSequence) .toHashCode(); } + + @Override + public void write(DataOutput out) throws IOException { + new LongWritable(this.jobEpoch).write(out); + new LongWritable(this.jobSequence).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + LongWritable lw = new LongWritable(); + lw.readFields(in); + this.jobEpoch = lw.get(); + lw.readFields(in); + this.jobSequence = lw.get(); + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobKey.java b/hraven-core/src/main/java/com/twitter/hraven/JobKey.java index 8a9d427..7f52734 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/JobKey.java +++ b/hraven-core/src/main/java/com/twitter/hraven/JobKey.java @@ -15,8 +15,13 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; @@ -24,7 +29,7 @@ * Represents the row key for a given job. Row keys are stored as: username ! * appid ! version ! runid ! jobid */ -public class JobKey extends FlowKey implements Comparable{ +public class JobKey extends FlowKey implements WritableComparable{ /** * Fully qualified cluster + parsed job identifier @@ -142,4 +147,16 @@ public int hashCode(){ .toHashCode(); } + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + this.jobId.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.jobId.readFields(in); + } + } diff --git a/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java b/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java index 36f762f..4374e98 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java +++ b/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java @@ -15,6 +15,12 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; @@ -25,12 +31,12 @@ * current one). This class represents the fully qualified job identifier. * */ -public class QualifiedJobId extends JobId { +public class QualifiedJobId extends JobId implements Writable { /** * The Hadoop cluster on which the job ran. */ - private final String cluster; + private String cluster; /** * Constructor. 
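The key classes above (AppKey, FlowKey, JobId, JobKey, QualifiedJobId) now carry write/readFields so they can travel through the MapReduce shuffle as map output keys. A minimal round-trip sketch of the new serialization, assuming illustrative constructor values and a throwaway target key whose fields readFields simply overwrites (this harness, including the class name, is not part of the patch):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import com.twitter.hraven.JobKey;

    public class JobKeyRoundTrip {
      public static void main(String[] args) throws Exception {
        JobKey original = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001");

        // write(): AppKey strings, then the FlowKey runId, then the JobId epoch/sequence
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // readFields() repopulates every field of an existing instance
        JobKey copy = new JobKey("c", "u", "a", 0, "job_201206201718_0001");
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);

        System.out.println(original.compareTo(copy) == 0);  // expect true
      }
    }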
@@ -57,4 +63,16 @@ public String getCluster() { return cluster; } + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, this.cluster); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.cluster = Text.readString(in); + } + } diff --git a/hraven-core/src/main/java/com/twitter/hraven/RecordCategory.java b/hraven-core/src/main/java/com/twitter/hraven/RecordCategory.java new file mode 100644 index 0000000..31e3380 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/RecordCategory.java @@ -0,0 +1,6 @@ +package com.twitter.hraven; + +public enum RecordCategory { + HISTORY_COUNTER, HISTORY_META, HISTORY_TASK_COUNTER, HISTORY_TASK_META, CONF, CONF_META, META, + INFERRED +} \ No newline at end of file diff --git a/hraven-core/src/main/java/com/twitter/hraven/RecordDataKey.java b/hraven-core/src/main/java/com/twitter/hraven/RecordDataKey.java new file mode 100644 index 0000000..e940f14 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/RecordDataKey.java @@ -0,0 +1,82 @@ +package com.twitter.hraven; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; + +public class RecordDataKey implements Writable { + private List components; + + public RecordDataKey(String... components) { + this.components = Arrays.asList(components); + } + + public RecordDataKey(String firstComponent) { + this.components = new ArrayList(); + this.components.add(firstComponent); + } + + public void add(String component) { + this.components.add(component); + } + + public String get(int index) { + return components.get(index); + } + + public List getComponents() { + return components; + } + + @Override + public int hashCode() { + return 1 + ((components == null) ? 
0 : components.hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RecordDataKey other = (RecordDataKey) obj; + if (components == null) { + if (other.components != null) { + return false; + } + } else if (!components.equals(other.components)) { + return false; + } + return true; + } + + @Override + public String toString() { + return StringUtils.join(Constants.PERIOD_SEP_CHAR, components); + } + + @Override + public void write(DataOutput out) throws IOException { + Text[] comps = new Text[this.components.size()]; + for (int i = 0; i < comps.length; i++) { + comps[i] = new Text(this.components.get(i)); + } + new ArrayWritable(Text.class, comps).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + ArrayWritable a = new ArrayWritable(Text.class); + a.readFields(in); + this.components = Arrays.asList(a.toStrings()); + } +} diff --git a/hraven-core/src/main/java/com/twitter/hraven/TaskKey.java b/hraven-core/src/main/java/com/twitter/hraven/TaskKey.java index 479500d..16a47f2 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/TaskKey.java +++ b/hraven-core/src/main/java/com/twitter/hraven/TaskKey.java @@ -15,13 +15,18 @@ */ package com.twitter.hraven; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.commons.lang.builder.CompareToBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.annotate.JsonSerialize; - /** * Represents the row key for an individual job task. This key shares all the * same components from the job key, with the additional of the task ID: @@ -32,7 +37,7 @@ @JsonSerialize( include=JsonSerialize.Inclusion.NON_NULL ) -public class TaskKey extends JobKey implements Comparable { +public class TaskKey extends JobKey implements WritableComparable { private String taskId; @JsonCreator @@ -83,4 +88,16 @@ public int hashCode(){ .append(this.taskId) .toHashCode(); } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, this.taskId); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.taskId = Text.readString(in); + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java index 4c6fef7..539dd7f 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java +++ b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java @@ -551,4 +551,8 @@ public byte[] getJobHistoryRawFromResult(Result value) throws MissingColumnInRes byte[] jobHistoryRaw = keyValue.getValue(); return jobHistoryRaw; } + + public HTable getTable() { + return rawTable; + } } diff --git a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java index 12e83bd..d46525c 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java +++ b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryService.java @@ -680,8 +680,7 @@ public static CounterMap parseCounters(byte[] prefix, * * @throws IllegalArgumentException if neither config param is
found */ - static void setHravenQueueNamePut(Configuration jobConf, Put jobPut, - JobKey jobKey, byte[] jobConfColumnPrefix) { + static void setHravenQueueNameRecord(Configuration jobConf, JobHistoryRecordCollection recordCollection, JobKey jobKey) { String hRavenQueueName = HadoopConfUtil.getQueueName(jobConf); if (hRavenQueueName.equalsIgnoreCase(Constants.DEFAULT_VALUE_QUEUENAME)){ @@ -692,9 +691,8 @@ static void setHravenQueueNamePut(Configuration jobConf, Put jobPut, // set the "queue" property defined by hRaven // this makes it independent of hadoop version config parameters - byte[] column = Bytes.add(jobConfColumnPrefix, Constants.HRAVEN_QUEUE_BYTES); - jobPut.add(Constants.INFO_FAM_BYTES, column, - Bytes.toBytes(hRavenQueueName)); + recordCollection.add(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE), hRavenQueueName); } /** @@ -708,40 +706,28 @@ static void setHravenQueueNamePut(Configuration jobConf, Put jobPut, * the job configuration * @return puts for the given job configuration */ - public static List getHbasePuts(JobDesc jobDesc, Configuration jobConf) { - List puts = new LinkedList(); - + public static JobHistoryRecordCollection getConfRecord(JobDesc jobDesc, Configuration jobConf) { JobKey jobKey = new JobKey(jobDesc); - byte[] jobKeyBytes = new JobKeyConverter().toBytes(jobKey); // Add all columns to one put - Put jobPut = new Put(jobKeyBytes); - jobPut.add(Constants.INFO_FAM_BYTES, Constants.VERSION_COLUMN_BYTES, - Bytes.toBytes(jobDesc.getVersion())); - jobPut.add(Constants.INFO_FAM_BYTES, Constants.FRAMEWORK_COLUMN_BYTES, - Bytes.toBytes(jobDesc.getFramework().toString())); + JobHistoryRecordCollection recordCollection = new JobHistoryRecordCollection(jobKey); - // Avoid doing string to byte conversion inside loop. 
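With setHravenQueueNameRecord and getConfRecord above, job configuration data is gathered as sink-neutral (category, key, value) records under a single JobKey instead of being written directly as HBase Put columns. A small usage sketch of the collection API introduced by this patch; the class name, property names and values here are illustrative only:

    import com.twitter.hraven.Constants;
    import com.twitter.hraven.JobHistoryRecord;
    import com.twitter.hraven.JobHistoryRecordCollection;
    import com.twitter.hraven.JobKey;
    import com.twitter.hraven.RecordCategory;
    import com.twitter.hraven.RecordDataKey;

    public class ConfRecordSketch {
      public static void main(String[] args) {
        JobKey jobKey = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001");
        JobHistoryRecordCollection conf = new JobHistoryRecordCollection(jobKey);

        // one entry per job conf property, bucketed by record category
        conf.add(RecordCategory.CONF, new RecordDataKey("mapreduce.job.user.name"), "user");
        conf.add(RecordCategory.CONF_META, new RecordDataKey(Constants.HRAVEN_QUEUE), "default_queue");

        // iteration flattens the two-level map into one JobHistoryRecord per entry,
        // each carrying the shared JobKey and submit time
        for (JobHistoryRecord record : conf) {
          System.out.println(record.getDataCategory() + " " + record.getDataKey()
              + " = " + record.getDataValue());
        }
      }
    }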
- byte[] jobConfColumnPrefix = Bytes.toBytes(Constants.JOB_CONF_COLUMN_PREFIX - + Constants.SEP); + recordCollection.add(RecordCategory.CONF_META, new RecordDataKey(Constants.VERSION_COLUMN), + jobDesc.getVersion()); + recordCollection.add(RecordCategory.CONF_META, new RecordDataKey(Constants.FRAMEWORK_COLUMN), jobDesc + .getFramework().toString()); - // Create puts for all the parameters in the job configuration + // Create records for all the parameters in the job configuration Iterator> jobConfIterator = jobConf.iterator(); while (jobConfIterator.hasNext()) { Entry entry = jobConfIterator.next(); - // Prefix the job conf entry column with an indicator to - byte[] column = Bytes.add(jobConfColumnPrefix, - Bytes.toBytes(entry.getKey())); - jobPut.add(Constants.INFO_FAM_BYTES, column, - Bytes.toBytes(entry.getValue())); + recordCollection.add(RecordCategory.CONF, new RecordDataKey(entry.getKey()), entry.getValue()); } // ensure pool/queuename is set correctly - setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - - puts.add(jobPut); + setHravenQueueNameRecord(jobConf, recordCollection, jobKey); - return puts; + return recordCollection; } /** diff --git a/hraven-core/src/main/java/com/twitter/hraven/util/EnumWritable.java b/hraven-core/src/main/java/com/twitter/hraven/util/EnumWritable.java new file mode 100644 index 0000000..08ea12f --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/util/EnumWritable.java @@ -0,0 +1,32 @@ +package com.twitter.hraven.util; + +import java.io.*; +import org.apache.hadoop.io.*; + +public class EnumWritable> implements WritableComparable { + private Class cls; + private E value; + + @SuppressWarnings("unchecked") + public EnumWritable(E value) { + this.cls = (Class) value.getClass(); + this.value = value; + } + + public E getValue() { + return value; + } + + public void readFields(DataInput input) throws IOException { + value = WritableUtils.readEnum(input, cls); + } + + public void write(DataOutput output) throws IOException { + WritableUtils.writeEnum(output, value); + } + + @Override + public int compareTo(E o) { + return this.value.compareTo(o); + } +} \ No newline at end of file diff --git a/hraven-core/src/main/java/com/twitter/hraven/util/Serializer.java b/hraven-core/src/main/java/com/twitter/hraven/util/Serializer.java new file mode 100644 index 0000000..0e878f6 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/util/Serializer.java @@ -0,0 +1,22 @@ +package com.twitter.hraven.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +public class Serializer { + public static byte[] serialize(Object obj) throws IOException { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + ObjectOutputStream o = new ObjectOutputStream(b); + o.writeObject(obj); + return b.toByteArray(); + } + + public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException { + ByteArrayInputStream b = new ByteArrayInputStream(bytes); + ObjectInputStream o = new ObjectInputStream(b); + return o.readObject(); + } +} \ No newline at end of file diff --git a/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java b/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java index aaf1bd1..0839326 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java +++ b/hraven-core/src/main/java/com/twitter/hraven/util/StringUtil.java @@ -36,6 +36,7 @@ 
public class StringUtil { public static String cleanseToken(String token) { if (token == null || token.length() == 0) { return token; }; + token = token.trim(); String cleansed = token.replaceAll(SPACE, UNDERSCORE); cleansed = cleansed.replaceAll(Constants.SEP, UNDERSCORE); diff --git a/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java b/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java index 1c8905e..71d9a3c 100644 --- a/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java +++ b/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java @@ -40,7 +40,11 @@ import com.twitter.hraven.GenerateFlowTestData; import com.twitter.hraven.HadoopVersion; import com.twitter.hraven.JobDetails; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.datasource.JobHistoryByIdService; import com.twitter.hraven.datasource.JobHistoryService; import com.twitter.hraven.datasource.HRavenTestUtil; @@ -311,22 +315,6 @@ public void testRemoveJob() throws Exception { } } - private void assertFoundOnce(byte[] column, Put jobPut, int expectedSize, - String expectedValue) { - boolean foundUserName = false; - List kv1 = jobPut.get(Constants.INFO_FAM_BYTES, column); - assertEquals(expectedSize, kv1.size()); - for (KeyValue kv : kv1) { - assertEquals(Bytes.toString(kv.getValue()), expectedValue); - // ensure we don't see the same put twice - assertFalse(foundUserName); - // now set this to true - foundUserName = true; - } - // ensure that we got the user name - assertTrue(foundUserName); - } - @Test public void testSetHravenQueueName() throws FileNotFoundException { @@ -339,19 +327,17 @@ public void testSetHravenQueueName() throws FileNotFoundException { String USERNAME = "user"; JobKey jobKey = new JobKey("cluster1", USERNAME, "Sleep", 1, "job_1329348432655_0001"); - byte[] jobKeyBytes = new JobKeyConverter().toBytes(jobKey); - Put jobPut = new Put(jobKeyBytes); - byte[] jobConfColumnPrefix = Bytes.toBytes(Constants.JOB_CONF_COLUMN_PREFIX - + Constants.SEP); + + JobHistoryRecordCollection recordCollection = new JobHistoryRecordCollection(jobKey); - assertEquals(jobPut.size(), 0); + assertEquals(recordCollection.size(), 0); // check queuename matches user name since the conf has // value "default" as the queuename - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - byte[] column = Bytes.add(jobConfColumnPrefix, Constants.HRAVEN_QUEUE_BYTES); - assertFoundOnce(column, jobPut, 1, USERNAME); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), USERNAME); // populate the jobConf with all types of queue name parameters String expH2QName = "hadoop2queue"; @@ -363,36 +349,39 @@ public void testSetHravenQueueName() throws FileNotFoundException { // now check queuename is correctly set as hadoop2 queue name // even when the fairscheduler and capacity scheduler are set - jobPut = new Put(jobKeyBytes); - assertEquals(jobPut.size(), 0); - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - 
assertFoundOnce(column, jobPut, 1, expH2QName); + recordCollection = new JobHistoryRecordCollection(jobKey); + assertEquals(recordCollection.size(), 0); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), expH2QName); // now unset hadoop2 queuename, expect fairscheduler name to be used as queuename jobConf.set(Constants.QUEUENAME_HADOOP2, ""); - jobPut = new Put(jobKeyBytes); - assertEquals(jobPut.size(), 0); - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - assertFoundOnce(column, jobPut, 1, expH1PoolName); + recordCollection = new JobHistoryRecordCollection(jobKey); + assertEquals(recordCollection.size(), 0); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), expH1PoolName); // now unset fairscheduler name, expect capacity scheduler to be used as queuename jobConf.set(Constants.FAIR_SCHEDULER_POOLNAME_HADOOP1, ""); - jobPut = new Put(jobKeyBytes); - assertEquals(jobPut.size(), 0); - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - assertFoundOnce(column, jobPut, 1, capacityH1QName); + recordCollection = new JobHistoryRecordCollection(jobKey); + assertEquals(recordCollection.size(), 0); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), capacityH1QName); // now unset capacity scheduler, expect default_queue to be used as queuename jobConf.set(Constants.CAPACITY_SCHEDULER_QUEUENAME_HADOOP1, ""); - jobPut = new Put(jobKeyBytes); - assertEquals(jobPut.size(), 0); - JobHistoryService.setHravenQueueNamePut(jobConf, jobPut, jobKey, jobConfColumnPrefix); - assertEquals(jobPut.size(), 1); - assertFoundOnce(column, jobPut, 1, Constants.DEFAULT_QUEUENAME); - + recordCollection = new JobHistoryRecordCollection(jobKey); + assertEquals(recordCollection.size(), 0); + JobHistoryService.setHravenQueueNameRecord(jobConf, recordCollection, jobKey); + assertEquals(recordCollection.size(), 1); + assertEquals(recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey( + Constants.HRAVEN_QUEUE)), Constants.DEFAULT_QUEUENAME); } private void assertJob(JobDetails expected, JobDetails actual) { diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java index c34f85a..96e8161 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -214,15 +216,13 @@ public int run(String[] args) throws Exception { // Output should be an hdfs path. 
FileSystem hdfs = FileSystem.get(hbaseConf); - // Grab the input path argument - String output = commandLine.getOptionValue("o"); - LOG.info(" output=" + output); - Path outputPath = new Path(output); - FileStatus outputFileStatus = hdfs.getFileStatus(outputPath); - - if (!outputFileStatus.isDir()) { - throw new IOException("Output is not a directory" - + outputFileStatus.getPath().getName()); + // Grab the output path argument + String processingDirectory = commandLine.getOptionValue("o"); + LOG.info("output: " + processingDirectory); + Path processingDirectoryPath = new Path(processingDirectory); + + if (!hdfs.exists(processingDirectoryPath)) { + hdfs.mkdirs(processingDirectoryPath); } // Grab the input path argument @@ -253,6 +253,8 @@ public int run(String[] args) throws Exception { } else { batchSize = DEFAULT_BATCH_SIZE; } + + LOG.info("Batch size: " + batchSize); boolean forceAllFiles = commandLine.hasOption("f"); LOG.info("forceAllFiles: " + forceAllFiles); @@ -267,7 +269,7 @@ public int run(String[] args) throws Exception { // Grab the cluster argument String cluster = commandLine.getOptionValue("c"); - LOG.info("cluster=" + cluster); + LOG.info("cluster: " + cluster); /** * Grab the size of huge files to be moved argument @@ -277,7 +279,7 @@ public int run(String[] args) throws Exception { * {@link https://github.com/twitter/hraven/issues/59} */ String maxFileSizeStr = commandLine.getOptionValue("s"); - LOG.info("maxFileSize=" + maxFileSizeStr); + LOG.info("maxFileSize: " + maxFileSizeStr); long maxFileSize = DEFAULT_RAW_FILE_SIZE_LIMIT; try { maxFileSize = Long.parseLong(maxFileSizeStr); @@ -297,7 +299,7 @@ public int run(String[] args) throws Exception { if (!forceAllFiles) { lastProcessRecord = processRecordService - .getLastSuccessfulProcessRecord(cluster); + .getLastSuccessfulProcessRecord(cluster, processingDirectory); } long minModificationTimeMillis = 0; @@ -344,7 +346,7 @@ public int run(String[] args) throws Exception { LOG.info("Batch count: " + batchCount); for (int b = 0; b < batchCount; b++) { processBatch(jobFileStatusses, b, batchSize, processRecordService, - cluster, outputPath); + cluster, processingDirectoryPath); } } finally { diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java index 6ace3d9..67b2bc9 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java @@ -18,7 +18,11 @@ import static com.twitter.hraven.etl.ProcessState.LOADED; import static com.twitter.hraven.etl.ProcessState.PROCESSED; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.LinkedList; import java.util.List; @@ -28,6 +32,8 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -35,6 +41,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -45,7 +52,9 @@ import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; @@ -53,6 +62,8 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.twitter.hraven.Constants; import com.twitter.hraven.datasource.JobHistoryRawService; import com.twitter.hraven.etl.ProcessRecordService; @@ -73,6 +84,7 @@ public class JobFileProcessor extends Configured implements Tool { .format(new Date(System.currentTimeMillis())); private final AtomicInteger jobCounter = new AtomicInteger(0); + private List sinks; /** * Maximum number of files to process in one batch. @@ -119,7 +131,7 @@ private static CommandLine parseArgs(String[] args) throws ParseException { "r", "reprocess", false, - "Reprocess only those records that have been marked to be reprocessed. Otherwise process all rows indicated in the processing records, but successfully processed job files are skipped."); + "Process only those records that have been marked to be reprocessed. Otherwise process all rows indicated in the processing records, but successfully processed job files are skipped."); o.setRequired(false); options.addOption(o); @@ -165,6 +177,12 @@ private static CommandLine parseArgs(String[] args) throws ParseException { o.setArgName("machinetype"); o.setRequired(true); options.addOption(o); + + // Sinks + o = new Option("s", "sinks", true, "Comma seperated list of sinks (currently supported sinks: hbase, graphite)"); + o.setArgName("sinks"); + o.setRequired(true); + options.addOption(o); CommandLineParser parser = new PosixParser(); CommandLine commandLine = null; @@ -203,6 +221,33 @@ public int run(String[] args) throws Exception { // Grab the arguments we're looking for. CommandLine commandLine = parseArgs(otherArgs); + if (StringUtils.isNotBlank(commandLine.getOptionValue("s"))) { + String[] splits = commandLine.getOptionValue("s").split(","); + + if (splits.length > 0) { + sinks = + new ArrayList(Collections2.transform( + Arrays.asList(splits), new Function() { + + @Override + @Nullable + public Sink apply(@Nullable String input) { + try { + return Sink.valueOf(input); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Sink '" + input + "' is incorrect."); + } + } + })); + } + } + + if (sinks == null) { + throw new IllegalArgumentException("Incorrect value for sinks. 
Provide a comma-seperated list of sinks"); + } + + LOG.info("send data to sink=" + this.sinks.toString()); + // Grab the cluster argument String cluster = commandLine.getOptionValue("c"); LOG.info("cluster=" + cluster); @@ -456,7 +501,7 @@ private List getProcessRecords(Configuration conf, processRecords = processRecordService.getProcessRecords(cluster, LOADED, Integer.MAX_VALUE, processFileSubstring); - LOG.info("Processing " + processRecords.size() + " for: " + cluster); + LOG.info("Processing " + processRecords.size() + " processRecords for: " + cluster); } catch (IOException ioe) { caught = ioe; } finally { @@ -581,6 +626,13 @@ private List getJobRunners(Configuration conf, String cluster, } + static String convertScanToString(Scan scan) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out); + scan.write(dos); + return Base64.encodeBytes(out.toByteArray()); + } + /** * @param conf * to use to create and run the job @@ -609,12 +661,25 @@ private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount) // This is a map-only class, skip reduce step job.setNumReduceTasks(0); job.setJarByClass(JobFileProcessor.class); + job.setMapperClass(JobFileTableMapper.class); + job.setInputFormatClass(TableInputFormat.class); + job.setMapOutputKeyClass(JobFileTableMapper.getOutputKeyClass()); + job.setMapOutputValueClass(JobFileTableMapper.getOutputValueClass()); + job.getConfiguration().set(TableInputFormat.INPUT_TABLE, Constants.HISTORY_RAW_TABLE); + job.getConfiguration().set(TableInputFormat.SCAN, + convertScanToString(scan)); + TableMapReduceUtil.addDependencyJars(job); + HBaseConfiguration.addHbaseResources(job.getConfiguration()); + + //TODO: find a better way. reason: just so that it doesn't default to TextOutputFormat job.setOutputFormatClass(MultiTableOutputFormat.class); - - TableMapReduceUtil.initTableMapperJob(Constants.HISTORY_RAW_TABLE, scan, - JobFileTableMapper.class, JobFileTableMapper.getOutputKeyClass(), - JobFileTableMapper.getOutputValueClass(), job); - + + for (Sink sink : sinks) { + sink.configureJob(job); + } + + job.getConfiguration().set(Constants.JOBCONF_SINKS, StringUtils.join(sinks, ",")); + return job; } diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParser.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParser.java index 044acc0..0ccc040 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParser.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParser.java @@ -15,8 +15,7 @@ */ package com.twitter.hraven.etl; -import java.util.List; -import org.apache.hadoop.hbase.client.Put; +import java.util.Collection; import com.twitter.hraven.JobKey; import com.twitter.hraven.datasource.ProcessingException; @@ -48,16 +47,16 @@ public interface JobHistoryFileParser { * Return the generated list of job puts assembled when history file is * parsed * - * @return a list of jobPuts + * @return a collection of JobHistoryRecords */ - public List getJobPuts(); + public Collection getJobRecords(); /** * Return the generated list of task puts assembled when history file is * parsed * - * @return a list of taskPuts + * @return a list of JobHistoryTaskRecords */ - public List getTaskPuts(); + public Collection getTaskRecords(); } diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java index 
83e3f42..c958b1f 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java @@ -19,12 +19,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import com.twitter.hraven.Constants; import com.twitter.hraven.HadoopVersion; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.datasource.ProcessingException; import com.twitter.hraven.util.ByteUtil; @@ -54,15 +57,12 @@ protected JobHistoryFileParserBase(Configuration conf) { * * @return Put */ - public Put getHadoopVersionPut(HadoopVersion historyFileVersion, byte[] jobKeyBytes) { - Put pVersion = new Put(jobKeyBytes); - byte[] valueBytes = null; - valueBytes = Bytes.toBytes(historyFileVersion.toString()); - byte[] qualifier = Bytes.toBytes(JobHistoryKeys.hadoopversion.toString().toLowerCase()); - pVersion.add(Constants.INFO_FAM_BYTES, qualifier, valueBytes); - return pVersion; + public JobHistoryRecord getHadoopVersionRecord(HadoopVersion historyFileVersion, JobKey jobKey) { + return new JobHistoryRecord(RecordCategory.HISTORY_META, jobKey, + new RecordDataKey(JobHistoryKeys.hadoopversion.toString() + .toLowerCase()), historyFileVersion.toString()); } - + /** * extract the string around Xmx in the java child opts " -Xmx1024m -verbose:gc" * @param javaChildOptsStr diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java index 8955175..529c88e 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java @@ -17,6 +17,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; @@ -25,6 +26,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.mapred.JobHistoryCopy; import com.twitter.hraven.Constants; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; import com.twitter.hraven.JobKey; import com.twitter.hraven.datasource.ProcessingException; import com.twitter.hraven.mapreduce.JobHistoryListener; @@ -56,9 +59,9 @@ public void parse(byte[] historyFile, JobKey jobKey) throws ProcessingException jobHistoryListener = new JobHistoryListener(jobKey); JobHistoryCopy.parseHistoryFromIS(new ByteArrayInputStream(historyFile), jobHistoryListener); // set the hadoop version for this record - Put versionPut = getHadoopVersionPut(JobHistoryFileParserFactory.getHistoryFileVersion1(), - jobHistoryListener.getJobKeyBytes()); - jobHistoryListener.includeHadoopVersionPut(versionPut); + JobHistoryRecord versionRecord = getHadoopVersionRecord(JobHistoryFileParserFactory.getHistoryFileVersion1(), + jobHistoryListener.getJobKey()); + jobHistoryListener.includeHadoopVersionRecord(versionRecord); } catch (IOException ioe) { LOG.error(" Exception during parsing hadoop 1.0 file ", ioe); throw new ProcessingException( @@ -72,9 +75,9 @@ public void parse(byte[] historyFile, JobKey jobKey) throws ProcessingException * 
{@inheritDoc} */ @Override - public List getJobPuts() { + public Collection getJobRecords() { if (jobHistoryListener != null) { - return jobHistoryListener.getJobPuts(); + return jobHistoryListener.getJobRecords(); } else { return null; } @@ -84,9 +87,9 @@ public List getJobPuts() { * {@inheritDoc} */ @Override - public List getTaskPuts() { + public Collection getTaskRecords() { if (jobHistoryListener != null) { - return jobHistoryListener.getTaskPuts(); + return jobHistoryListener.getTaskRecords(); } else { return null; } diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java index 34bfb06..4e9c1fb 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java @@ -10,12 +10,12 @@ import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Set; @@ -26,14 +26,10 @@ import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.commons.configuration.ConversionException; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobHistoryCopy.RecordTypes; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; @@ -42,7 +38,12 @@ import com.google.common.collect.Maps; import com.twitter.hraven.Constants; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryTaskRecord; import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.TaskKey; import com.twitter.hraven.util.ByteArrayWrapper; import com.twitter.hraven.datasource.JobKeyConverter; @@ -59,8 +60,8 @@ public class JobHistoryFileParserHadoop2 extends JobHistoryFileParserBase { /** Job ID, minus the leading "job_" */ private String jobNumber = ""; private byte[] jobKeyBytes; - private List jobPuts = new LinkedList(); - private List taskPuts = new LinkedList(); + private Collection jobRecords; + private Collection taskRecords = new ArrayList(); boolean uberized = false; /** @@ -223,6 +224,11 @@ public void parse(byte[] historyFileContents, JobKey jobKey) throws ProcessingException { this.jobKey = jobKey; + + if (this.jobRecords == null) { + this.jobRecords = new JobHistoryRecordCollection(jobKey); + } + this.jobKeyBytes = jobKeyConv.toBytes(jobKey); setJobId(jobKey.getJobId().getJobIdString()); @@ -293,27 +299,24 @@ public void parse(byte[] historyFileContents, JobKey jobKey) * events are not so we need to look through the whole file to confirm * the job status and then generate the put */ - Put jobStatusPut = getJobStatusPut(); - this.jobPuts.add(jobStatusPut); + this.jobRecords.add(getJobStatusRecord()); // set the hadoop version for this record - Put versionPut 
= getHadoopVersionPut(JobHistoryFileParserFactory.getHistoryFileVersion2(), this.jobKeyBytes); - this.jobPuts.add(versionPut); + JobHistoryRecord versionRecord = + getHadoopVersionRecord(JobHistoryFileParserFactory.getHistoryFileVersion2(), this.jobKey); + this.jobRecords.add(versionRecord); - LOG.info("For " + this.jobKey + " #jobPuts " + jobPuts.size() + " #taskPuts: " - + taskPuts.size()); + LOG.info("For " + this.jobKey + " #jobPuts " + jobRecords.size() + " #taskPuts: " + + taskRecords.size()); } /** * generates a put for job status * @return Put that contains Job Status */ - private Put getJobStatusPut() { - Put pStatus = new Put(jobKeyBytes); - byte[] valueBytes = Bytes.toBytes(this.jobStatus); - byte[] qualifier = Bytes.toBytes(JobHistoryKeys.JOB_STATUS.toString().toLowerCase()); - pStatus.add(Constants.INFO_FAM_BYTES, qualifier, valueBytes); - return pStatus; + private JobHistoryRecord getJobStatusRecord() { + return new JobHistoryRecord(RecordCategory.HISTORY_META, this.jobKey, new RecordDataKey( + JobHistoryKeys.JOB_STATUS.toString().toLowerCase()), this.jobStatus); } /** @@ -377,7 +380,7 @@ private void understandSchema(String schema) throws JSONException { * "counts":[ { "name":"MAP_INPUT_RECORDS", "displayName":"Map input records", "value":10 }, { * "name":"MAP_OUTPUT_RECORDS", "displayName":"Map output records", "value":10 } ] } ] } */ - private void processCounters(Put p, JSONObject eventDetails, String key) { + private void processCounters(JSONObject eventDetails, String key) { try { JSONObject jsonCounters = eventDetails.getJSONObject(key); @@ -388,8 +391,21 @@ private void processCounters(Put p, JSONObject eventDetails, String key) { JSONArray counts = aCounter.getJSONArray(COUNTS); for (int j = 0; j < counts.length(); j++) { JSONObject countDetails = counts.getJSONObject(j); - populatePut(p, Constants.INFO_FAM_BYTES, counterMetaGroupName, aCounter.get(NAME) - .toString(), countDetails.get(NAME).toString(), countDetails.getLong(VALUE)); + //counterMetaGroupName; + String groupName = aCounter.get(NAME).toString(); + String counterName = countDetails.get(NAME).toString(); + Long counterValue = countDetails.getLong(VALUE); + + /** + * correct and populate map and reduce slot millis + */ + if ((Constants.SLOTS_MILLIS_MAPS.equals(counterName)) || + (Constants.SLOTS_MILLIS_REDUCES.equals(counterName))) { + counterValue = getStandardizedCounterValue(counterName, counterValue); + } + + this.jobRecords.add(new JobHistoryRecord(RecordCategory.HISTORY_COUNTER, this.jobKey, + new RecordDataKey(counterMetaGroupName, groupName, counterName), counterValue)); } } } catch (JSONException e) { @@ -403,57 +419,57 @@ private void processCounters(Put p, JSONObject eventDetails, String key) { * @throws JSONException */ private void - processAllTypes(Put p, Hadoop2RecordType recType, JSONObject eventDetails, String key) + processAllTypes(Hadoop2RecordType recType, JSONObject eventDetails, String dataKey, RecordAdder adder) throws JSONException { - if (COUNTER_NAMES.contains(key)) { - processCounters(p, eventDetails, key); + if (COUNTER_NAMES.contains(dataKey)) { + processCounters(eventDetails, dataKey); } else { - String type = fieldTypes.get(recType).get(key); + String type = fieldTypes.get(recType).get(dataKey); if (type.equalsIgnoreCase(TYPE_STRING)) { // look for job status if (JobHistoryKeys.JOB_STATUS.toString().equals( - JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key))) { + JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(dataKey))) { // store it only if it's one of the terminal state 
events if ((recType.equals(Hadoop2RecordType.JobFinished)) || (recType.equals(Hadoop2RecordType.JobUnsuccessfulCompletion))) { - this.jobStatus = eventDetails.getString(key); + this.jobStatus = eventDetails.getString(dataKey); } } else { - String value = eventDetails.getString(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + String value = eventDetails.getString(dataKey); + populateRecord(dataKey, value, adder); } } else if (type.equalsIgnoreCase(TYPE_LONG)) { - long value = eventDetails.getLong(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + long value = eventDetails.getLong(dataKey); + populateRecord(dataKey, value, adder); // populate start time of the job for megabytemillis calculations if ((recType.equals(Hadoop2RecordType.JobInited)) && - LAUNCH_TIME_KEY_STR.equals(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key))) { + LAUNCH_TIME_KEY_STR.equals(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(dataKey))) { this.startTime = value; } // populate end time of the job for megabytemillis calculations if ((recType.equals(Hadoop2RecordType.JobFinished)) || (recType.equals(Hadoop2RecordType.JobUnsuccessfulCompletion))) { - if (FINISH_TIME_KEY_STR.equals(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key))) { + if (FINISH_TIME_KEY_STR.equals(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(dataKey))) { this.endTime = value; } } } else if (type.equalsIgnoreCase(TYPE_INT)) { - int value = eventDetails.getInt(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + int value = eventDetails.getInt(dataKey); + populateRecord(dataKey, value, adder); } else if (type.equalsIgnoreCase(TYPE_BOOLEAN)) { - boolean value = eventDetails.getBoolean(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, Boolean.toString(value)); + boolean value = eventDetails.getBoolean(dataKey); + populateRecord(dataKey, Boolean.toString(value), adder); } else if (type.equalsIgnoreCase(TYPE_ARRAY_INTS)) { - String value = eventDetails.getString(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + String value = eventDetails.getString(dataKey); + populateRecord(dataKey, value, adder); } else if (type.equalsIgnoreCase(NULL_STRING)) { // usually seen in FAILED tasks - String value = eventDetails.getString(key); - populatePut(p, Constants.INFO_FAM_BYTES, key, value); + String value = eventDetails.getString(dataKey); + populateRecord(dataKey, value, adder); } else if (type.equalsIgnoreCase(TYPE_MAP_STRINGS)) { - JSONObject ms = new JSONObject(eventDetails.get(key).toString()); - populatePut(p, Constants.INFO_FAM_BYTES, key, ms.toString()); + JSONObject ms = new JSONObject(eventDetails.get(dataKey).toString()); + populateRecord(dataKey, ms.toString(), adder); } else { throw new ProcessingException("Encountered a new type " + type + " unable to complete processing " + this.jobKey); @@ -465,15 +481,19 @@ private void processCounters(Put p, JSONObject eventDetails, String key) { * iterate over the event details and prepare puts * @throws JSONException */ - private void iterateAndPreparePuts(JSONObject eventDetails, Put p, Hadoop2RecordType recType) + private void iterateAndAddRecords(JSONObject eventDetails, Hadoop2RecordType recType, RecordAdder adder) throws JSONException { Iterator keys = eventDetails.keys(); while (keys.hasNext()) { - String key = (String) keys.next(); - processAllTypes(p, recType, eventDetails, key); + String dataKey = (String) keys.next(); + processAllTypes(recType, eventDetails, dataKey, adder); } } + private interface RecordAdder { + public void 
addRecord(RecordDataKey key, Object value, boolean isNumeric); + } + /** * process individual records * @throws JSONException @@ -492,65 +512,95 @@ private void processRecords(Hadoop2RecordType recType, JSONObject eventDetails) case JobStatusChanged: case JobSubmitted: case JobUnsuccessfulCompletion: - Put pJob = new Put(this.jobKeyBytes); - iterateAndPreparePuts(eventDetails, pJob, recType); - this.jobPuts.add(pJob); + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + jobRecords.add(new JobHistoryRecord(isNumeric ? RecordCategory.HISTORY_COUNTER + : RecordCategory.HISTORY_META, jobKey, key, value)); + } + }); break; case AMStarted: - byte[] amAttemptIdKeyBytes = - getAMKey(AM_ATTEMPT_PREFIX, eventDetails.getString(APPLICATION_ATTEMPTID)); - // generate a new put per AM Attempt - Put pAM = new Put(amAttemptIdKeyBytes); - pAM.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.Task.toString())); - iterateAndPreparePuts(eventDetails, pAM, recType); - taskPuts.add(pAM); + final TaskKey amAttemptIdKey = + getAMKey(AM_ATTEMPT_PREFIX, eventDetails.getString(APPLICATION_ATTEMPTID)); + // generate a new record per AM Attempt + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, amAttemptIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.Task.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, amAttemptIdKey, key, value)); + } + }); break; case MapAttemptFinished: - byte[] taskMAttemptIdKeyBytes = - getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); - Put pMTaskAttempt = new Put(taskMAttemptIdKeyBytes); - pMTaskAttempt.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.MapAttempt.toString())); - iterateAndPreparePuts(eventDetails, pMTaskAttempt, recType); - this.taskPuts.add(pMTaskAttempt); + final TaskKey taskMAttemptIdKey = + getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); + + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskMAttemptIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.MapAttempt.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? 
RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskMAttemptIdKey, key, value)); + } + }); break; case ReduceAttemptFinished: - byte[] taskRAttemptIdKeyBytes = - getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); - Put pRTaskAttempt = new Put(taskRAttemptIdKeyBytes); - pRTaskAttempt.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.ReduceAttempt.toString())); - iterateAndPreparePuts(eventDetails, pRTaskAttempt, recType); - this.taskPuts.add(pRTaskAttempt); + final TaskKey taskRAttemptIdKey = + getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskRAttemptIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.ReduceAttempt.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskRAttemptIdKey, key, value)); + } + }); break; case TaskAttemptFinished: case TaskAttemptStarted: case TaskAttemptUnsuccessfulCompletion: - byte[] taskAttemptIdKeyBytes = - getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); - Put pTaskAttempt = new Put(taskAttemptIdKeyBytes); - pTaskAttempt.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.Task.toString())); - iterateAndPreparePuts(eventDetails, pTaskAttempt, recType); - taskPuts.add(pTaskAttempt); + final TaskKey taskAttemptIdKey = + getTaskKey(TASK_ATTEMPT_PREFIX, this.jobNumber, eventDetails.getString(ATTEMPTID)); + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskAttemptIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.Task.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskAttemptIdKey, key, value)); + } + }); break; case TaskFailed: case TaskStarted: case TaskUpdated: case TaskFinished: - byte[] taskIdKeyBytes = - getTaskKey(TASK_PREFIX, this.jobNumber, eventDetails.getString(TASKID)); - Put pTask = new Put(taskIdKeyBytes); - pTask.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.Task.toString())); - iterateAndPreparePuts(eventDetails, pTask, recType); - taskPuts.add(pTask); + final TaskKey taskIdKey = + getTaskKey(TASK_PREFIX, this.jobNumber, eventDetails.getString(TASKID)); + taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskIdKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.Task.toString())); + + iterateAndAddRecords(eventDetails, recType, new RecordAdder() { + @Override + public void addRecord(RecordDataKey key, Object value, boolean isNumeric) { + taskRecords.add(new JobHistoryTaskRecord(isNumeric ? 
RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskIdKey, key, value)); + } + }); break; default: LOG.error("Check if recType was modified and has new members?"); @@ -587,12 +637,8 @@ private String getKey(String key) throws IllegalArgumentException { * @param String key * @param long value */ - private void populatePut(Put p, byte[] family, String key, long value) { - - byte[] valueBytes = null; - valueBytes = (value != 0L) ? Bytes.toBytes(value) : Constants.ZERO_LONG_BYTES; - byte[] qualifier = Bytes.toBytes(getKey(key).toLowerCase()); - p.add(family, qualifier, valueBytes); + private void populateRecord(String key, long value, RecordAdder adder) { + adder.addRecord(new RecordDataKey(getKey(key).toLowerCase()), value, true); } /** @@ -602,19 +648,19 @@ private void populatePut(Put p, byte[] family, String key, long value) { * keeping this function package level visible (unit testing) * @throws IllegalArgumentException if new key is encountered */ - byte[] getValue(String key, int value) { - byte[] valueBytes = null; + Object getValue(String key, int value) { + Object valueObject = null; Class clazz = JobHistoryKeys.KEY_TYPES.get(JobHistoryKeys.valueOf(key)); if (clazz == null) { throw new IllegalArgumentException(" unknown key " + key + " encountered while parsing " + this.jobKey); } if (Long.class.equals(clazz)) { - valueBytes = (value != 0L) ? Bytes.toBytes(new Long(value)) : Constants.ZERO_LONG_BYTES; + valueObject = (value != 0L) ? new Long(value) : 0L; } else { - valueBytes = (value != 0) ? Bytes.toBytes(value) : Constants.ZERO_INT_BYTES; + valueObject = (int)value; } - return valueBytes; + return valueObject; } /** @@ -624,14 +670,11 @@ byte[] getValue(String key, int value) { * @param String key * @param int value */ - private void populatePut(Put p, byte[] family, String key, int value) { - + private void populateRecord(String key, int value, RecordAdder adder) { String jobHistoryKey = getKey(key); - byte[] valueBytes = getValue(jobHistoryKey, value); - byte[] qualifier = Bytes.toBytes(jobHistoryKey.toLowerCase()); - p.add(family, qualifier, valueBytes); + adder.addRecord(new RecordDataKey(jobHistoryKey), getValue(jobHistoryKey, value), true); } - + /** * populates a put for string values * @param {@link Put} p @@ -639,72 +682,13 @@ private void populatePut(Put p, byte[] family, String key, int value) { * @param {@link String} key * @param String value */ - private void populatePut(Put p, byte[] family, String key, String value) { - byte[] valueBytes = null; - valueBytes = Bytes.toBytes(value); - byte[] qualifier = Bytes.toBytes(getKey(key).toLowerCase()); - p.add(family, qualifier, valueBytes); - } - - /** - * populates a put for {@link Counters} - * @param {@link Put} p - * @param {@link Constants} family - * @param String key - * @param String groupName - * @param String counterName - * @param long counterValue - */ - private void populatePut(Put p, byte[] family, String key, String groupName, String counterName, - Long counterValue) { - byte[] counterPrefix = null; - - try { - switch (JobHistoryKeys.valueOf(JobHistoryKeys.class, key)) { - case COUNTERS: - case TOTAL_COUNTERS: - case TASK_COUNTERS: - case TASK_ATTEMPT_COUNTERS: - counterPrefix = Bytes.add(Constants.COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); - break; - case MAP_COUNTERS: - counterPrefix = Bytes.add(Constants.MAP_COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); - break; - case REDUCE_COUNTERS: - counterPrefix = - Bytes.add(Constants.REDUCE_COUNTER_COLUMN_PREFIX_BYTES, 
Constants.SEP_BYTES); - break; - default: - throw new IllegalArgumentException("Unknown counter type " + key.toString()); - } - } catch (IllegalArgumentException iae) { - throw new ProcessingException("Unknown counter type " + key, iae); - } catch (NullPointerException npe) { - throw new ProcessingException("Null counter type " + key, npe); - } - - byte[] groupPrefix = Bytes.add(counterPrefix, Bytes.toBytes(groupName), Constants.SEP_BYTES); - byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName)); - - /** - * correct and populate map and reduce slot millis - */ - if ((Constants.SLOTS_MILLIS_MAPS.equals(counterName)) || - (Constants.SLOTS_MILLIS_REDUCES.equals(counterName))) { - counterValue = getStandardizedCounterValue(counterName, counterValue); - } - - p.add(family, qualifier, Bytes.toBytes(counterValue)); - + private void populateRecord(String key, String value, RecordAdder adder) { + adder.addRecord(new RecordDataKey(getKey(key).toLowerCase()), value, false); } private long getMemoryMb(String key) { long memoryMb = 0L; - if (Constants.MAP_MEMORY_MB_CONF_KEY.equals(key)){ - memoryMb = this.jobConf.getLong(key, Constants.DEFAULT_MAP_MEMORY_MB); - }else if (Constants.REDUCE_MEMORY_MB_CONF_KEY.equals(key)){ - memoryMb = this.jobConf.getLong(key, Constants.DEFAULT_REDUCE_MEMORY_MB); - } + memoryMb = this.jobConf.getLong(key, 0L); if (memoryMb == 0L) { throw new ProcessingException( "While correcting slot millis, " + key + " was found to be 0 "); @@ -755,7 +739,7 @@ private Long getStandardizedCounterValue(String counterName, Long counterValue) * Returns the Task ID or Task Attempt ID, stripped of the leading job ID, appended to the job row * key. */ - public byte[] getTaskKey(String prefix, String jobNumber, String fullId) { + public TaskKey getTaskKey(String prefix, String jobNumber, String fullId) { String taskComponent = fullId; if (fullId == null) { taskComponent = ""; @@ -765,50 +749,31 @@ public byte[] getTaskKey(String prefix, String jobNumber, String fullId) { taskComponent = fullId.substring(expectedPrefix.length()); } } - return taskKeyConv.toBytes(new TaskKey(this.jobKey, taskComponent)); + return new TaskKey(this.jobKey, taskComponent); } /** * Returns the AM Attempt id stripped of the leading job ID, appended to the job row key. 
*/ - public byte[] getAMKey(String prefix, String fullId) { - + public TaskKey getAMKey(String prefix, String fullId) { String taskComponent = prefix + fullId; - return taskKeyConv.toBytes(new TaskKey(this.jobKey, taskComponent)); + return new TaskKey(this.jobKey, taskComponent); } /** * {@inheritDoc} */ @Override - public List getJobPuts() { - return jobPuts; + public Collection getJobRecords() { + return jobRecords; } /** * {@inheritDoc} */ @Override - public List getTaskPuts() { - return taskPuts; - } - - /** - * utitlity function for printing all puts - */ - public void printAllPuts(List p) { - for (Put p1 : p) { - Map> d = p1.getFamilyMap(); - for (byte[] k : d.keySet()) { - System.out.println(" k " + Bytes.toString(k)); - } - for (List lkv : d.values()) { - for (KeyValue kv : lkv) { - System.out.println("\n row: " + taskKeyConv.fromBytes(kv.getRow()) - + "\n " + Bytes.toString(kv.getQualifier()) + ": " + Bytes.toString(kv.getValue())); - } - } - } + public Collection getTaskRecords() { + return taskRecords; } /** diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/ProcessRecordService.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/ProcessRecordService.java index fc067b2..85bdfc9 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/ProcessRecordService.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/ProcessRecordService.java @@ -148,10 +148,10 @@ public void writeJobRecord(ProcessRecord processRecord) throws IOException { * state. * @throws IOException */ - public ProcessRecord getLastSuccessfulProcessRecord(String cluster) + public ProcessRecord getLastSuccessfulProcessRecord(String cluster, String processFileSubstring) throws IOException { List processRecords = getProcessRecords(cluster, NOT_EQUAL, - CREATED, 1, null); + CREATED, 1, processFileSubstring); if (processRecords.size() > 0) { return processRecords.get(0); } diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/Sink.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/Sink.java new file mode 100644 index 0000000..ec70048 --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/Sink.java @@ -0,0 +1,33 @@ +package com.twitter.hraven.etl; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import com.twitter.hraven.mapreduce.GraphiteOutputFormat; +import com.twitter.hraven.mapreduce.HbaseOutputFormat; +import com.twitter.hraven.mapreduce.JobFileTableMapper; + +public enum Sink { + + HBASE { + + @Override + public void configureJob(Job job) { + MultipleOutputs.addNamedOutput(job, name(), HbaseOutputFormat.class, + JobFileTableMapper.getOutputKeyClass(), JobFileTableMapper.getOutputValueClass()); + } + + }, + + GRAPHITE { + + @Override + public void configureJob(Job job) { + MultipleOutputs.addNamedOutput(job, name(), GraphiteOutputFormat.class, + JobFileTableMapper.getOutputKeyClass(), JobFileTableMapper.getOutputValueClass()); + } + + }; + + public abstract void configureJob(Job job); + +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteHistoryWriter.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteHistoryWriter.java new file mode 100644 index 0000000..7e0b294 --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteHistoryWriter.java @@ -0,0 +1,366 @@ +package com.twitter.hraven.mapreduce; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import 
java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; + +import com.twitter.hraven.Constants; +import com.twitter.hraven.Framework; +import com.twitter.hraven.HravenService; +import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; +import com.twitter.hraven.datasource.JobKeyConverter; +import com.twitter.hraven.util.ByteUtil; + +public class GraphiteHistoryWriter { + + private static Log LOG = LogFactory.getLog(GraphiteHistoryWriter.class); + + private final Pattern APPID_PATTERN_OOZIE_LAUNCHER = Pattern.compile("oozie:launcher:T=(.*):W=(.*):A=(.*):ID=(.*)"); + private final Pattern APPID_PATTERN_OOZIE_ACTION = Pattern.compile("oozie:action:T=(.*):W=(.*):A=(.*):ID=[0-9]{7}-[0-9]{15}-oozie-oozi-W(.*)"); + private final Pattern APPID_PATTERN_PIGJOB = Pattern.compile("PigLatin:(.*).pig"); + private static final String GRAPHITE_KEY_FILTER = "[./\\\\\\s,]"; + private static final int PIG_ALIAS_FINGERPRINT_LENGTH = 100; + + private static final String submitTimeKey = JobHistoryKeys.SUBMIT_TIME.toString(); + private static final String launchTimeKey = JobHistoryKeys.LAUNCH_TIME.toString(); + private static final String finishTimeKey = JobHistoryKeys.FINISH_TIME.toString(); + private static final String totalTimeKey = "total_time"; + private static final String runTimeKey = "run_time"; + + private HravenService service; + private JobHistoryRecordCollection recordCollection; + private String prefix; + private StringBuilder lines; + private List<String> userFilter; + private List<String> queueFilter; + private List<String> excludedComponents; + private List<String> doNotExcludeApps; + + private HTable keyMappingTable; + private HTable reverseKeyMappingTable; + + /** + * Formats the {@link JobHistoryRecord}s of a {@link JobHistoryRecordCollection} as Graphite + * plaintext metrics for the given {@link HravenService}, appending the generated lines to the + * supplied StringBuilder. The collection's {@link JobKey} supplies the contextual attributes + * (cluster, user, appId) used in the metric naming scheme, and the two mapping tables record + * the generated metric prefix for each job key. + */ + + public GraphiteHistoryWriter(HTable keyMappingTable, HTable reverseKeyMappingTable, String prefix, HravenService serviceKey, + JobHistoryRecordCollection recordCollection, StringBuilder sb, String userFilter, String queueFilter, String excludedComponents, String doNotExcludeApps) { + this.keyMappingTable = keyMappingTable; + this.reverseKeyMappingTable = reverseKeyMappingTable; + this.service = serviceKey; + this.recordCollection = recordCollection; + this.prefix = prefix; + this.lines = sb; + if (StringUtils.isNotEmpty(userFilter)) + this.userFilter = Arrays.asList(userFilter.split(",")); + if (StringUtils.isNotEmpty(queueFilter)) + this.queueFilter = Arrays.asList(queueFilter.split(",")); + if (StringUtils.isNotEmpty(excludedComponents)) + this.excludedComponents =
Arrays.asList(excludedComponents.split(",")); + if (StringUtils.isNotEmpty(doNotExcludeApps)) + this.doNotExcludeApps = Arrays.asList(doNotExcludeApps.split(",")); + } + + public int write() throws IOException { + /* + * Send metrics in the format {PREFIX}.{cluster}.{user}.{appId}.{subAppId} {value} + * {submit_time} subAppId is formed differently for each framework. For pig, its the alias + * names and feature used in the job. appId will be parsed with a bunch of known patterns + * (oozie launcher jobs, pig jobs, etc.) + */ + + int lineCount = 0; + + boolean inDoNotExcludeApps = false; + + if (doNotExcludeApps != null) { + for (String appStr: this.doNotExcludeApps) { + if (recordCollection.getKey().getAppId().indexOf(appStr) != -1) { + inDoNotExcludeApps = true; + break; + } + } + } + + if ( + // exclude from further filters if appId matches list of doNotExcludeApps substrings + inDoNotExcludeApps || + + // or it must pass the user and queue filters, if not null + ( (userFilter == null || userFilter.contains(recordCollection.getKey().getUserName())) && + (queueFilter == null || queueFilter.contains(recordCollection.getValue(RecordCategory.CONF_META, + new RecordDataKey(Constants.HRAVEN_QUEUE)))) ) + ) { + + Framework framework = getFramework(recordCollection); + String metricsPathPrefix; + + String pigAliasFp = getPigAliasFingerprint(recordCollection); + String genAppId = genAppId(recordCollection, recordCollection.getKey().getAppId()); + + if (genAppId == null) { + genAppId = recordCollection.getKey().getAppId(); + LOG.error("Generated appId is null for app " + recordCollection.getKey().toString()); + } + + if (framework == Framework.PIG && pigAliasFp != null) { + // TODO: should ideally include app version too, but PIG-2587's pig.logical.plan.signature + // which hraven uses was available only from pig 0.11 + + metricsPathPrefix = + generatePathPrefix(prefix, + recordCollection.getKey().getCluster(), + recordCollection.getKey().getUserName(), + genAppId, + pigAliasFp) + .toString(); + } else { + metricsPathPrefix = + generatePathPrefix(prefix, + recordCollection.getKey().getCluster(), + recordCollection.getKey().getUserName(), + genAppId) + .toString(); + } + + try { + storeAppIdMapping(metricsPathPrefix); + } catch (IOException e) { + LOG.error("Failed to store mapping for app " + recordCollection.getKey().getAppId() + + " to '" + metricsPathPrefix + "'"); + } + + // Round the timestamp to second as Graphite accepts it in such + // a format. 
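+ // Graphite's plaintext protocol expects one line per metric: "<metric.path> <value> <epoch-seconds>\n".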
+ int timestamp = Math.round(recordCollection.getSubmitTime() / 1000); + + // For now, relies on receiving job history and job conf as part of the same + // JobHistoryMultiRecord + for (JobHistoryRecord jobRecord : recordCollection) { + if (service == HravenService.JOB_HISTORY + && (jobRecord.getDataCategory() == RecordCategory.HISTORY_COUNTER || jobRecord + .getDataCategory() == RecordCategory.INFERRED) + && !(jobRecord.getDataKey().get(0).equalsIgnoreCase(submitTimeKey) + || jobRecord.getDataKey().get(0).equalsIgnoreCase(launchTimeKey) || jobRecord.getDataKey() + .get(0).equalsIgnoreCase(finishTimeKey))) { + + StringBuilder line = new StringBuilder(); + line.append(metricsPathPrefix); + + boolean ignoreRecord = false; + for (String comp : jobRecord.getDataKey().getComponents()) { + if (excludedComponents != null && excludedComponents.contains(comp)) { + ignoreRecord = true; + LOG.info("Excluding component '" + jobRecord.getDataKey().toString() + "' of app " + jobRecord.getKey().toString()); + break; + } + line.append(".").append(sanitize(comp)); + } + + if (ignoreRecord) + continue; + else + lineCount++; + + line.append(" ").append(jobRecord.getDataValue()).append(" ") + .append(timestamp).append("\n"); + lines.append(line); + } + } + + //handle run times + Long finishTime = (Long)recordCollection.getValue(RecordCategory.HISTORY_COUNTER, new RecordDataKey(finishTimeKey)); + if (finishTime == null) + finishTime = (Long)recordCollection.getValue(RecordCategory.HISTORY_COUNTER, new RecordDataKey(finishTimeKey.toLowerCase())); + Long launchTime = (Long)recordCollection.getValue(RecordCategory.HISTORY_COUNTER, new RecordDataKey(launchTimeKey)); + if (launchTime == null) + launchTime = (Long)recordCollection.getValue(RecordCategory.HISTORY_COUNTER, new RecordDataKey(launchTimeKey.toLowerCase())); + + if (finishTime != null && recordCollection.getSubmitTime() != null) { + lines.append(metricsPathPrefix + ".").append(totalTimeKey + " " + (finishTime-recordCollection.getSubmitTime()) + " " + timestamp + "\n"); + lineCount++; + } + + if (finishTime != null && launchTime != null) { + lines.append(metricsPathPrefix + ".").append(runTimeKey + " " + (finishTime-launchTime) + " " + timestamp + "\n"); + lineCount++; + } + } + + return lineCount; + } + + private void storeAppIdMapping(String metricsPathPrefix) throws IOException { + Put put = new Put(new JobKeyConverter().toBytes(recordCollection.getKey())); + put.add(Constants.INFO_FAM_BYTES, Constants.GRAPHITE_KEY_MAPPING_COLUMN_BYTES, Bytes.toBytes(metricsPathPrefix)); + keyMappingTable.put(put); + + put = new Put(Bytes.toBytes(metricsPathPrefix)); + + byte[] appIdBytes = ByteUtil.join(Constants.SEP_BYTES, + Bytes.toBytes(recordCollection.getKey().getCluster()), + Bytes.toBytes(recordCollection.getKey().getUserName()), + Bytes.toBytes(recordCollection.getKey().getAppId())); + + put.add(Constants.INFO_FAM_BYTES, Constants.GRAPHITE_KEY_MAPPING_COLUMN_BYTES, appIdBytes); + reverseKeyMappingTable.put(put); + } + + private String getPigAliasFingerprint(JobHistoryRecordCollection recordCollection) { + Object aliasRec = recordCollection.getValue(RecordCategory.CONF, new RecordDataKey("pig.alias")); + Object featureRec = recordCollection.getValue(RecordCategory.CONF, new RecordDataKey("pig.job.feature")); + + String alias = null; + String feature = null; + + if (aliasRec != null) { + alias = (String) aliasRec; + } + + if (featureRec != null) { + feature = (String) featureRec; + } + + if (alias != null) { + return (feature != null ? 
feature + ":" : "") + + StringUtils.abbreviate(alias, PIG_ALIAS_FINGERPRINT_LENGTH); + } + + return null; + } + + private String genAppId(JobHistoryRecordCollection recordCollection, String appId) { + String oozieActionName = getOozieActionName(recordCollection); + + if (getFramework(recordCollection) == Framework.PIG && APPID_PATTERN_PIGJOB.matcher(appId).matches()) { + // pig:{oozie-action-name}:{pigscript} + if (oozieActionName != null) { + appId = APPID_PATTERN_PIGJOB.matcher(appId).replaceAll("pig:" + oozieActionName + ":$1"); + } else { + appId = APPID_PATTERN_PIGJOB.matcher(appId).replaceAll("pig:$1"); + } + } else if (APPID_PATTERN_OOZIE_LAUNCHER.matcher(appId).matches()) { + // ozl:{oozie-workflow-name} + appId = APPID_PATTERN_OOZIE_LAUNCHER.matcher(appId).replaceAll("ozl:$1:$2:$3"); + } else if (APPID_PATTERN_OOZIE_ACTION.matcher(appId).matches()) { + // oza:{oozie-workflow-name}:{oozie-action-name} + appId = APPID_PATTERN_OOZIE_ACTION.matcher(appId).replaceAll("oza:$1:$2:$3:$4"); + } + + return appId; + } + + private Framework getFramework(JobHistoryRecordCollection recordCollection) { + Object rec = + recordCollection.getValue(RecordCategory.CONF_META, new RecordDataKey(Constants.FRAMEWORK_COLUMN)); + + if (rec != null) { + return Framework.valueOf((String) rec); + } + + return null; + } + + private String getOozieActionName(JobHistoryRecordCollection recordCollection) { + Object rec = recordCollection.getValue(RecordCategory.CONF, new RecordDataKey("oozie.action.id")); + + if (rec != null) { + String actionId = ((String) rec); + return actionId.substring(actionId.indexOf("@") + 1, actionId.length()); + } + + return null; + } + + /** + * Util method to generate metrix path prefix + * @return + * @throws UnsupportedEncodingException + */ + + private StringBuilder generatePathPrefix(String... args) { + StringBuilder prefix = new StringBuilder(); + boolean first = true; + for (String arg : args) { + if (!first) { + prefix.append("."); + } + + prefix.append(sanitize(arg)); + first = false; + } + return prefix; + } + + /** + * Util method to sanitize metrics for sending to graphite E.g. remove periods ("."), etc. 
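+ * Graphite uses "." as its metric-path separator, so periods, slashes, backslashes, whitespace and commas are all replaced with "_".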
+ * @throws UnsupportedEncodingException + */ + public static String sanitize(String s) { + return s.replaceAll(GRAPHITE_KEY_FILTER, "_"); + } + + /** + * Output the {@link JobHistoryRecord} received in debug log + * @param serviceKey + * @param jobRecord + */ + + public static void logRecord(HravenService serviceKey, JobHistoryRecord jobRecord) { + StringBuilder line = new StringBuilder(); + String seperator = ", "; + String seperator2 = "|"; + + line.append("Service: " + serviceKey.name()); + + JobKey key = jobRecord.getKey(); + line.append(seperator).append("Cluster: " + key.getCluster()); + line.append(seperator).append("User: " + key.getUserName()); + line.append(seperator).append("AppId: " + key.getAppId()); + line.append(seperator).append("RunId: " + key.getRunId()); + line.append(seperator).append("JobId: " + key.getJobId()); + + line.append(seperator2); + line.append(seperator).append("Category: " + jobRecord.getDataCategory().name()); + + line.append(seperator).append("Key: "); + for (String comp : jobRecord.getDataKey().getComponents()) { + line.append(comp).append("."); + } + + line.append(seperator).append("Value: " + jobRecord.getDataValue()); + line.append(seperator).append("SubmitTime: " + jobRecord.getSubmitTime()); + + LOG.debug(line); + } +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteOutputFormat.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteOutputFormat.java new file mode 100644 index 0000000..7fbf31c --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/GraphiteOutputFormat.java @@ -0,0 +1,198 @@ +package com.twitter.hraven.mapreduce; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.io.Writer; +import java.net.Socket; +import java.net.URLEncoder; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.HravenService; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.util.EnumWritable; + +/** + * @author angad.singh {@link OutputFormat} for sending metrics to graphite + */ + +public class GraphiteOutputFormat extends OutputFormat, HravenRecord> { + + private static Log LOG = LogFactory.getLog(GraphiteOutputFormat.class); + private static Writer writer; + + /** + * {@link OutputCommitter} which does nothing + */ + protected static class GraphiteOutputCommitter extends OutputCommitter { + + @Override + public void setupJob(JobContext jobContext) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) throws 
IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) throws IOException { + } + + @Override + public void abortTask(TaskAttemptContext taskContext) throws IOException { + } + + } + + protected static class GraphiteRecordWriter extends RecordWriter<EnumWritable<HravenService>, HravenRecord> { + + // prepend this prefix to all metrics + private String METRIC_PREFIX; + + // filter jobs not submitted by this user + private String userFilter; + + // filter jobs not submitted in this queue + private String queueFilter; + + // exclude these metric path components (e.g. MultiInputCounters, which create a lot of redundant tree + // paths that you wouldn't want to send to graphite) + private String excludedComponents; + + // comma-separated list of appId substrings that bypass the above filters + private String doNotExcludeApps; + + private HTable keyMappingTable; + private HTable reverseKeyMappingTable; + + + public GraphiteRecordWriter(Configuration hbaseconfig, String host, int port, String prefix, String userFilter, String queueFilter, String excludedComponents, String doNotExcludeApps) throws IOException { + this.METRIC_PREFIX = prefix; + this.userFilter = userFilter; + this.queueFilter = queueFilter; + this.excludedComponents = excludedComponents; + this.doNotExcludeApps = doNotExcludeApps; + + keyMappingTable = new HTable(hbaseconfig, Constants.GRAPHITE_KEY_MAPPING_TABLE_BYTES); + keyMappingTable.setAutoFlush(false); + + reverseKeyMappingTable = new HTable(hbaseconfig, Constants.GRAPHITE_REVERSE_KEY_MAPPING_TABLE_BYTES); + reverseKeyMappingTable.setAutoFlush(false); + + try { + // Open a connection to the Graphite server. + Socket socket = new Socket(host, port); + writer = new OutputStreamWriter(socket.getOutputStream()); + } catch (Exception e) { + throw new IOException("Error connecting to graphite, " + host + ":" + port, e); + } + } + + /** + * Wraps the value into a {@link JobHistoryRecordCollection} if needed, formats it via + * {@link GraphiteHistoryWriter} and sends the resulting metric lines to Graphite. + */ + + @Override + public void write(EnumWritable<HravenService> serviceKey, HravenRecord value) throws IOException, + InterruptedException { + HravenService service = serviceKey.getValue(); + JobHistoryRecordCollection recordCollection; + + if (value instanceof JobHistoryRecordCollection) { + recordCollection = (JobHistoryRecordCollection) value; + } else { + recordCollection = new JobHistoryRecordCollection((JobHistoryRecord) value); + } + + StringBuilder output = new StringBuilder(); + int lines = 0; + + try { + GraphiteHistoryWriter graphiteWriter = + new GraphiteHistoryWriter(keyMappingTable, reverseKeyMappingTable, METRIC_PREFIX, service, recordCollection, output, userFilter, queueFilter, excludedComponents, doNotExcludeApps); + lines = graphiteWriter.write(); + } catch (Exception e) { + LOG.error("Error generating metrics for graphite", e); + } + + if (output.length() > 0) { + + try { + LOG.info("SendToGraphite: " + recordCollection.getKey().toString() + " : " + lines + " metrics"); + writer.write(output.toString()); + } catch (Exception e) { + LOG.error("Error sending metrics to graphite", e); + throw new IOException("Error sending metrics", e); + } + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + try { + LOG.info("flushing records and closing writer"); + writer.close(); + } catch (Exception e) { + throw new IOException("Error flushing metrics to graphite", e); + } + keyMappingTable.close(); +
reverseKeyMappingTable.close(); + } + } + + @Override + public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException { + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, + InterruptedException { + return new GraphiteOutputCommitter(); + } + + /** + * Output a custom {@link GraphiteRecordWriter} to send metrics to graphite + */ + @Override + public RecordWriter, HravenRecord> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + return new GraphiteRecordWriter(HBaseConfiguration.create(conf), + conf.get(Constants.JOBCONF_GRAPHITE_HOST_KEY, Constants.GRAPHITE_DEFAULT_HOST), + conf.getInt(Constants.JOBCONF_GRAPHITE_PORT_KEY, Constants.GRAPHITE_DEFAULT_PORT), + conf.get(Constants.JOBCONF_GRAPHITE_PREFIX, Constants.GRAPHITE_DEFAULT_PREFIX), + conf.get(Constants.JOBCONF_GRAPHITE_USER_FILTER), + conf.get(Constants.JOBCONF_GRAPHITE_QUEUE_FILTER), + conf.get(Constants.JOBCONF_GRAPHITE_EXCLUDED_COMPONENTS), + conf.get(Constants.JOBCONF_GRAPHITE_DONOTEXCLUDE_APPS) + ); + } + +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseHistoryWriter.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseHistoryWriter.java new file mode 100644 index 0000000..bd105b7 --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseHistoryWriter.java @@ -0,0 +1,105 @@ +package com.twitter.hraven.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.datasource.ProcessingException; + +public class HbaseHistoryWriter { + private static Log LOG = LogFactory.getLog(HbaseHistoryWriter.class); + + public static void addHistoryPuts(HravenRecord record, Put p) { + byte[] family = Constants.INFO_FAM_BYTES; + + JobHistoryKeys dataKey = null; + if (record.getDataKey() != null && record.getDataKey().get(0) != null) + try { + dataKey = JobHistoryKeys.valueOf(record.getDataKey().get(0)); + } catch (IllegalArgumentException e) { + // some keys other than JobHistoryKeys were added by + // JobHistoryListener. Ignore this exception. 
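+ // dataKey stays null in that case and the record's raw data key is used as the column qualifier below.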
+ } + + if (dataKey == null) { + byte[] qualifier = null; + if (record.getDataCategory() == RecordCategory.CONF) { + byte[] jobConfColumnPrefix = + Bytes.add(Constants.JOB_CONF_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); + qualifier = Bytes.add(jobConfColumnPrefix, Bytes.toBytes(record.getDataKey().toString())); + } else { + qualifier = Bytes.toBytes(record.getDataKey().toString().toLowerCase()); + } + + byte[] valueBytes = null; + + if (record.getDataValue() instanceof Long) { + valueBytes = Bytes.toBytes((Long)record.getDataValue()); + } else if (record.getDataValue() instanceof Double) { + valueBytes = Bytes.toBytes((Double)record.getDataValue()); + } else { + valueBytes = Bytes.toBytes(record.getDataValue().toString()); + } + + p.add(family, qualifier, valueBytes); + } else if (dataKey == JobHistoryKeys.COUNTERS || dataKey == JobHistoryKeys.MAP_COUNTERS + || dataKey == JobHistoryKeys.REDUCE_COUNTERS) { + + String group = record.getDataKey().get(1); + String counterName = record.getDataKey().get(2); + byte[] counterPrefix = null; + + try { + switch (dataKey) { + case COUNTERS: + case TOTAL_COUNTERS: + case TASK_COUNTERS: + case TASK_ATTEMPT_COUNTERS: + counterPrefix = Bytes.add(Constants.COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); + break; + case MAP_COUNTERS: + counterPrefix = Bytes.add(Constants.MAP_COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); + break; + case REDUCE_COUNTERS: + counterPrefix = + Bytes.add(Constants.REDUCE_COUNTER_COLUMN_PREFIX_BYTES, Constants.SEP_BYTES); + break; + default: + throw new IllegalArgumentException("Unknown counter type " + dataKey.toString()); + } + } catch (IllegalArgumentException iae) { + throw new ProcessingException("Unknown counter type " + dataKey, iae); + } catch (NullPointerException npe) { + throw new ProcessingException("Null counter type " + dataKey, npe); + } + + byte[] groupPrefix = Bytes.add(counterPrefix, Bytes.toBytes(group), Constants.SEP_BYTES); + byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName)); + + p.add(family, qualifier, Bytes.toBytes((Long) record.getDataValue())); + } else { + @SuppressWarnings("rawtypes") + Class clazz = JobHistoryKeys.KEY_TYPES.get(dataKey); + byte[] valueBytes = null; + + if (Integer.class.equals(clazz)) { + valueBytes = + (Integer) record.getDataValue() == 0 ? Constants.ZERO_INT_BYTES : Bytes + .toBytes((Integer) record.getDataValue()); + } else if (Long.class.equals(clazz)) { + valueBytes = + (Long) record.getDataValue() == 0 ?
Constants.ZERO_LONG_BYTES : Bytes + .toBytes((Long) record.getDataValue()); + } else { + // keep the string representation by default + valueBytes = Bytes.toBytes((String) record.getDataValue()); + } + byte[] qualifier = Bytes.toBytes(dataKey.toString().toLowerCase()); + p.add(family, qualifier, valueBytes); + } + } +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseOutputFormat.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseOutputFormat.java new file mode 100644 index 0000000..2511c53 --- /dev/null +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/HbaseOutputFormat.java @@ -0,0 +1,130 @@ +package com.twitter.hraven.mapreduce; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.HravenService; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryTaskRecord; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; +import com.twitter.hraven.TaskKey; +import com.twitter.hraven.datasource.JobKeyConverter; +import com.twitter.hraven.datasource.TaskKeyConverter; +import com.twitter.hraven.util.EnumWritable; + +/** + * @author angad.singh Wrapper around Hbase's {@link MultiTableOutputFormat} Converts + * {@link HravenRecords} to Hbase {@link Put}s and writes them to {@link HTable}s + * corresponding to {@link HravenService} + */ + +public class HbaseOutputFormat extends OutputFormat, HravenRecord> { + + protected static class HravenHbaseRecordWriter extends RecordWriter, HravenRecord> { + + private RecordWriter recordWriter; + + public HravenHbaseRecordWriter(RecordWriter recordWriter) { + this.recordWriter = recordWriter; + } + + /** + * Writes a single {@link HravenRecord} to the specified {@link HravenService} + * @param serviceKey + * @param value + * @throws IOException + * @throws InterruptedException + */ + private void writeRecord(HravenService serviceKey, HravenRecord value) + throws IOException, InterruptedException { + ImmutableBytesWritable table = null; + Put put = null; + + switch (serviceKey) { + case JOB_HISTORY: + JobHistoryRecord rec = (JobHistoryRecord) value; + put = new Put(new JobKeyConverter().toBytes(rec.getKey())); + table = new ImmutableBytesWritable(Constants.HISTORY_TABLE_BYTES); + HbaseHistoryWriter.addHistoryPuts(rec, put); + break; + case JOB_HISTORY_TASK: + JobHistoryTaskRecord taskRec = (JobHistoryTaskRecord) value; + put = new Put(new TaskKeyConverter().toBytes((TaskKey) taskRec.getKey())); + table = new ImmutableBytesWritable(Constants.HISTORY_TASK_TABLE_BYTES); + HbaseHistoryWriter.addHistoryPuts(taskRec, put); + break; + } + + recordWriter.write(table, put); + } + + /** + * Split a {@link JobHistoryRecordCollection} into {@link JobHistoryRecord}s and call the + * {@link #writeRecord(HravenService, 
JobHistoryRecord)} method + */ + + @Override + public void write(EnumWritable serviceKey, HravenRecord value) throws IOException, + InterruptedException { + HravenService service = serviceKey.getValue(); + if (value instanceof JobHistoryRecordCollection) { + for (JobHistoryRecord record : (JobHistoryRecordCollection) value) { + writeRecord(service, record); + } + } else { + writeRecord(service, value); + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + recordWriter.close(context); + } + } + + private MultiTableOutputFormat outputFormat; + + /** + * Wrap around Hbase's {@link MultiTableOutputFormat} + */ + public HbaseOutputFormat() { + this.outputFormat = new MultiTableOutputFormat(); + } + + /** + * Wrap around {@link MultiTableOutputFormat}'s {@link MultiTableRecordWriter} + */ + @Override + public RecordWriter, HravenRecord> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new HravenHbaseRecordWriter(outputFormat.getRecordWriter(context)); + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + outputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, + InterruptedException { + return outputFormat.getOutputCommitter(context); + } +} diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java index b6f7112..6804c32 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java @@ -19,9 +19,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; +import javax.annotation.Nullable; + +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -33,14 +38,23 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.HravenService; import com.twitter.hraven.JobDesc; import com.twitter.hraven.JobDescFactory; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryTaskRecord; import com.twitter.hraven.JobKey; import com.twitter.hraven.QualifiedJobId; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.datasource.AppVersionService; import com.twitter.hraven.datasource.JobHistoryByIdService; import com.twitter.hraven.datasource.JobHistoryRawService; @@ -53,18 +67,21 @@ import com.twitter.hraven.etl.JobHistoryFileParserBase; import com.twitter.hraven.etl.JobHistoryFileParserFactory; import com.twitter.hraven.etl.ProcessRecordService; 
+import com.twitter.hraven.etl.Sink;
+import com.twitter.hraven.util.EnumWritable;
 
 /**
  * Takes in results from a scan from {@link ProcessRecordService
  * @getHistoryRawTableScan}, process both job file and history file and emit out
- * as puts for the {@link Constants#HISTORY_TABLE}
+ * as {@link HravenService}, {@link HravenRecord}. {@link Sink}s will process these
+ * and convert to their respective storage formats
  *
* As a side-affect we'll load an index record into the * {@link Constants#HISTORY_BY_JOBID_TABLE} as well. * */ public class JobFileTableMapper extends - TableMapper { + TableMapper { private static Log LOG = LogFactory.getLog(JobFileTableMapper.class); @@ -76,6 +93,14 @@ public class JobFileTableMapper extends Constants.HISTORY_RAW_TABLE_BYTES); private static JobKeyConverter jobKeyConv = new JobKeyConverter(); + private MultipleOutputs mos; + + /** + * Define the sinks to output to + */ + + private ArrayList sinks; + /** * Used to create secondary index. */ @@ -96,46 +121,68 @@ public class JobFileTableMapper extends /** * @return the key class for the job output data. */ - public static Class> getOutputKeyClass() { - return ImmutableBytesWritable.class; + public static Class getOutputKeyClass() { + return EnumWritable.class; } /** * @return the value class for the job output data. */ - public static Class getOutputValueClass() { - return Put.class; + public static Class getOutputValueClass() { + return HravenRecord.class; } @Override protected void setup( - Mapper.Context context) + Mapper.Context context) throws java.io.IOException, InterruptedException { Configuration myConf = context.getConfiguration(); jobHistoryByIdService = new JobHistoryByIdService(myConf); appVersionService = new AppVersionService(myConf); rawService = new JobHistoryRawService(myConf); - keyCount = 0; + + sinks = + new ArrayList(Collections2.transform(Arrays.asList(StringUtils.split(context + .getConfiguration().get(Constants.JOBCONF_SINKS), ',')), new Function() { + + @Override + @Nullable + public Sink apply(@Nullable String input) { + return Sink.valueOf(input); + } + })); + + mos = new MultipleOutputs(context); } @Override protected void map( ImmutableBytesWritable key, Result value, - Mapper.Context context) + Mapper.Context context) throws java.io.IOException, InterruptedException { keyCount++; boolean success = true; QualifiedJobId qualifiedJobId = null; try { + + + /** + * STEP 0 + * + * init and extract meaningful data out of raw value + * + * **/ + qualifiedJobId = rawService.getQualifiedJobIdFromResult(value); context.progress(); Configuration jobConf = rawService.createConfigurationFromResult(value); context.progress(); + //figure out submit time byte[] jobhistoryraw = rawService.getJobHistoryRawFromResult(value); long submitTimeMillis = JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory( jobhistoryraw); @@ -151,22 +198,29 @@ protected void map( Put submitTimePut = rawService.getJobSubmitTimePut(value.getRow(), submitTimeMillis); - context.write(RAW_TABLE, submitTimePut); - + + rawService.getTable().put(submitTimePut); + + /** + * STEP 1 + * + * process and extract job xml/conf + * + * **/ JobDesc jobDesc = JobDescFactory.createJobDesc(qualifiedJobId, submitTimeMillis, jobConf); JobKey jobKey = new JobKey(jobDesc); context.progress(); - + // TODO: remove sysout String msg = "JobDesc (" + keyCount + "): " + jobDesc + " submitTimeMillis: " + submitTimeMillis; LOG.info(msg); - List puts = JobHistoryService.getHbasePuts(jobDesc, jobConf); + JobHistoryRecordCollection confRecordCollection = JobHistoryService.getConfRecord(jobDesc, jobConf); - LOG.info("Writing " + puts.size() + " JobConf puts to " - + Constants.HISTORY_TABLE); + LOG.info("Sending JobConf records to " + + HravenService.JOB_HISTORY + " service"); // TODO: // For Scalding just convert the flowID as a Hex number. 
Use that for the @@ -176,13 +230,18 @@ protected void map( // Scan should get the first (lowest job-id) then grab the start-time from // the Job. - // Emit the puts - for (Put put : puts) { - context.write(JOB_TABLE, put); - context.progress(); - } + //1.1 Emit the records for job xml/conf/"JobDesc" + //Don't sink config seperately - merge with all other records and then sink + //sink(HravenService.JOB_HISTORY, confRecord); + context.progress(); - // Write secondary index(es) + /** + * STEP 2 + * + * Write secondary index(es) + * + * **/ + LOG.info("Writing secondary indexes"); jobHistoryByIdService.writeIndexes(jobKey); context.progress(); @@ -190,6 +249,14 @@ protected void map( jobDesc.getAppId(), jobDesc.getVersion(), jobDesc.getRunId()); context.progress(); + /** + * STEP 3 + * + * Process and extact actual job history + * + * **/ + + //3.1: get job history KeyValue keyValue = value.getColumnLatest(Constants.RAW_FAM_BYTES, Constants.JOBHISTORY_COL_BYTES); @@ -200,44 +267,25 @@ protected void map( } else { historyFileContents = keyValue.getValue(); } + + //3.2: parse job history JobHistoryFileParser historyFileParser = JobHistoryFileParserFactory .createJobHistoryFileParser(historyFileContents, jobConf); historyFileParser.parse(historyFileContents, jobKey); context.progress(); - puts = historyFileParser.getJobPuts(); - if (puts == null) { - throw new ProcessingException( - " Unable to get job puts for this record!" + jobKey); - } - LOG.info("Writing " + puts.size() + " Job puts to " - + Constants.HISTORY_TABLE); - - // Emit the puts - for (Put put : puts) { - context.write(JOB_TABLE, put); - // TODO: we should not have to do this, but need to confirm that - // TableRecordWriter does this for us. - context.progress(); - } - - puts = historyFileParser.getTaskPuts(); - if (puts == null) { - throw new ProcessingException( - " Unable to get task puts for this record!" + jobKey); - } - LOG.info("Writing " + puts.size() + " Job puts to " - + Constants.HISTORY_TASK_TABLE); - - for (Put put : puts) { - context.write(TASK_TABLE, put); - // TODO: we should not have to do this, but need to confirm that - // TableRecordWriter does this for us. 
- context.progress(); - } + //3.3: get and write job related data + JobHistoryRecordCollection jobHistoryRecords = (JobHistoryRecordCollection) historyFileParser.getJobRecords(); + jobHistoryRecords.setSubmitTime(submitTimeMillis); + jobHistoryRecords.mergeWith(confRecordCollection); + + LOG.info("Sending " + jobHistoryRecords.size() + " Job history records to " + + HravenService.JOB_HISTORY + " service"); + + context.progress(); - /** post processing steps on job puts and job conf puts */ + //3.4: post processing steps on job records and job conf records Long mbMillis = historyFileParser.getMegaByteMillis(); context.progress(); if (mbMillis == null) { @@ -245,21 +293,37 @@ protected void map( + jobKey); } - Put mbPut = getMegaByteMillisPut(mbMillis, jobKey); - LOG.info("Writing mega byte millis puts to " + Constants.HISTORY_TABLE); - context.write(JOB_TABLE, mbPut); + JobHistoryRecord mbRecord = getMegaByteMillisRecord(mbMillis, jobKey); + LOG.info("Send mega byte millis records to " + HravenService.JOB_HISTORY + " service"); + + jobHistoryRecords.add(mbRecord); context.progress(); - /** post processing steps to get cost of the job */ + //3.5: post processing steps to get cost of the job */ Double jobCost = getJobCost(mbMillis, context.getConfiguration()); context.progress(); if (jobCost == null) { throw new ProcessingException(" Unable to get job cost calculation for this record!" + jobKey); } - Put jobCostPut = getJobCostPut(jobCost, jobKey); - LOG.info("Writing jobCost puts to " + Constants.HISTORY_TABLE); - context.write(JOB_TABLE, jobCostPut); + JobHistoryRecord jobCostRecord = getJobCostRecord(jobCost, jobKey); + LOG.info("Send jobCost records to " + HravenService.JOB_HISTORY + " service"); + jobHistoryRecords.add(jobCostRecord); + + //Sink the merged record + sink(HravenService.JOB_HISTORY, jobHistoryRecords); + context.progress(); + + //3.6: get and write task related data + ArrayList taskHistoryRecords = (ArrayList) historyFileParser.getTaskRecords(); + + LOG.info("Sending " + taskHistoryRecords.size() + " Task history records to " + + HravenService.JOB_HISTORY_TASK + " service"); + + for (JobHistoryTaskRecord taskRecord: taskHistoryRecords) { + sink(HravenService.JOB_HISTORY_TASK, taskRecord); + } + context.progress(); } catch (RowKeyParseException rkpe) { @@ -280,6 +344,12 @@ protected void map( success = false; } + /** + * STEP 4 + * + * Finalization + * + * **/ if (success) { // Update counter to indicate failure. HadoopCompat.incrementCounter(context.getCounter(ProcessingCounter.RAW_ROW_SUCCESS_COUNT), 1); @@ -296,21 +366,28 @@ protected void map( // row, with one succeeding and one failing, there could be a race where the // raw does not properly indicate the true status (which is questionable in // any case with multiple simultaneous runs with different outcome). 
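Not part of the patch, for readability only: the setup() and sink() changes in this file spread the new multi-sink fan-out across several hunks. The sketch below collapses it into one place, using only calls that already appear in this diff (Constants.JOBCONF_SINKS, Sink.valueOf, MultipleOutputs.write, EnumWritable); the wrapper class and method names are illustrative.

// Recap sketch: the fan-out that setup() and sink() implement, in one place.
import java.util.ArrayList;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import com.twitter.hraven.Constants;
import com.twitter.hraven.HravenService;
import com.twitter.hraven.JobHistoryRecordCollection;
import com.twitter.hraven.etl.Sink;
import com.twitter.hraven.util.EnumWritable;

class SinkFanOutSketch {
  static void sinkAll(Configuration conf, MultipleOutputs mos,
      JobHistoryRecordCollection jobHistoryRecords)
      throws java.io.IOException, InterruptedException {
    // Constants.JOBCONF_SINKS carries a comma-separated sink list, e.g. "GRAPHITE,HBASE"
    ArrayList<Sink> sinks = new ArrayList<Sink>();
    for (String name : StringUtils.split(conf.get(Constants.JOBCONF_SINKS), ',')) {
      sinks.add(Sink.valueOf(name));
    }
    // One named-output write per configured sink; the record itself is sink-agnostic.
    for (Sink sink : sinks) {
      mos.write(sink.name(), new EnumWritable(HravenService.JOB_HISTORY), jobHistoryRecords);
    }
  }
}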
- context.write(RAW_TABLE, successPut); - + rawService.getTable().put(successPut); } - /** - * generates a put for the megabytemillis + private void sink(HravenService service, HravenRecord record) + throws IOException, InterruptedException { + + for (Sink sink: sinks) { + mos.write(sink.name(), new EnumWritable(service), record); + } + } + +/** + * generates a record for the megabytemillis * @param mbMillis * @param jobKey - * @return the put with megabytemillis + * @return the record with megabytemillis */ - private Put getMegaByteMillisPut(Long mbMillis, JobKey jobKey) { - Put pMb = new Put(jobKeyConv.toBytes(jobKey)); - pMb.add(Constants.INFO_FAM_BYTES, Constants.MEGABYTEMILLIS_BYTES, Bytes.toBytes(mbMillis)); - return pMb; - } + private JobHistoryRecord getMegaByteMillisRecord(Long mbMillis, + JobKey jobKey) { + return new JobHistoryRecord(RecordCategory.INFERRED, jobKey, + new RecordDataKey(Constants.MEGABYTEMILLIS), mbMillis); + } /** * looks for cost file in distributed cache @@ -400,24 +477,28 @@ private Double getJobCost(Long mbMillis, Configuration currentConf) { } /** - * generates a put for the job cost + * generates a record for the job cost * @param jobCost * @param jobKey - * @return the put with job cost + * @return the record with job cost */ - private Put getJobCostPut(Double jobCost, JobKey jobKey) { - Put pJobCost = new Put(jobKeyConv.toBytes(jobKey)); - pJobCost.add(Constants.INFO_FAM_BYTES, Constants.JOBCOST_BYTES, Bytes.toBytes(jobCost)); - return pJobCost; + + private JobHistoryRecord getJobCostRecord(Double jobCost, JobKey jobKey) { + return new JobHistoryRecord(RecordCategory.INFERRED, jobKey, + new RecordDataKey(Constants.JOBCOST), jobCost); } @Override protected void cleanup( - Mapper.Context context) + Mapper.Context context) throws java.io.IOException, InterruptedException { IOException caught = null; - + + //Close the MultipleOutputs instance + //to call close on all wrapped OutputFormats's RecordWriters + mos.close(); + if (jobHistoryByIdService != null) { try { jobHistoryByIdService.close(); diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java index 320e05e..4e5e7cd 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java @@ -17,10 +17,14 @@ import java.io.IOException; import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Put; @@ -30,9 +34,17 @@ import org.apache.hadoop.mapred.JobHistoryCopy.Listener; import org.apache.hadoop.mapred.JobHistoryCopy.RecordTypes; +import com.google.common.base.Function; import com.twitter.hraven.Constants; +import com.twitter.hraven.HravenRecord; +import com.twitter.hraven.JobHistoryRecordCollection; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRawRecord; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryTaskRecord; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; import com.twitter.hraven.TaskKey; import com.twitter.hraven.datasource.JobKeyConverter; import 
com.twitter.hraven.datasource.TaskKeyConverter; @@ -47,17 +59,14 @@ public class JobHistoryListener implements Listener { private String jobId; /** Job ID, minus the leading "job_" */ private String jobNumber = ""; - private final byte[] jobKeyBytes; /** explicitly initializing map millis and * reduce millis in case it's not found */ private long mapSlotMillis = 0L; private long reduceSlotMillis = 0L; - private List jobPuts = new LinkedList(); - private List taskPuts = new LinkedList(); - private JobKeyConverter jobKeyConv = new JobKeyConverter(); - private TaskKeyConverter taskKeyConv = new TaskKeyConverter(); + private Collection jobRecords; + private Collection taskRecords; /** * Constructor for listener to be used to read in a Job History File. While @@ -72,7 +81,8 @@ public JobHistoryListener(JobKey jobKey) { throw new IllegalArgumentException(msg); } this.jobKey = jobKey; - this.jobKeyBytes = jobKeyConv.toBytes(jobKey); + this.jobRecords = new JobHistoryRecordCollection(jobKey); + this.taskRecords = new ArrayList(); setJobId(jobKey.getJobId().getJobIdString()); } @@ -96,9 +106,12 @@ public void handle(RecordTypes recType, Map values) // skip other types ; } - //System.out.println("Reading: " + recType.toString()); } + private interface RecordGenerator { + public HravenRecord getRecord(RecordDataKey key, Object value, boolean isNumeric); + } + private void handleJob(Map values) { String id = values.get(JobHistoryKeys.JOBID); @@ -110,13 +123,15 @@ private void handleJob(Map values) { LOG.error(msg); throw new ImportException(msg); } - // add job ID to values to put - Put p = new Put(this.jobKeyBytes); + for (Map.Entry e : values.entrySet()) { - addKeyValues(p, Constants.INFO_FAM_BYTES, e.getKey(), e.getValue()); + addRecords(jobRecords, e.getKey(), e.getValue(), new RecordGenerator() { + @Override + public JobHistoryRecord getRecord(RecordDataKey key, Object value, boolean isNumeric) { + return new JobHistoryRecord(isNumeric ? 
RecordCategory.HISTORY_COUNTER : RecordCategory.HISTORY_META, jobKey, key, value); + } + }); } - this.jobPuts.add(p); - } /** @@ -124,10 +139,10 @@ private void handleJob(Map values) { * @param pVersion * @throws IllegalArgumentException if put is null */ - public void includeHadoopVersionPut(Put pVersion) { + public void includeHadoopVersionRecord(JobHistoryRecord pVersion) { // set the hadoop version for this record if (pVersion != null) { - this.jobPuts.add(pVersion); + this.jobRecords.add(pVersion); } else { String msg = "Hadoop Version put cannot be null"; LOG.error(msg); @@ -136,75 +151,76 @@ public void includeHadoopVersionPut(Put pVersion) { } private void handleTask(Map values) { - byte[] taskIdKeyBytes = getTaskKey("task_", this.jobNumber, values.get(JobHistoryKeys.TASKID)); - Put p = new Put(taskIdKeyBytes); + final TaskKey taskKey = getTaskKey("task_", this.jobNumber, values.get(JobHistoryKeys.TASKID)); - p.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.Task.toString())); - for (Map.Entry e : values.entrySet()) { - addKeyValues(p, Constants.INFO_FAM_BYTES, e.getKey(), e.getValue()); + this.taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.Task.toString())); + + for (Map.Entry e : values.entrySet()) { + addRecords(taskRecords, e.getKey(), e.getValue(), new RecordGenerator() { + @Override + public JobHistoryTaskRecord getRecord(RecordDataKey key, Object value, boolean isNumeric) { + return new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskKey, key, value); + } + }); } - this.taskPuts.add(p); } private void handleMapAttempt(Map values) { - byte[] taskIdKeyBytes = getTaskKey("attempt_", this.jobNumber, values.get(JobHistoryKeys.TASK_ATTEMPT_ID)); - Put p = new Put(taskIdKeyBytes); + final TaskKey taskKey = + getTaskKey("attempt_", this.jobNumber, values.get(JobHistoryKeys.TASK_ATTEMPT_ID)); - p.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.MapAttempt.toString())); - for (Map.Entry e : values.entrySet()) { - addKeyValues(p, Constants.INFO_FAM_BYTES, e.getKey(), e.getValue()); - } + this.taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.MapAttempt.toString())); - this.taskPuts.add(p); + for (Map.Entry e : values.entrySet()) { + addRecords(taskRecords, e.getKey(), e.getValue(), new RecordGenerator() { + @Override + public JobHistoryTaskRecord getRecord(RecordDataKey key, Object value, boolean isNumeric) { + return new JobHistoryTaskRecord(isNumeric ? 
RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskKey, key, value); + } + }); + } } private void handleReduceAttempt(Map values) { - byte[] taskIdKeyBytes = getTaskKey("attempt_", this.jobNumber, values.get(JobHistoryKeys.TASK_ATTEMPT_ID)); - Put p = new Put(taskIdKeyBytes); + final TaskKey taskKey = + getTaskKey("attempt_", this.jobNumber, values.get(JobHistoryKeys.TASK_ATTEMPT_ID)); - p.add(Constants.INFO_FAM_BYTES, Constants.RECORD_TYPE_COL_BYTES, - Bytes.toBytes(RecordTypes.ReduceAttempt.toString())); - for (Map.Entry e : values.entrySet()) { - addKeyValues(p, Constants.INFO_FAM_BYTES, e.getKey(), e.getValue()); - } + this.taskRecords.add(new JobHistoryTaskRecord(RecordCategory.HISTORY_TASK_META, taskKey, + new RecordDataKey(Constants.RECORD_TYPE_COL), RecordTypes.ReduceAttempt.toString())); - this.taskPuts.add(p); + for (Map.Entry e : values.entrySet()) { + addRecords(taskRecords, e.getKey(), e.getValue(), new RecordGenerator() { + @Override + public JobHistoryTaskRecord getRecord(RecordDataKey key, Object value, boolean isNumeric) { + return new JobHistoryTaskRecord(isNumeric ? RecordCategory.HISTORY_TASK_COUNTER + : RecordCategory.HISTORY_TASK_META, taskKey, key, value); + } + }); + } } - private void addKeyValues(Put p, byte[] family, JobHistoryKeys key, String value) { + private void addRecords(Collection recordCollection, JobHistoryKeys key, String value, + RecordGenerator generator) { if (key == JobHistoryKeys.COUNTERS || key == JobHistoryKeys.MAP_COUNTERS || key == JobHistoryKeys.REDUCE_COUNTERS) { try { Counters counters = Counters.fromEscapedCompactString(value); - /* - * Name counter columns as: - * g!groupname!countername - */ - byte[] counterPrefix = null; - if (key == JobHistoryKeys.COUNTERS) { - counterPrefix = Bytes.add(Constants.COUNTER_COLUMN_PREFIX_BYTES, - Constants.SEP_BYTES); - } else if (key == JobHistoryKeys.MAP_COUNTERS) { - counterPrefix = Bytes.add(Constants.MAP_COUNTER_COLUMN_PREFIX_BYTES, - Constants.SEP_BYTES); - } else if (key == JobHistoryKeys.REDUCE_COUNTERS) { - counterPrefix = Bytes.add(Constants.REDUCE_COUNTER_COLUMN_PREFIX_BYTES, - Constants.SEP_BYTES); - } else { - throw new IllegalArgumentException("Unknown counter type "+key.toString()); - } for (Counters.Group group : counters) { - byte[] groupPrefix = Bytes.add( - counterPrefix, Bytes.toBytes(group.getName()), Constants.SEP_BYTES); for (Counters.Counter counter : group) { + RecordDataKey dataKey = new RecordDataKey(key.toString()); + dataKey.add(group.getName()); String counterName = counter.getName(); long counterValue = counter.getValue(); - byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName)); - p.add(family, qualifier, Bytes.toBytes(counterValue)); - // get the map and reduce slot millis for megabytemillis calculations + dataKey.add(counterName); + recordCollection.add(generator.getRecord(dataKey, counterValue, true)); + + // get the map and reduce slot millis for megabytemillis + // calculations if (Constants.SLOTS_MILLIS_MAPS.equals(counterName)) { this.mapSlotMillis = counterValue; } @@ -214,34 +230,31 @@ private void addKeyValues(Put p, byte[] family, JobHistoryKeys key, String value } } } catch (ParseException pe) { - LOG.error("Counters could not be parsed from string'"+value+"'", pe); + LOG.error("Counters could not be parsed from string'" + value + "'", pe); } } else { @SuppressWarnings("rawtypes") Class clazz = JobHistoryKeys.KEY_TYPES.get(key); - byte[] valueBytes = null; + Object valueObj = value; + boolean isNumeric = false; if 
(Integer.class.equals(clazz)) { + isNumeric = true; try { - valueBytes = (value != null && value.trim().length() > 0) ? - Bytes.toBytes(Integer.parseInt(value)) : Constants.ZERO_INT_BYTES; + valueObj = (value != null && value.trim().length() > 0) ? Integer.parseInt(value) : 0; } catch (NumberFormatException nfe) { - // us a default value - valueBytes = Constants.ZERO_INT_BYTES; + valueObj = 0; } } else if (Long.class.equals(clazz)) { + isNumeric = true; try { - valueBytes = (value != null && value.trim().length() > 0) ? - Bytes.toBytes(Long.parseLong(value)) : Constants.ZERO_LONG_BYTES; + valueObj = (value != null && value.trim().length() > 0) ? Long.parseLong(value) : 0; } catch (NumberFormatException nfe) { - // us a default value - valueBytes = Constants.ZERO_LONG_BYTES; + valueObj = 0; } - } else { - // keep the string representation by default - valueBytes = Bytes.toBytes(value); } - byte[] qualifier = Bytes.toBytes(key.toString().toLowerCase()); - p.add(family, qualifier, valueBytes); + + recordCollection.add(generator.getRecord(new RecordDataKey(key.toString()), + valueObj, isNumeric)); } } @@ -260,7 +273,7 @@ private void setJobId(String id) { * Returns the Task ID or Task Attempt ID, stripped of the leading job ID, * appended to the job row key. */ - public byte[] getTaskKey(String prefix, String jobNumber, String fullId) { + public TaskKey getTaskKey(String prefix, String jobNumber, String fullId) { String taskComponent = fullId; if (fullId == null) { taskComponent = ""; @@ -272,15 +285,15 @@ public byte[] getTaskKey(String prefix, String jobNumber, String fullId) { } } - return taskKeyConv.toBytes(new TaskKey(this.jobKey, taskComponent)); + return new TaskKey(this.jobKey, taskComponent); } /** * getter for jobKeyBytes * @return the byte array of jobKeyBytes */ - public byte[] getJobKeyBytes() { - return this.jobKeyBytes; + public JobKey getJobKey() { + return this.jobKey; } /** @@ -289,12 +302,12 @@ public byte[] getJobKeyBytes() { * is called with this listener. 
* @return a non-null (possibly empty) list of jobPuts */ - public List getJobPuts() { - return this.jobPuts; + public Collection getJobRecords() { + return this.jobRecords; } - public List getTaskPuts() { - return this.taskPuts; + public Collection getTaskRecords() { + return this.taskRecords; } public Long getMapSlotMillis() { diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java index 411b0c4..9a3357a 100644 --- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java +++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java @@ -30,6 +30,8 @@ import com.google.common.io.Files; import com.twitter.hraven.Constants; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryRecord; import com.twitter.hraven.JobKey; public class TestJobHistoryFileParserHadoop1 { @@ -64,8 +66,8 @@ public void testMegaByteMillis() throws IOException { JobKey jobKey = new JobKey("cluster1", "user1", "Sleep", 1, "job_201311192236_3583"); historyFileParser.parse(contents, jobKey); - List jobPuts = historyFileParser.getJobPuts(); - assertNotNull(jobPuts); + JobHistoryRecordCollection jobRecords = (JobHistoryRecordCollection)historyFileParser.getJobRecords(); + assertNotNull(jobRecords); Long mbMillis = historyFileParser.getMegaByteMillis(); Long expValue = 2981062L; diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java index ea9960d..1de4c38 100644 --- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java +++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java @@ -23,13 +23,19 @@ import com.twitter.hraven.Constants; import com.twitter.hraven.HadoopVersion; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryRecordCollection; +import com.twitter.hraven.JobHistoryTaskRecord; import com.twitter.hraven.JobKey; +import com.twitter.hraven.RecordCategory; +import com.twitter.hraven.RecordDataKey; import com.twitter.hraven.datasource.JobKeyConverter; import com.twitter.hraven.datasource.ProcessingException; import com.twitter.hraven.datasource.TaskKeyConverter; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,72 +70,30 @@ public void testCreateJobHistoryFileParserCorrectCreation() throws IOException { JobKey jobKey = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001"); historyFileParser.parse(contents, jobKey); - List jobPuts = historyFileParser.getJobPuts(); - assertEquals(6, jobPuts.size()); + JobHistoryRecordCollection jobRecords = (JobHistoryRecordCollection)historyFileParser.getJobRecords(); + assertEquals(156, jobRecords.size()); - JobKeyConverter jobKeyConv = new JobKeyConverter(); assertEquals("cluster1!user!Sleep!1!job_1329348432655_0001", - jobKeyConv.fromBytes(jobPuts.get(0).getRow()).toString()); + jobRecords.getKey().toString()); // check hadoop version - boolean foundVersion2 = false; - for (Put p : jobPuts) { - List kv2 = p.get(Constants.INFO_FAM_BYTES, - Bytes.toBytes(JobHistoryKeys.hadoopversion.toString())); - if (kv2.size() == 0) { - // we are interested in hadoop 
version put only - // hence continue - continue; - } - assertEquals(1, kv2.size()); - Map> d = p.getFamilyMap(); - for (List lkv : d.values()) { - for (KeyValue kv : lkv) { - // ensure we have a hadoop2 version as the value - assertEquals(Bytes.toString(kv.getValue()), - HadoopVersion.TWO.toString()); - - // ensure we don't see the same put twice - assertFalse(foundVersion2); - // now set this to true - foundVersion2 = true; - } - } - } - // ensure that we got the hadoop2 version put - assertTrue(foundVersion2); + Object versionRecord = + jobRecords.getValue(RecordCategory.HISTORY_META, new RecordDataKey( + JobHistoryKeys.hadoopversion.toString().toLowerCase())); + + assertNotNull(versionRecord); + assertEquals((String)versionRecord, HadoopVersion.TWO.toString()); // check job status - boolean foundJobStatus = false; - for (Put p : jobPuts) { - List kv2 = - p.get(Constants.INFO_FAM_BYTES, - Bytes.toBytes(JobHistoryKeys.JOB_STATUS.toString().toLowerCase())); - if (kv2.size() == 0) { - // we are interested in JobStatus put only - // hence continue - continue; - } - assertEquals(1, kv2.size()); - - for (KeyValue kv : kv2) { - // ensure we have a job status value as the value - assertEquals(Bytes.toString(kv.getValue()), - JobHistoryFileParserHadoop2.JOB_STATUS_SUCCEEDED); - - // ensure we don't see the same put twice - assertFalse(foundJobStatus); - // now set this to true - foundJobStatus = true; - } - } - // ensure that we got the JobStatus put - assertTrue(foundJobStatus); - - List taskPuts = historyFileParser.getTaskPuts(); - assertEquals(taskPuts.size(), 45); + Object statusRecord = + jobRecords.getValue(RecordCategory.HISTORY_META, new RecordDataKey( + JobHistoryKeys.JOB_STATUS.toString().toLowerCase())); + + assertNotNull(statusRecord); + assertEquals((String)statusRecord, JobHistoryFileParserHadoop2.JOB_STATUS_SUCCEEDED); - TaskKeyConverter taskKeyConv = new TaskKeyConverter(); + List taskRecords = (List) historyFileParser.getTaskRecords(); + assertEquals(382, taskRecords.size()); Set putRowKeys = new HashSet(Arrays.asList( @@ -158,9 +122,8 @@ public void testCreateJobHistoryFileParserCorrectCreation() throws IOException { "cluster1!user!Sleep!1!job_1329348432655_0001!m_000001_0")); String tKey; - for (Put p : taskPuts) { - tKey = taskKeyConv.fromBytes(p.getRow()).toString(); - assertTrue(putRowKeys.contains(tKey)); + for (JobHistoryTaskRecord p: taskRecords) { + assertTrue(putRowKeys.contains(p.getKey().toString())); } // check post processing for megabytemillis @@ -227,15 +190,15 @@ public void testLongExpGetValuesIntBytes() { String[] keysToBeChecked = {"totalMaps", "totalReduces", "finishedMaps", "finishedReduces", "failedMaps", "failedReduces"}; - byte[] byteValue = null; + Long longValue = null; int intValue10 = 10; long longValue10 = 10L; JobHistoryFileParserHadoop2 jh = new JobHistoryFileParserHadoop2(null); for(String key: keysToBeChecked) { - byteValue = jh.getValue(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key), intValue10); - assertEquals(Bytes.toLong(byteValue), longValue10); + longValue = (Long) jh.getValue(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key), intValue10); + assertEquals(longValue.longValue(), longValue10); } } @@ -246,14 +209,14 @@ public void testLongExpGetValuesIntBytes() { public void testIntExpGetValuesIntBytes() { String[] keysToBeChecked = {"httpPort"}; - byte[] byteValue = null; + Integer intValue = null; int intValue10 = 10; JobHistoryFileParserHadoop2 jh = new JobHistoryFileParserHadoop2(null); for(String key: keysToBeChecked) { - byteValue = 
jh.getValue(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key), intValue10); - assertEquals(Bytes.toInt(byteValue), intValue10); + intValue = (Integer) jh.getValue(JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(key), intValue10); + assertEquals(intValue.intValue(), intValue10); } } } diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java index 90af968..4dfd0bf 100644 --- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java +++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java @@ -30,6 +30,8 @@ import com.twitter.hraven.Constants; import com.twitter.hraven.JobHistoryKeys; +import com.twitter.hraven.JobHistoryRecord; +import com.twitter.hraven.JobHistoryRecordCollection; import com.twitter.hraven.JobKey; import com.twitter.hraven.HadoopVersion; import com.twitter.hraven.mapreduce.JobHistoryListener; @@ -50,39 +52,33 @@ public void checkHadoopVersionSet() { JobKey jobKey = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001"); JobHistoryListener jobHistoryListener = new JobHistoryListener(jobKey); - assertEquals(jobHistoryListener.getJobPuts().size(), 0); + assertEquals(jobHistoryListener.getJobRecords().size(), 0); JobHistoryFileParserHadoop1 jh = new JobHistoryFileParserHadoop1(null); - Put versionPut = jh.getHadoopVersionPut( + JobHistoryRecord versionRecord = jh.getHadoopVersionRecord( HadoopVersion.ONE, - jobHistoryListener.getJobKeyBytes()); - jobHistoryListener.includeHadoopVersionPut(versionPut); - assertEquals(jobHistoryListener.getJobPuts().size(), 1); + jobHistoryListener.getJobKey()); + jobHistoryListener.includeHadoopVersionRecord(versionRecord); + assertEquals(jobHistoryListener.getJobRecords().size(), 1); // check hadoop version boolean foundVersion1 = false; - for (Put p : jobHistoryListener.getJobPuts()) { - List kv2 = p.get(Constants.INFO_FAM_BYTES, - Bytes.toBytes(JobHistoryKeys.hadoopversion.toString())); - if (kv2.size() == 0) { + for (JobHistoryRecord p : (JobHistoryRecordCollection)jobHistoryListener.getJobRecords()) { + if (!p.getDataKey().get(0).equals(JobHistoryKeys.hadoopversion.toString().toLowerCase())) { // we are interested in hadoop version put only // hence continue continue; } - assertEquals(1, kv2.size()); - Map> d = p.getFamilyMap(); - for (List lkv : d.values()) { - for (KeyValue kv : lkv) { - // ensure we have a hadoop2 version as the value - assertEquals(Bytes.toString(kv.getValue()), - HadoopVersion.ONE.toString() ); - // ensure we don't see the same put twice - assertFalse(foundVersion1); - // now set this to true - foundVersion1 = true; - } - } + assert(p.getDataValue() != null); + + // ensure we have a hadoop2 version as the value + assertEquals(p.getDataValue(), + HadoopVersion.ONE.toString()); + // ensure we don't see the same put twice + assertFalse(foundVersion1); + // now set this to true + foundVersion1 = true; } // ensure that we got the hadoop1 version put assertTrue(foundVersion1);
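Not part of the patch: with the g!groupname!countername column scheme removed above, a counter is now addressed by a nested RecordDataKey of counter type, group name, and counter name, as built in JobHistoryListener.addRecords(). The sketch below shows how a test could look one up; the HISTORY_COUNTER/getValue combination extrapolates from the getValue() assertions in TestJobHistoryFileParserHadoop2, and the group and counter strings are placeholders.

// Sketch only: counter lookup under the new record model.
import com.twitter.hraven.JobHistoryKeys;
import com.twitter.hraven.JobHistoryRecordCollection;
import com.twitter.hraven.RecordCategory;
import com.twitter.hraven.RecordDataKey;

class CounterLookupSketch {
  static Object lookupCounter(JobHistoryRecordCollection jobRecords) {
    // Key layout mirrors JobHistoryListener.addRecords(): [COUNTERS, <group>, <counter>]
    RecordDataKey counterKey = new RecordDataKey(JobHistoryKeys.COUNTERS.toString());
    counterKey.add("org.apache.hadoop.mapred.Task$Counter"); // placeholder group name
    counterKey.add("SLOTS_MILLIS_MAPS");                     // placeholder counter name
    return jobRecords.getValue(RecordCategory.HISTORY_COUNTER, counterKey);
  }
}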