This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

Refactoring hraven for multiple sink support #102

Status: Open. Wants to merge 1 commit into base: master.
4 changes: 3 additions & 1 deletion bin/etl/hraven-etl.sh
@@ -42,6 +42,8 @@ hbaseconfdir=${HBASE_CONF_DIR:-$HBASE_HOME/conf}
# HDFS directories for processing and loading job history data
historyRawDir=/yarn/history/done/
historyProcessingDir=/hraven/processing/
+sinks=GRAPHITE,HBASE
Contributor comment:
Adding additional (optional) sink support sounds great, but here it looks like you are making the Graphite sink a default.

+jobFileProcessorConfOpts=-Dhraven.sink.graphite.userfilter=rmcuser -Dhraven.sink.graphite.queuefilter=userplatform -Dhraven.sink.graphite.excludedcomponents=MultiInputCounters
Contributor comment:
You are referring to rmcuser, which, as it seems from other pull requests, happens to be a user that you use to run some of this.
What does userplatform mean?

#######################################################

#If costfile is empty, fill it with default values
@@ -71,4 +73,4 @@ $home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir
$home/jobFileLoader.sh $hadoopconfdir $mapredmaxsplitsize $schedulerpoolname $cluster $historyProcessingDir

# Process
-$home/jobFileProcessor.sh $hbaseconfdir $schedulerpoolname $historyProcessingDir $cluster $threads $batchsize $machinetype $costfile
+$home/jobFileProcessor.sh $hbaseconfdir $schedulerpoolname $historyProcessingDir $cluster $threads $batchsize $machinetype $costfile $sinks $jobFileProcessorConfOpts
6 changes: 3 additions & 3 deletions bin/etl/jobFileProcessor.sh
@@ -21,9 +21,9 @@
# [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile]
# a sample cost file can be found in the conf dir as sampleCostDetails.properties

-if [ $# -ne 8 ]
+if [ $# -ne 10 ]
then
echo "Usage: `basename $0` [hbaseconfdir] [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile]"
echo "Usage: `basename $0` [hbaseconfdir] [schedulerpoolname] [historyprocessingdir] [cluster] [threads] [batchsize] [machinetype] [costfile] [sinks] [jobfileprocessorconf]"
exit 1
fi

@@ -40,5 +40,5 @@ fi
create_pidfile $HRAVEN_PID_DIR
trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT

-hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFileProcessor -libjars=$LIBJARS -Dmapred.fairscheduler.pool=$2 -d -p $3 -c $4 -t $5 -b $6 -m $7 -z $8
+hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFileProcessor -libjars=$LIBJARS -Dmapred.fairscheduler.pool=$2 ${10} -d -p $3 -c $4 -t $5 -b $6 -m $7 -z $8 -s $9

28 changes: 24 additions & 4 deletions hraven-core/src/main/java/com/twitter/hraven/AppKey.java
@@ -16,27 +16,33 @@

package com.twitter.hraven;

+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;

import org.apache.commons.lang.builder.CompareToBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;

-public class AppKey implements Comparable<Object> {
+public class AppKey implements WritableComparable<Object> {

/**
* The cluster on which the application ran
*/
-protected final String cluster;
+protected String cluster;
/**
* Who ran the application on Hadoop
*/
-protected final String userName;
+protected String userName;

/**
* The thing that identifies an application,
* such as Pig script identifier, or Scalding identifier.
*/
-protected final String appId;
+protected String appId;

@JsonCreator
public AppKey(@JsonProperty("cluster") String cluster, @JsonProperty("userName") String userName,
@@ -111,4 +117,18 @@ public int hashCode() {
.toHashCode();
}

+@Override
+public void write(DataOutput out) throws IOException {
+Text.writeString(out, this.cluster);
+Text.writeString(out, this.userName);
+Text.writeString(out, this.appId);
+}

+@Override
+public void readFields(DataInput in) throws IOException {
+this.cluster = Text.readString(in);
+this.userName = Text.readString(in);
+this.appId = Text.readString(in);
+}

}
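For illustration (not part of this PR), a minimal Java sketch of what the new Writable support enables: round-tripping an AppKey through a byte stream by hand. It assumes the three-argument constructor and the CompareToBuilder-based compareTo visible in this file, and the null arguments for the copy are hypothetical placeholders.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import com.twitter.hraven.AppKey;

public class AppKeyRoundTrip {
  public static void main(String[] args) throws IOException {
    AppKey original = new AppKey("cluster1@dc1", "hadoopuser", "wordcount-app");

    // Serialize: three Text-encoded strings, in the order written by write()
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    // Deserialize into a fresh instance; possible now that the fields are no longer final
    AppKey copy = new AppKey(null, null, null); // hypothetical placeholder arguments
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

    System.out.println(original.compareTo(copy) == 0); // expected: true
  }
}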
27 changes: 27 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -31,6 +31,8 @@ public class Constants {
public static final char SEP_CHAR = '!';
public static final String SEP = "" + SEP_CHAR;
public static final byte[] SEP_BYTES = Bytes.toBytes(SEP);

+public static final String PERIOD_SEP_CHAR = ".";

// common default values
public static final byte[] EMPTY_BYTES = new byte[0];
@@ -426,4 +428,29 @@

/** name of the properties file used for cluster to cluster identifier mapping */
public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";

+public static final String JOBCONF_SINKS = "hraven.sinks";

+public static final String JOBCONF_GRAPHITE_HOST_KEY = "hraven.sink.graphite.host";
+public static final String JOBCONF_GRAPHITE_PORT_KEY = "hraven.sink.graphite.port";
+public static final String JOBCONF_GRAPHITE_PREFIX = "hraven.sink.graphite.prefix";
+public static final String JOBCONF_GRAPHITE_USER_FILTER = "hraven.sink.graphite.userfilter";
+public static final String JOBCONF_GRAPHITE_QUEUE_FILTER = "hraven.sink.graphite.queuefilter";
+public static final String JOBCONF_GRAPHITE_EXCLUDED_COMPONENTS = "hraven.sink.graphite.excludedcomponents";
+public static final String JOBCONF_GRAPHITE_DONOTEXCLUDE_APPS = "hraven.sink.graphite.donotexcludeapps";

+public static final int GRAPHITE_DEFAULT_PORT = 2003;
+public static final String GRAPHITE_DEFAULT_HOST = "localhost";
+public static final String GRAPHITE_DEFAULT_PREFIX = "DEFAULT";

+public static final String GRAPHITE_KEY_MAPPING_TABLE = PREFIX + "graphite_key_mapping";
+public static final byte[] GRAPHITE_KEY_MAPPING_TABLE_BYTES = Bytes.toBytes(GRAPHITE_KEY_MAPPING_TABLE);

+public static final String GRAPHITE_REVERSE_KEY_MAPPING_TABLE = PREFIX + "graphite_key_mapping_r";
+public static final byte[] GRAPHITE_REVERSE_KEY_MAPPING_TABLE_BYTES = Bytes.toBytes(GRAPHITE_REVERSE_KEY_MAPPING_TABLE);

+public static final String GRAPHITE_KEY_MAPPING_COLUMN = "k";
+public static final byte[] GRAPHITE_KEY_MAPPING_COLUMN_BYTES = Bytes.toBytes(GRAPHITE_KEY_MAPPING_COLUMN);


}
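For illustration (not part of this PR), a sketch of how a Graphite sink might resolve these settings from a Hadoop Configuration, falling back to the defaults defined above. The PR's actual sink classes are not shown here, so this lookup pattern is an assumption.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;

import com.twitter.hraven.Constants;

public class GraphiteSinkSettings {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Which sinks are enabled, e.g. "GRAPHITE,HBASE" as passed by hraven-etl.sh
    String[] sinks = conf.getStrings(Constants.JOBCONF_SINKS);

    // Graphite connection settings, with the defaults defined in Constants
    String host = conf.get(Constants.JOBCONF_GRAPHITE_HOST_KEY, Constants.GRAPHITE_DEFAULT_HOST);
    int port = conf.getInt(Constants.JOBCONF_GRAPHITE_PORT_KEY, Constants.GRAPHITE_DEFAULT_PORT);
    String prefix = conf.get(Constants.JOBCONF_GRAPHITE_PREFIX, Constants.GRAPHITE_DEFAULT_PREFIX);

    System.out.println(Arrays.toString(sinks) + " -> " + host + ":" + port + " prefix=" + prefix);
  }
}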
22 changes: 21 additions & 1 deletion hraven-core/src/main/java/com/twitter/hraven/FlowKey.java
@@ -15,12 +15,19 @@
*/
package com.twitter.hraven;

+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;

import org.apache.commons.lang.builder.CompareToBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;

-public class FlowKey extends AppKey implements Comparable<Object> {
+public class FlowKey extends AppKey implements WritableComparable<Object> {

/**
* Identifying one single run of a version of an app. Smaller values indicate
@@ -111,4 +118,17 @@ public int hashCode(){
.toHashCode();
}

+@Override
+public void write(DataOutput out) throws IOException {
+super.write(out);
+new LongWritable(this.runId).write(out);
+}

+@Override
+public void readFields(DataInput in) throws IOException {
+super.readFields(in);
+LongWritable lw = new LongWritable();
+lw.readFields(in);
+this.runId = lw.get();
+}
}
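Read together with AppKey above, these write() calls fix the wire layout of a serialized FlowKey. A comment-only summary for reference:

// Byte layout produced by FlowKey.write(), per the implementations above:
//   Text.writeString(cluster)   // written by AppKey.write()
//   Text.writeString(userName)  // written by AppKey.write()
//   Text.writeString(appId)     // written by AppKey.write()
//   LongWritable(runId)         // appended by FlowKey.write()
// readFields() must consume the fields in exactly this order.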
172 changes: 172 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/HravenRecord.java
@@ -0,0 +1,172 @@
package com.twitter.hraven;
Contributor comment:
Need copyright header on all new files before we can accept them.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

import com.twitter.hraven.util.EnumWritable;
import com.twitter.hraven.util.Serializer;

/**
*
* @author angad.singh
Contributor comment:
drop authors
*
* {@link JobFileTableMapper} outputs this as the value. It corresponds to the
* Put record which was emitted earlier.
*
* @param <K> key type
* @param <V> type of dataValue object to be stored
*/

public abstract class HravenRecord<K extends Writable, V> implements Writable {
private K key;
private RecordCategory dataCategory;
private RecordDataKey dataKey;
private V dataValue;
private Long submitTime;

public HravenRecord() {

}

public K getKey() {
return key;
}

public void setKey(K key) {
this.key = key;
}

public RecordCategory getDataCategory() {
return dataCategory;
}

public void setDataCategory(RecordCategory dataCategory) {
this.dataCategory = dataCategory;
}

public RecordDataKey getDataKey() {
return dataKey;
}

public void setDataKey(RecordDataKey dataKey) {
this.dataKey = dataKey;
}

public V getDataValue() {
return dataValue;
}

public void setDataValue(V dataValue) {
this.dataValue = dataValue;
}

public Long getSubmitTime() {
return submitTime;
}

public void setSubmitTime(Long submitTime) {
this.submitTime = submitTime;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((dataCategory == null) ? 0 : dataCategory.hashCode());
result = prime * result + ((dataKey == null) ? 0 : dataKey.hashCode());
result = prime * result + ((dataValue == null) ? 0 : dataValue.hashCode());
result = prime * result + ((key == null) ? 0 : key.hashCode());
result = prime * result + ((submitTime == null) ? 0 : submitTime.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
HravenRecord other = (HravenRecord) obj;
if (dataCategory != other.dataCategory) {
return false;
}
if (dataKey == null) {
if (other.dataKey != null) {
return false;
}
} else if (!dataKey.equals(other.dataKey)) {
return false;
}
if (dataValue == null) {
if (other.dataValue != null) {
return false;
}
} else if (!dataValue.equals(other.dataValue)) {
return false;
}
if (key == null) {
if (other.key != null) {
return false;
}
} else if (!key.equals(other.key)) {
return false;
}
if (submitTime == null) {
if (other.submitTime != null) {
return false;
}
} else if (!submitTime.equals(other.submitTime)) {
return false;
}
return true;
}

@Override
public String toString() {
return "HravenRecord [key=" + key + ", dataCategory=" + dataCategory + ", dataKey=" + dataKey
+ ", dataValue=" + dataValue + ", submitTime=" + submitTime + "]";
}

@Override
public void write(DataOutput out) throws IOException {
//key
this.key.write(out);
//dataCategory
new EnumWritable(this.dataCategory).write(out);
//dataKey
this.dataKey.write(out);
//dataValue
byte[] b = Serializer.serialize(this.dataValue);
out.writeInt(b.length);
out.write(b);
//submitTime
new LongWritable(this.submitTime).write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
//key
this.key.readFields(in);
//dataCategory
new EnumWritable(this.dataCategory).readFields(in);
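// NOTE: the value read here lands in the temporary EnumWritable wrapper and is
// never assigned back to this.dataCategory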
//dataKey
this.dataKey.readFields(in);
//dataValue
byte[] b = new byte[in.readInt()];
in.readFully(b);
try {
this.dataValue = (V) Serializer.deserialize(b);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failure in deserializing HravenRecord.dataValue", e);
}
//submitTime
LongWritable lw = new LongWritable();
lw.readFields(in);
this.submitTime = lw.get();
}
}
32 changes: 32 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/HravenService.java
@@ -0,0 +1,32 @@
package com.twitter.hraven;

/**
*
* @author angad.singh
Contributor comment:
drop author, add a header to all files please.
*
* {@link JobFileTableMapper} outputs this as the key. It corresponds to the
* HBase table which was emitted earlier.
*/

public enum HravenService {
JOB_HISTORY_RAW {
@Override
public HravenRecord getNewRecord() {
return new JobHistoryRawRecord();
}
},
JOB_HISTORY {
@Override
public HravenRecord getNewRecord() {
return new JobHistoryRecord();
}
},
JOB_HISTORY_TASK {
@Override
public HravenRecord getNewRecord() {
return new JobHistoryTaskRecord();
}
};

public abstract HravenRecord getNewRecord();
}
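For illustration (not part of this PR), a small Java sketch of the dispatch this enum enables, using only the enum and the setters defined in this diff; each service stands in for what was previously a direct HBase table write.

import com.twitter.hraven.HravenRecord;
import com.twitter.hraven.HravenService;

public class SinkDispatchDemo {
  public static void main(String[] args) {
    for (HravenService service : HravenService.values()) {
      // The enum hands back the record type matching each destination
      HravenRecord record = service.getNewRecord();
      record.setSubmitTime(System.currentTimeMillis());
      System.out.println(service + " -> " + record.getClass().getSimpleName());
    }
  }
}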