Skip to content

Commit

Permalink
Updating Louvain to use Text objects instead of LongWritable
Browse files Browse the repository at this point in the history
  • Loading branch information
Dwayne Pryce committed Oct 8, 2013
1 parent 473060a commit fa00d72
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 357 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
package mil.darpa.xdata.louvain.giraph;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;

import org.apache.giraph.aggregators.DoubleSumAggregator;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.master.DefaultMasterCompute;
Expand All @@ -17,6 +9,10 @@
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;

import java.io.*;
import java.util.ArrayList;
import java.util.List;


/**
* Master compute class. performs a compute function before each super step. Performs 4 functions.
Expand Down Expand Up @@ -46,9 +42,9 @@ public class LouvainMasterCompute extends DefaultMasterCompute{
public void initialize() throws InstantiationException, IllegalAccessException {
this.registerAggregator(LouvainVertex.CHANGE_AGG, LongSumAggregator.class);
this.registerPersistentAggregator(LouvainVertex.TOTAL_EDGE_WEIGHT_AGG, LongSumAggregator.class);
for (int i =0; i < LouvainVertex.getNumQAggregators(getConf()); i++){
this.registerPersistentAggregator(LouvainVertex.ACTUAL_Q_AGG+i, DoubleSumAggregator.class);
}
//for (int i =0; i < LouvainVertex.getNumQAggregators(getConf()); i++){
this.registerPersistentAggregator(LouvainVertex.ACTUAL_Q_AGG, DoubleSumAggregator.class);
//}

}

Expand Down Expand Up @@ -102,9 +98,9 @@ else if (halt){

private double getActualQ(){
double actualQ = 0.0;
for (int i =0; i < LouvainVertex.getNumQAggregators(getConf()); i++){
actualQ += ( (DoubleWritable) getAggregatedValue(LouvainVertex.ACTUAL_Q_AGG+i) ).get();
}
//for (int i =0; i < LouvainVertex.getNumQAggregators(getConf()); i++){
actualQ += ( (DoubleWritable) getAggregatedValue(LouvainVertex.ACTUAL_Q_AGG) ).get();
//}
return actualQ;
}

Expand Down Expand Up @@ -146,6 +142,7 @@ private void markPipeLineComplete(String message){
String outputPath = getConf().get("mapred.output.dir");
String dir = outputPath.substring(0, outputPath.lastIndexOf("/"));
String filename = getConf().get("fs.defaultFS")+dir+"/_COMPLETE";
//String filename = "hdfs://xd-namenode:8020"+dir+"/_COMPLETE";
System.out.println("Writing "+filename);
writeFile(filename,message);
}
Expand All @@ -159,6 +156,7 @@ private void writeQvalue(String message){
String stage = outputPath.substring(lastIndexOfSlash+1);
String stagenumber = stage.substring(stage.lastIndexOf("_")+1);
String filename = getConf().get("fs.defaultFS")+dir+"/_q_"+stagenumber;
//String filename = "hdfs://xd-namenode:8020"+dir+"/_q_"+stagenumber;
writeFile(filename,message);

}
Expand All @@ -176,6 +174,7 @@ private double getPreviousQvalue(){
}
else{
String filename = getConf().get("fs.defaultFS")+dir+"/_q_"+previousStageNumber;
//String filename = "hdfs://xd-namenode:8020"+dir+"/_q_"+previousStageNumber;
String result = this.readFile(filename).trim();
return Double.parseDouble(result);
}
Expand Down
33 changes: 18 additions & 15 deletions src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMessage.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package mil.darpa.xdata.louvain.giraph;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;


/**
* messages sent between vertcies.
* messages sent between vertices.
*/
public class LouvainMessage implements Writable{
public class LouvainMessage implements Writable {

private long communityId;
private String communityId = "";
private long communitySigmaTotal;
private long edgeWeight;
private long sourceId;
private String sourceId = "";

public LouvainMessage(){}

public LouvainMessage(long communityId,long sigmaTotal,long weight,long sourceId){
public LouvainMessage(String communityId,long sigmaTotal,long weight,String sourceId){
this();
this.communityId = communityId;
this.communitySigmaTotal = sigmaTotal;
Expand All @@ -33,25 +34,27 @@ public LouvainMessage(LouvainMessage other){

@Override
public void readFields(DataInput in) throws IOException {
communityId = in.readLong();
communityId = WritableUtils.readString(in);
communitySigmaTotal = in.readLong();
edgeWeight = in.readLong();
sourceId = in.readLong();
sourceId = WritableUtils.readString(in);
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(communityId);
WritableUtils.writeString(out, communityId);
//out.writeUTF(communityId);
out.writeLong(communitySigmaTotal);
out.writeLong(edgeWeight);
out.writeLong(sourceId);
//out.writeUTF(sourceId);
WritableUtils.writeString(out, sourceId);
}

public long getCommunityId() {
public String getCommunityId() {
return communityId;
}

public void setCommunityId(long l) {
public void setCommunityId(String l) {
this.communityId = l;
}

Expand All @@ -75,11 +78,11 @@ public void setEdgeWeight(long edgeWeight) {
this.edgeWeight = edgeWeight;
}

public long getSourceId(){
public String getSourceId(){
return sourceId;
}

public void setSourceId(long sourceId){
public void setSourceId(String sourceId){
this.sourceId = sourceId;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package mil.darpa.xdata.louvain.giraph;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.Writable;


/**
* The state of a vertex.
*
*/
public class LouvainNodeState implements Writable{
public class LouvainNodeState implements Writable {

private long community;
private String community = "";
private long communitySigmaTotal;

// the interanal edge weight of a node
Expand All @@ -40,7 +41,7 @@ public LouvainNodeState(){

@Override
public void readFields(DataInput in) throws IOException {
community = in.readLong();
community = WritableUtils.readString(in);
communitySigmaTotal = in.readLong();
internalWeight = in.readLong();
changed = in.readLong();
Expand All @@ -55,7 +56,7 @@ public void readFields(DataInput in) throws IOException {

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(community);
WritableUtils.writeString(out, community);
out.writeLong(communitySigmaTotal);
out.writeLong(internalWeight);
out.writeLong(changed);
Expand All @@ -67,11 +68,11 @@ public void write(DataOutput out) throws IOException {

}

public long getCommunity() {
public String getCommunity() {
return community;
}

public void setCommunity(long community) {
public void setCommunity(String community) {
this.community = community;
}

Expand Down
Loading

0 comments on commit fa00d72

Please sign in to comment.