From fa00d72c175fbf5cf9b65b3bbdbaf79b93899983 Mon Sep 17 00:00:00 2001 From: Dwayne Pryce Date: Tue, 8 Oct 2013 10:20:08 -0700 Subject: [PATCH] Updating Louvain to use Text objects instead of LongWritable --- .../louvain/giraph/LouvainMasterCompute.java | 27 +- .../xdata/louvain/giraph/LouvainMessage.java | 33 +- .../louvain/giraph/LouvainNodeState.java | 17 +- .../xdata/louvain/giraph/LouvainVertex.java | 405 +++++++++--------- .../giraph/LouvainVertexInputFormat.java | 114 +++-- .../giraph/LouvainVertexOutputFormat.java | 12 +- .../mapreduce/CommunityCompression.java | 46 +- .../mapreduce/LouvainVertexWritable.java | 21 +- 8 files changed, 318 insertions(+), 357 deletions(-) diff --git a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMasterCompute.java b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMasterCompute.java index 374c6f0..1b93775 100644 --- a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMasterCompute.java +++ b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMasterCompute.java @@ -1,13 +1,5 @@ package mil.darpa.xdata.louvain.giraph; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.List; - import org.apache.giraph.aggregators.DoubleSumAggregator; import org.apache.giraph.aggregators.LongSumAggregator; import org.apache.giraph.master.DefaultMasterCompute; @@ -17,6 +9,10 @@ import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; +import java.io.*; +import java.util.ArrayList; +import java.util.List; + /** * Master compute class. performs a compute function before each super step. Performs 4 functions. 
@@ -46,9 +42,9 @@ public class LouvainMasterCompute extends DefaultMasterCompute{ public void initialize() throws InstantiationException, IllegalAccessException { this.registerAggregator(LouvainVertex.CHANGE_AGG, LongSumAggregator.class); this.registerPersistentAggregator(LouvainVertex.TOTAL_EDGE_WEIGHT_AGG, LongSumAggregator.class); - for (int i =0; i < LouvainVertex.getNumQAggregators(getConf()); i++){ - this.registerPersistentAggregator(LouvainVertex.ACTUAL_Q_AGG+i, DoubleSumAggregator.class); - } + //for (int i =0; i < LouvainVertex.getNumQAggregators(getConf()); i++){ + this.registerPersistentAggregator(LouvainVertex.ACTUAL_Q_AGG, DoubleSumAggregator.class); + //} } @@ -102,9 +98,9 @@ else if (halt){ private double getActualQ(){ double actualQ = 0.0; - for (int i =0; i < LouvainVertex.getNumQAggregators(getConf()); i++){ - actualQ += ( (DoubleWritable) getAggregatedValue(LouvainVertex.ACTUAL_Q_AGG+i) ).get(); - } + //for (int i =0; i < LouvainVertex.getNumQAggregators(getConf()); i++){ + actualQ += ( (DoubleWritable) getAggregatedValue(LouvainVertex.ACTUAL_Q_AGG) ).get(); + //} return actualQ; } @@ -146,6 +142,7 @@ private void markPipeLineComplete(String message){ String outputPath = getConf().get("mapred.output.dir"); String dir = outputPath.substring(0, outputPath.lastIndexOf("/")); String filename = getConf().get("fs.defaultFS")+dir+"/_COMPLETE"; + //String filename = "hdfs://xd-namenode:8020"+dir+"/_COMPLETE"; System.out.println("Writing "+filename); writeFile(filename,message); } @@ -159,6 +156,7 @@ private void writeQvalue(String message){ String stage = outputPath.substring(lastIndexOfSlash+1); String stagenumber = stage.substring(stage.lastIndexOf("_")+1); String filename = getConf().get("fs.defaultFS")+dir+"/_q_"+stagenumber; + //String filename = "hdfs://xd-namenode:8020"+dir+"/_q_"+stagenumber; writeFile(filename,message); } @@ -176,6 +174,7 @@ private double getPreviousQvalue(){ } else{ String filename = 
getConf().get("fs.defaultFS")+dir+"/_q_"+previousStageNumber; + //String filename = "hdfs://xd-namenode:8020"+dir+"/_q_"+previousStageNumber; String result = this.readFile(filename).trim(); return Double.parseDouble(result); } diff --git a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMessage.java b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMessage.java index c4f9ee7..a8591dd 100644 --- a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMessage.java +++ b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainMessage.java @@ -1,25 +1,26 @@ package mil.darpa.xdata.louvain.giraph; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.hadoop.io.Writable; - /** - * messages sent between vertcies. + * messages sent between vertices. */ -public class LouvainMessage implements Writable{ +public class LouvainMessage implements Writable { - private long communityId; + private String communityId = ""; private long communitySigmaTotal; private long edgeWeight; - private long sourceId; + private String sourceId = ""; public LouvainMessage(){} - public LouvainMessage(long communityId,long sigmaTotal,long weight,long sourceId){ + public LouvainMessage(String communityId,long sigmaTotal,long weight,String sourceId){ this(); this.communityId = communityId; this.communitySigmaTotal = sigmaTotal; @@ -33,25 +34,27 @@ public LouvainMessage(LouvainMessage other){ @Override public void readFields(DataInput in) throws IOException { - communityId = in.readLong(); + communityId = WritableUtils.readString(in); communitySigmaTotal = in.readLong(); edgeWeight = in.readLong(); - sourceId = in.readLong(); + sourceId = WritableUtils.readString(in); } @Override public void write(DataOutput out) throws IOException { - out.writeLong(communityId); + WritableUtils.writeString(out, communityId); + //out.writeUTF(communityId); 
out.writeLong(communitySigmaTotal); out.writeLong(edgeWeight); - out.writeLong(sourceId); + //out.writeUTF(sourceId); + WritableUtils.writeString(out, sourceId); } - public long getCommunityId() { + public String getCommunityId() { return communityId; } - public void setCommunityId(long l) { + public void setCommunityId(String l) { this.communityId = l; } @@ -75,11 +78,11 @@ public void setEdgeWeight(long edgeWeight) { this.edgeWeight = edgeWeight; } - public long getSourceId(){ + public String getSourceId(){ return sourceId; } - public void setSourceId(long sourceId){ + public void setSourceId(String sourceId){ this.sourceId = sourceId; } diff --git a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainNodeState.java b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainNodeState.java index 0384081..a75d70a 100644 --- a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainNodeState.java +++ b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainNodeState.java @@ -1,20 +1,21 @@ package mil.darpa.xdata.louvain.giraph; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import org.apache.hadoop.io.Writable; - /** * The state of a vertex. 
* */ -public class LouvainNodeState implements Writable{ +public class LouvainNodeState implements Writable { - private long community; + private String community = ""; private long communitySigmaTotal; // the interanal edge weight of a node @@ -40,7 +41,7 @@ public LouvainNodeState(){ @Override public void readFields(DataInput in) throws IOException { - community = in.readLong(); + community = WritableUtils.readString(in); communitySigmaTotal = in.readLong(); internalWeight = in.readLong(); changed = in.readLong(); @@ -55,7 +56,7 @@ public void readFields(DataInput in) throws IOException { @Override public void write(DataOutput out) throws IOException { - out.writeLong(community); + WritableUtils.writeString(out, community); out.writeLong(communitySigmaTotal); out.writeLong(internalWeight); out.writeLong(changed); @@ -67,11 +68,11 @@ public void write(DataOutput out) throws IOException { } - public long getCommunity() { + public String getCommunity() { return community; } - public void setCommunity(long community) { + public void setCommunity(String community) { this.community = community; } diff --git a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertex.java b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertex.java index d4bb398..3ad8897 100644 --- a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertex.java +++ b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertex.java @@ -1,382 +1,367 @@ package mil.darpa.xdata.louvain.giraph; -import java.math.RoundingMode; -import java.math.BigDecimal; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.EdgeFactory; import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; - +import java.io.IOException; 
+import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; /** - * Performs the BSP portion of the distributed louvain algorithim. + * Performs the BSP portion of the distributed louvain algorithim. * - * The computation is completed as a series of repeated steps, movement is restricted to ~half - * of the nodes on each cycle so a full pass requires two cycles + * The computation is completed as a series of repeated steps, movement is + * restricted to ~half of the nodes on each cycle so a full pass requires two + * cycles * - * step 1. each vertex recieves community values from its community hub, and sends its community to its neighbors - * step 2. each vertex determines if it should move to a neighboring community or not, and sends its information to - * its community hub. - * step 3. each community hub re-calcuates community totals, and sends the updates to each community memeber. + * step 1. each vertex recieves community values from its community hub, and + * sends its community to its neighbors step 2. each vertex determines if it + * should move to a neighboring community or not, and sends its information to + * its community hub. step 3. each community hub re-calcuates community totals, + * and sends the updates to each community memeber. * - * When the number of nodes that change communities stops decreassing for 4 cycles, or when the number of nodes - * that change reaches 0 the computation ends. + * When the number of nodes that change communities stops decreassing for 4 + * cycles, or when the number of nodes that change reaches 0 the computation + * ends. 
* * * * @author Eric Kimbrel - Sotera Defense - * + * */ -public class LouvainVertex extends Vertex{ +public class LouvainVertex extends Vertex { - - // constants used to register and lookup aggregators public static final String CHANGE_AGG = "change_aggregator"; public static final String TOTAL_EDGE_WEIGHT_AGG = "total_edge_weight_aggregator"; - - + // It may be the case that splitting the aggregators to multiple aggreators - // will improve performance, set actual.Q.aggregators to set the number, defaults to 1 + // will improve performance, set actual.Q.aggregators to set the number, + // defaults to 1 public static final String ACTUAL_Q_AGG = "actual_q_aggregator"; - - - public static int getNumQAggregators(Configuration conf){ - return conf.getInt("actual.Q.aggregators",1); - } - - private void aggregateQ(Double q){ - int aggregators = getNumQAggregators(getConf()); - if (0 < aggregators){ - int modId = (int) this.getId().get() % aggregators; - aggregate(ACTUAL_Q_AGG+modId,new DoubleWritable(q)); - } + +// public static int getNumQAggregators(Configuration conf) { +// return conf.getInt("actual.Q.aggregators", 1); +// } + + private void aggregateQ(Double q) { +// int aggregators = getNumQAggregators(getConf()); +// if (0 < aggregators) { +// int modId = (int) this.getId().get() % aggregators; + aggregate(ACTUAL_Q_AGG, new DoubleWritable(q)); +// } } - - - + @Override public void compute(Iterable messages) throws IOException { - - long superstep = getSuperstep(); - int minorstep = (int) (superstep % 3); // the step in this iteration - int iteration = (int) (superstep / 3); // the current iteration, two iterations make a full pass. - - - + long superstep = getSuperstep(); + int minorstep = (int) (superstep % 3); // the step in this iteration + int iteration = (int) (superstep / 3); // the current iteration, two + // iterations make a full pass. 
// count the total edge weight of the graph on the first super step only - if (superstep == 0){ - aggregate(TOTAL_EDGE_WEIGHT_AGG,new LongWritable(getValue().getNodeWeight() + getValue().getInternalWeight())); + if (superstep == 0) { + aggregate(TOTAL_EDGE_WEIGHT_AGG, new LongWritable(getValue().getNodeWeight() + getValue().getInternalWeight())); } - + // nodes that have no edges send themselves a message on the step 0 - if (superstep == 0 && !getEdges().iterator().hasNext()){ + if (superstep == 0 && !getEdges().iterator().hasNext()) { this.sendMessage(this.getId(), new LouvainMessage()); voteToHalt(); return; } - // nodes that have no edges aggregate their Q value and exit computation on step 1 - else if (superstep == 1 && !getEdges().iterator().hasNext()){ - double q = calculateActualQ( new ArrayList()); + // nodes that have no edges aggregate their Q value and exit computation + // on step 1 + else if (superstep == 1 && !getEdges().iterator().hasNext()) { + double q = calculateActualQ(new ArrayList()); aggregateQ(q); voteToHalt(); return; } - - // at the start of each full pass check to see if progress is still being made, if not halt - if (minorstep ==1 && iteration > 0 && iteration % 2 == 0){ - getValue().setChanged(0); // change count is per pass - long totalChange = ( (LongWritable) getAggregatedValue(CHANGE_AGG)).get(); + + // at the start of each full pass check to see if progress is still + // being made, if not halt + if (minorstep == 1 && iteration > 0 && iteration % 2 == 0) { + getValue().setChanged(0); // change count is per pass + long totalChange = ((LongWritable) getAggregatedValue(CHANGE_AGG)).get(); getValue().getChangeHistory().add(totalChange); - - // if halting aggregate q value and replace node edges with community edges (for next stage in pipeline) - if (LouvainMasterCompute.decideToHalt(getValue().getChangeHistory(),getConf())){ + + // if halting aggregate q value and replace node edges with + // community edges (for next stage in pipeline) 
+ if (LouvainMasterCompute.decideToHalt(getValue().getChangeHistory(), getConf())) { double q = calculateActualQ(messages); replaceNodeEdgesWithCommunityEdges(messages); aggregateQ(q); return; - // note: we did not vote to halt, MasterCompute will halt computation on next step + // note: we did not vote to halt, MasterCompute will halt + // computation on next step } } - - try{ + + try { switch (minorstep) { - - case 0: + + case 0: getAndSendCommunityInfo(messages); - + // if the next step well require a progress check, // aggregate the number of nodes who have changed community. - if (iteration > 0 && iteration %2 == 0){ - aggregate(CHANGE_AGG,new LongWritable(getValue().getChanged())); + if (iteration > 0 && iteration % 2 == 0) { + aggregate(CHANGE_AGG, new LongWritable(getValue().getChanged())); } - + break; case 1: - calculateBestCommunity(messages,iteration); + calculateBestCommunity(messages, iteration); break; case 2: updateCommunities(messages); break; default: - throw new IllegalArgumentException("Invalid minorstep: "+minorstep); + throw new IllegalArgumentException("Invalid minorstep: " + minorstep); } - } - finally{ + } finally { voteToHalt(); } - + } - - /** - * Get the total edge weight of the graph. - * @return 2*the total graph weight. + * Get the total edge weight of the graph. + * + * @return 2*the total graph weight. */ - private long getTotalEdgeWeight(){ - long m = ( (LongWritable) getAggregatedValue(TOTAL_EDGE_WEIGHT_AGG)).get(); + private long getTotalEdgeWeight() { + long m = ((LongWritable) getAggregatedValue(TOTAL_EDGE_WEIGHT_AGG)).get(); return m; } - - /** - * Each vertex will recieve its own communities sigma_total (if updated), and then send its currenty - * community info to each of its neighbors. + * Each vertex will recieve its own communities sigma_total (if updated), + * and then send its currenty community info to each of its neighbors. 
+ * * @param messages */ - private void getAndSendCommunityInfo(Iterable messages){ + private void getAndSendCommunityInfo(Iterable messages) { LouvainNodeState state = this.getValue(); // set new community information. - if (getSuperstep() > 0){ + if (getSuperstep() > 0) { Iterator it = messages.iterator(); - if (!it.hasNext()){ - throw new IllegalStateException("No community info recieved in getAndSendCommunityInfo! Superstep: "+getSuperstep()+" id: "+this.getId()); + if (!it.hasNext()) { + throw new IllegalStateException("No community info recieved in getAndSendCommunityInfo! Superstep: " + getSuperstep() + " id: " + this.getId()); } - LouvainMessage inMess =it.next(); - if (it.hasNext()){ - throw new IllegalStateException("More than one community info packets recieved in getAndSendCommunityInfo! Superstep: "+getSuperstep()+" id: "+this.getId()); + LouvainMessage inMess = it.next(); + if (it.hasNext()) { + throw new IllegalStateException("More than one community info packets recieved in getAndSendCommunityInfo! Superstep: " + getSuperstep() + " id: " + this.getId()); } state.setCommunity(inMess.getCommunityId()); state.setCommunitySigmaTotal(inMess.getCommunitySigmaTotal()); } - - // send community info to all neighbors - for (Edge edge : getEdges()){ + + // send community info to all neighbors + for (Edge edge : getEdges()) { LouvainMessage outMess = new LouvainMessage(); outMess.setCommunityId(state.getCommunity()); outMess.setCommunitySigmaTotal(state.getCommunitySigmaTotal()); outMess.setEdgeWeight(edge.getValue().get()); - outMess.setSourceId(getId().get()); + outMess.setSourceId(getId().toString()); this.sendMessage(edge.getTargetVertexId(), outMess); } - + } - - /** - * Based on community of each of its neighbors, each vertex determimnes if it should - * retain its currenty community or swtich to a neighboring communinity. 
+ * Based on community of each of its neighbors, each vertex determimnes if + * it should retain its currenty community or swtich to a neighboring + * communinity. + * + * At the end of this step a message is sent to the nodes community hub so a + * new community sigma_total can be calculated. * - * At the end of this step a message is sent to the nodes community hub so a new - * community sigma_total can be calculated. - * * @param messages * @param iteration */ - private void calculateBestCommunity(Iterable messages,int iteration) { - + private void calculateBestCommunity(Iterable messages, int iteration) { LouvainNodeState state = getValue(); - + // group messages by communities. - HashMap communityMap = new HashMap(); - for (LouvainMessage message : messages){ - - - long communityId = message.getCommunityId(); + HashMap communityMap = new HashMap(); + for (LouvainMessage message : messages) { + + String communityId = message.getCommunityId(); long weight = message.getEdgeWeight(); - LouvainMessage newmess = new LouvainMessage(message); + LouvainMessage newmess = new LouvainMessage(message); - if (communityMap.containsKey(communityId)){ + if (communityMap.containsKey(communityId)) { LouvainMessage m = communityMap.get(communityId); m.setEdgeWeight(m.getEdgeWeight() + weight); - } - else{ + } else { communityMap.put(communityId, newmess); } } - + // calculate change in Q for each potential community - long bestCommunityId = getValue().getCommunity(); - long startingCommunityId = bestCommunityId; + String bestCommunityId = getValue().getCommunity(); + String startingCommunityId = bestCommunityId; BigDecimal maxDeltaQ = new BigDecimal("0.0"); - for (Map.Entry entry : communityMap.entrySet()){ - BigDecimal deltaQ = q(startingCommunityId,entry.getValue().getCommunityId(),entry.getValue().getCommunitySigmaTotal(),entry.getValue().getEdgeWeight(),state.getNodeWeight(),state.getInternalWeight()); - if (deltaQ.compareTo(maxDeltaQ) > 0 || (deltaQ.equals(maxDeltaQ) && 
entry.getValue().getCommunityId() < bestCommunityId)){ + for (Map.Entry entry : communityMap.entrySet()) { + BigDecimal deltaQ = q(startingCommunityId, entry.getValue().getCommunityId(), entry.getValue().getCommunitySigmaTotal(), entry.getValue().getEdgeWeight(), state.getNodeWeight(), state.getInternalWeight()); + if (deltaQ.compareTo(maxDeltaQ) > 0 || (deltaQ.equals(maxDeltaQ) && entry.getValue().getCommunityId().compareTo(bestCommunityId) < 0)) { bestCommunityId = entry.getValue().getCommunityId(); maxDeltaQ = deltaQ; } } - - //ignore switches based on iteration (prevent certain cycles) - if( (state.getCommunity() > bestCommunityId && iteration % 2 == 0) || - ( state.getCommunity() < bestCommunityId && iteration % 2 != 0)){ - bestCommunityId = state.getCommunity(); - //System.out.println("Iteration: "+iteration+" Node: "+getId()+" held stable to prevent cycle"); + + // ignore switches based on iteration (prevent certain cycles) + if ((state.getCommunity().compareTo(bestCommunityId) > 0 && iteration % 2 == 0) || (state.getCommunity().compareTo(bestCommunityId) < 0 && iteration % 2 != 0)) { + bestCommunityId = state.getCommunity(); + // System.out.println("Iteration: "+iteration+" Node: "+getId()+" held stable to prevent cycle"); } - - //update community and change count - if (state.getCommunity() != bestCommunityId){ - //long old = state.getCommunity(); + + // update community and change count + if (!state.getCommunity().equals(bestCommunityId)) { + // long old = state.getCommunity(); LouvainMessage c = communityMap.get(bestCommunityId); - if (bestCommunityId != c.getCommunityId()){ + if (!bestCommunityId.equals(c.getCommunityId())) { throw new IllegalStateException("Community mapping contains wrong Id"); } state.setCommunity(c.getCommunityId()); state.setCommunitySigmaTotal(c.getCommunitySigmaTotal()); state.setChanged(1L); - //System.out.println("Iteration: "+iteration+" Node: "+getId()+" changed from "+old+" -> "+state.getCommunity()+" dq: "+maxDeltaQ); + // 
System.out.println("Iteration: "+iteration+" Node: "+getId()+" changed from "+old+" -> "+state.getCommunity()+" dq: "+maxDeltaQ); } - - // send our node weight to the community hub to be summed in next superstep - this.sendMessage(new LongWritable(state.getCommunity()), new LouvainMessage(state.getCommunity(),state.getNodeWeight()+state.getInternalWeight(),0,getId().get())); + + // send our node weight to the community hub to be summed in next + // superstep + this.sendMessage(new Text(state.getCommunity()), new LouvainMessage(state.getCommunity(), state.getNodeWeight() + state.getInternalWeight(), 0, getId().toString())); } - - - + /** * determine the change in q if a node were to move to the given community. + * * @param currCommunityId - * @param testCommunityId - * @param testSigmaTot - * @param edgeWeightInCommunity (sum of weight of edges from this ndoe to target community) - * @param nodeWeight (the node degree) + * @param testCommunityId + * @param testSigmaTot + * @param edgeWeightInCommunity + * (sum of weight of edges from this ndoe to target community) + * @param nodeWeight + * (the node degree) * @param internalWeight * @return */ - private BigDecimal q(long currCommunityId,long testCommunityId,long testSigmaTot,long edgeWeightInCommunity,long nodeWeight,long internalWeight){ - boolean isCurrentCommunity = (currCommunityId == testCommunityId); + private BigDecimal q(String currCommunityId, String testCommunityId, long testSigmaTot, long edgeWeightInCommunity, long nodeWeight, long internalWeight) { + boolean isCurrentCommunity = (currCommunityId.equals(testCommunityId)); BigDecimal M = new BigDecimal(Long.toString(getTotalEdgeWeight())); - long k_i_in_L = (isCurrentCommunity) ? edgeWeightInCommunity+internalWeight : edgeWeightInCommunity; - BigDecimal k_i_in = new BigDecimal(Long.toString(k_i_in_L)); - BigDecimal k_i = new BigDecimal(Long.toString(nodeWeight+internalWeight)); + long k_i_in_L = (isCurrentCommunity) ? 
edgeWeightInCommunity + internalWeight : edgeWeightInCommunity; + BigDecimal k_i_in = new BigDecimal(Long.toString(k_i_in_L)); + BigDecimal k_i = new BigDecimal(Long.toString(nodeWeight + internalWeight)); BigDecimal sigma_tot = new BigDecimal(Long.toString(testSigmaTot)); - if (isCurrentCommunity){ - sigma_tot = sigma_tot.subtract(k_i); - } - //diouble sigma_tot_temp = (isCurrentCommunity) ? testSigmaTot - k_i : testSigmaTot; - BigDecimal deltaQ = new BigDecimal("0.0"); - if (! (isCurrentCommunity && sigma_tot.equals(deltaQ))) { - BigDecimal dividend = k_i.multiply(sigma_tot); - int scale = 20; - deltaQ = k_i_in.subtract(dividend.divide(M,scale,RoundingMode.HALF_DOWN)); - - } - return deltaQ; + if (isCurrentCommunity) { + sigma_tot = sigma_tot.subtract(k_i); + } + // diouble sigma_tot_temp = (isCurrentCommunity) ? testSigmaTot - k_i : + // testSigmaTot; + BigDecimal deltaQ = new BigDecimal("0.0"); + if (!(isCurrentCommunity && sigma_tot.equals(deltaQ))) { + BigDecimal dividend = k_i.multiply(sigma_tot); + int scale = 20; + deltaQ = k_i_in.subtract(dividend.divide(M, scale, RoundingMode.HALF_DOWN)); + + } + return deltaQ; } - - /** - * Each commuity hub aggregates the values from each of its memebers to update the nodes sigma total, - * and then sends this back to each of its members. + * Each commuity hub aggregates the values from each of its memebers to + * update the nodes sigma total, and then sends this back to each of its + * members. 
+ * * @param messages */ private void updateCommunities(Iterable messages) { // sum all community contributions LouvainMessage sum = new LouvainMessage(); - sum.setCommunityId(getId().get()); + sum.setCommunityId(getId().toString()); sum.setCommunitySigmaTotal(0); - for (LouvainMessage m : messages){ + for (LouvainMessage m : messages) { sum.addToSigmaTotal(m.getCommunitySigmaTotal()); } - - //send community back out to all community members - for(LouvainMessage m : messages){ - this.sendMessage(new LongWritable(m.getSourceId()), sum); + + // send community back out to all community members + for (LouvainMessage m : messages) { + this.sendMessage(new Text(m.getSourceId()), sum); } } - - /** * Calculate this nodes contribution for the actual q value of the graph. */ - private double calculateActualQ(Iterable messages){ - //long start = System.currentTimeMillis(); + private double calculateActualQ(Iterable messages) { + // long start = System.currentTimeMillis(); LouvainNodeState state = getValue(); long k_i_in = state.getInternalWeight(); - for (LouvainMessage m : messages){ - if (m.getCommunityId() == state.getCommunity()){ + for (LouvainMessage m : messages) { + if (m.getCommunityId().equals(state.getCommunity())) { try { - k_i_in += this.getEdgeValue(new LongWritable(m.getSourceId())).get(); - } catch(NullPointerException e){ - throw new IllegalStateException("Node: "+getId()+" does not have edge: "+m.getSourceId()+" check that the graph is bi-directional."); + k_i_in += this.getEdgeValue(new Text(m.getSourceId())).get(); + } catch (NullPointerException e) { + throw new IllegalStateException("Node: " + getId() + " does not have edge: " + m.getSourceId() + " check that the graph is bi-directional."); } } } long sigma_tot = getValue().getCommunitySigmaTotal(); long M = this.getTotalEdgeWeight(); long k_i = getValue().getNodeWeight() + getValue().getInternalWeight(); - - double q = ( (((double)k_i_in)/M) - ( ((double) (sigma_tot*k_i)) / Math.pow(M, 2) ) ); - q = (q 
< 0 ) ? 0 : q; - - //long end = System.currentTimeMillis(); - //System.out.println("calculated actual q in :"+(end-start)); + double q = ((((double) k_i_in) / M) - (((double) (sigma_tot * k_i)) / Math.pow(M, 2))); + q = (q < 0) ? 0 : q; + + // long end = System.currentTimeMillis(); + // System.out.println("calculated actual q in :"+(end-start)); return q; } - - - + /** - * Replace each edge to a neighbor with an edge to that neigbors community instead. - * Done just before exiting computation. In the next state of the piple line this - * edges are aggregated and all communities are represented as single nodes. - * Edges from the community to itself are tracked be the ndoes interal weight. + * Replace each edge to a neighbor with an edge to that neigbors community + * instead. Done just before exiting computation. In the next state of the + * piple line this edges are aggregated and all communities are represented + * as single nodes. Edges from the community to itself are tracked be the + * ndoes interal weight. * * @param messages */ private void replaceNodeEdgesWithCommunityEdges(Iterable messages) { - - + // group messages by communities. 
- HashMap communityMap = new HashMap(); - for (LouvainMessage message : messages){ - - long communityId = message.getCommunityId(); + HashMap communityMap = new HashMap(); + for (LouvainMessage message : messages) { + + String communityId = message.getCommunityId(); - if (communityMap.containsKey(communityId)){ + if (communityMap.containsKey(communityId)) { LouvainMessage m = communityMap.get(communityId); m.setEdgeWeight(m.getEdgeWeight() + message.getEdgeWeight()); - } - else{ + } else { LouvainMessage newmess = new LouvainMessage(message); communityMap.put(communityId, newmess); } } - - ArrayList> edges = new ArrayList>(communityMap.size()+1); - for (Map.Entry entry : communityMap.entrySet()){ - edges.add(EdgeFactory.create(new LongWritable(entry.getKey()),new LongWritable(entry.getValue().getEdgeWeight()))); + + ArrayList> edges = new ArrayList>(communityMap.size() + 1); + for (Map.Entry entry : communityMap.entrySet()) { + edges.add(EdgeFactory.create(new Text(entry.getKey()), new LongWritable(entry.getValue().getEdgeWeight()))); } this.setEdges(edges); } - - - + } diff --git a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertexInputFormat.java b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertexInputFormat.java index 88fd237..2c9d97b 100644 --- a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertexInputFormat.java +++ b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertexInputFormat.java @@ -1,114 +1,100 @@ package mil.darpa.xdata.louvain.giraph; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.ArrayList; - - import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.EdgeFactory; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.formats.TextVertexInputFormat; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import 
org.apache.hadoop.mapreduce.TaskAttemptContext; - - +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; /** - * Reads in a graph from text file in hdfs. Required formatin is a tab delimited file with 3 columns - * id internal weight edge list + * Reads in a graph from text file in hdfs. Required formatin is a tab delimited + * file with 3 columns id internal weight edge list + * + * the edge list is a comma seperated list of edges of the form id:weight * - * the edge list is a comma seperated list of edges of the form id:weight + * The graph must be bi-directional i.e. if vertex 1 has edge 2:9, the vertex 2 + * must have id 1:9 This condition is not verified as the input is read, but + * results of the algorithim will not be correct, and the run may fail with + * expceptions. * - * The graph must be bi-directional i.e. if vertex 1 has edge 2:9, the vertex 2 must have id 1:9 - * This condition is not verified as the input is read, but results of the algorithim will not be correct, - * and the run may fail with expceptions. 
* - * */ -public class LouvainVertexInputFormat extends TextVertexInputFormat{ +public class LouvainVertexInputFormat extends TextVertexInputFormat { @Override - public TextVertexReader createVertexReader( - InputSplit arg0, TaskAttemptContext arg1) throws IOException { + public TextVertexReader createVertexReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException { return new LouvainVertexReader(); } - - protected class LouvainVertexReader extends TextVertexReader{ + + protected class LouvainVertexReader extends TextVertexReader { @Override - public Vertex getCurrentVertex() - throws IOException, InterruptedException { + public Vertex getCurrentVertex() throws IOException, InterruptedException { String line = getRecordReader().getCurrentValue().toString(); String[] tokens = line.trim().split("\t"); - if (tokens.length < 2){ - throw new IllegalArgumentException("Invalid line: ("+line+")"); + if (tokens.length < 2) { + throw new IllegalArgumentException("Invalid line: (" + line + ")"); } - - int edgeListIndex = 2; - int internalWeightIndex = 1; - if (tokens.length ==2){ - internalWeightIndex = -1; - edgeListIndex = 1; - } - LouvainNodeState state = new LouvainNodeState(); - LongWritable id = new LongWritable(Long.parseLong(tokens[0])); - state.setCommunity(id.get()); - state.setInternalWeight( (internalWeightIndex > 0) ? Long.parseLong(tokens[internalWeightIndex]) : 0L ); - + Text id = new Text(tokens[0]); + state.setCommunity(id.toString()); + state.setInternalWeight(Long.parseLong(tokens[1])); + long sigma_tot = 0; - Map edgeMap = new HashMap(); - ArrayList> edgesList = new ArrayList>(); - String[] edges = (edgeListIndex < tokens.length) ? tokens[edgeListIndex].split(",") : new String[0]; - for (int i = 0; i < edges.length ; i++){ - if (edges[i].indexOf(':') != -1){ + Map edgeMap = new HashMap(); + ArrayList> edgesList = new ArrayList>(); + String[] edges = (tokens.length > 2) ? 
tokens[2].split(",") : new String[0]; + for (int i = 0; i < edges.length; i++) { + if (edges[i].indexOf(':') != -1) { String[] edgeTokens = edges[i].split(":"); - if (edgeTokens.length != 2){ - throw new IllegalArgumentException("invalid edge ("+edgeTokens[i]+") in line ("+line+")"); + if (edgeTokens.length != 2) { + throw new IllegalArgumentException("invalid edge (" + edges[i] + ") in line (" + line + ")"); } long weight = Long.parseLong(edgeTokens[1]); sigma_tot += weight; - Long edgeKey = Long.parseLong(edgeTokens[0]); - edgeMap.put(new LongWritable(edgeKey), new LongWritable(weight)); - //edgesList.add(EdgeFactory.create(new LongWritable(edgeKey),new LongWritable(weight))); - } - else{ - Long edgeKey = Long.parseLong(edges[i]); + Text edgeKey = new Text(edgeTokens[0]); + edgeMap.put(edgeKey, new LongWritable(weight)); + // edgesList.add(EdgeFactory.create(new + // LongWritable(edgeKey),new LongWritable(weight))); + } else { + Text edgeKey = new Text(edges[i]); /* bare target id with no ":weight" suffix; the edge entry itself is the key */ Long weight = 1L; sigma_tot += weight; - edgeMap.put(new LongWritable(edgeKey), new LongWritable(weight)); - //edgesList.add(EdgeFactory.create(new LongWritable(edgeKey),new LongWritable(weight))); + edgeMap.put(edgeKey, new LongWritable(weight)); + // edgesList.add(EdgeFactory.create(new + // LongWritable(edgeKey),new LongWritable(weight))); } - + } - state.setCommunitySigmaTotal(sigma_tot+state.getInternalWeight()); + state.setCommunitySigmaTotal(sigma_tot + state.getInternalWeight()); state.setNodeWeight(sigma_tot); - - for (Map.Entry entry : edgeMap.entrySet()){ - edgesList.add(EdgeFactory.create(entry.getKey(),entry.getValue())); - } + for (Map.Entry entry : edgeMap.entrySet()) { + edgesList.add(EdgeFactory.create(entry.getKey(), entry.getValue())); + } + + Vertex vertex = this.getConf().createVertex(); + vertex.initialize(id, state, edgesList); - Vertex vertex = this.getConf().createVertex(); - vertex.initialize(id, state,edgesList); - return vertex; - + } - - - @Override public boolean 
nextVertex() throws IOException, InterruptedException { return getRecordReader().nextKeyValue(); } - + } } diff --git a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertexOutputFormat.java b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertexOutputFormat.java index 82b6d81..d29193a 100644 --- a/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertexOutputFormat.java +++ b/src/main/java/mil/darpa/xdata/louvain/giraph/LouvainVertexOutputFormat.java @@ -1,8 +1,5 @@ package mil.darpa.xdata.louvain.giraph; -import java.io.IOException; - - import org.apache.giraph.edge.Edge; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.formats.TextVertexOutputFormat; @@ -10,6 +7,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import java.io.IOException; /** @@ -21,7 +19,7 @@ * the edge list is a comma seperated list of edges of the form id:weight * */ -public class LouvainVertexOutputFormat extends TextVertexOutputFormat{ +public class LouvainVertexOutputFormat extends TextVertexOutputFormat{ @Override public TextVertexWriter createVertexWriter( @@ -34,7 +32,7 @@ protected class LouvainVertexWriter extends TextVertexWriter { @Override public void writeVertex( - Vertex vertex) + Vertex vertex) throws IOException, InterruptedException { StringBuilder b = new StringBuilder(); b.append(vertex.getValue().getCommunity()); @@ -42,7 +40,7 @@ public void writeVertex( b.append(vertex.getValue().getInternalWeight()); b.append("\t"); - for (Edge e: vertex.getEdges()){ + for (Edge e: vertex.getEdges()){ b.append(e.getTargetVertexId()); b.append(":"); b.append(e.getValue()); @@ -50,7 +48,7 @@ public void writeVertex( } b.setLength(b.length() - 1); - getRecordWriter().write(new Text(vertex.getId().toString()), new Text(b.toString())); + getRecordWriter().write(vertex.getId(), new Text(b.toString())); } diff --git a/src/main/java/mil/darpa/xdata/louvain/mapreduce/CommunityCompression.java 
b/src/main/java/mil/darpa/xdata/louvain/mapreduce/CommunityCompression.java index 1895a68..94e7f89 100644 --- a/src/main/java/mil/darpa/xdata/louvain/mapreduce/CommunityCompression.java +++ b/src/main/java/mil/darpa/xdata/louvain/mapreduce/CommunityCompression.java @@ -1,26 +1,14 @@ package mil.darpa.xdata.louvain.mapreduce; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map.Entry; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; - +import org.apache.hadoop.mapred.*; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; /** @@ -39,17 +27,17 @@ public class CommunityCompression { - public static class Map extends MapReduceBase implements Mapper { + public static class Map extends MapReduceBase implements Mapper { @Override - public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { + public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { String[] tokens = value.toString().trim().split("\t"); if (3 > tokens.length){ throw new IllegalArgumentException("Expected 4 cols: got "+tokens.length+" from line: "+tokens.toString()); } - LongWritable outKey = new LongWritable(Long.parseLong(tokens[1])); // group by community + Text outKey = new Text(tokens[1]); // group by community String 
edgeListStr = (tokens.length == 3) ? "" : tokens[3]; LouvainVertexWritable outValue = LouvainVertexWritable.fromTokens(tokens[2], edgeListStr); output.collect(outKey, outValue); @@ -58,19 +46,19 @@ public void map(LongWritable key, Text value, OutputCollector { + public static class Reduce extends MapReduceBase implements Reducer { - public void reduce(LongWritable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { - long communityId = key.get(); + public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { + String communityId = key.toString(); long weight = 0; - HashMap edgeMap = new HashMap(); + HashMap edgeMap = new HashMap(); while (values.hasNext()){ LouvainVertexWritable vertex = values.next(); weight += vertex.weight; - for (Entry entry : vertex.edges.entrySet()){ - long entrykey = entry.getKey(); + for (Entry entry : vertex.edges.entrySet()){ + String entrykey = entry.getKey(); - if (entrykey == communityId){ + if (entrykey.equals(communityId)){ weight += entry.getValue(); } else if (edgeMap.containsKey(entrykey)){ @@ -85,7 +73,7 @@ else if (edgeMap.containsKey(entrykey)){ StringBuilder b = new StringBuilder(); b.append(weight).append("\t"); - for (Entry entry : edgeMap.entrySet()){ + for (Entry entry : edgeMap.entrySet()){ b.append(entry.getKey()).append(":").append(entry.getValue()).append(","); } b.setLength(b.length() - 1); @@ -101,7 +89,7 @@ public static void main(String [] args) throws Exception { JobConf conf = new JobConf(CommunityCompression.class); conf.setJobName("Louvain graph compression"); - conf.setOutputKeyClass(LongWritable.class); + conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(LouvainVertexWritable.class); conf.setMapperClass(Map.class); diff --git a/src/main/java/mil/darpa/xdata/louvain/mapreduce/LouvainVertexWritable.java b/src/main/java/mil/darpa/xdata/louvain/mapreduce/LouvainVertexWritable.java index 1adb06c..b7ac694 100644 --- 
a/src/main/java/mil/darpa/xdata/louvain/mapreduce/LouvainVertexWritable.java +++ b/src/main/java/mil/darpa/xdata/louvain/mapreduce/LouvainVertexWritable.java @@ -1,31 +1,32 @@ package mil.darpa.xdata.louvain.mapreduce; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.hadoop.io.Writable; - /** * Writable class to represent community information for compressing a graph * by its communities. */ -public class LouvainVertexWritable implements Writable{ +public class LouvainVertexWritable implements Writable { long weight; - Map edges; + Map edges; @Override public void readFields(DataInput in) throws IOException { weight = in.readLong(); int edgeSize = in.readInt(); - edges = new HashMap(edgeSize); + edges = new HashMap(edgeSize); for (int i =0; i< edgeSize; i++){ - edges.put(in.readLong(), in.readLong()); + edges.put(WritableUtils.readString(in), in.readLong()); } } @@ -34,8 +35,8 @@ public void readFields(DataInput in) throws IOException { public void write(DataOutput out) throws IOException { out.writeLong(weight); out.writeInt(edges.size()); - for (Map.Entry entry : edges.entrySet()){ - out.writeLong(entry.getKey()); + for (Map.Entry entry : edges.entrySet()){ + WritableUtils.writeString(out, entry.getKey()); out.writeLong(entry.getValue()); } } @@ -45,11 +46,11 @@ public static LouvainVertexWritable fromTokens(String weight,String edges){ LouvainVertexWritable vertex = new LouvainVertexWritable(); vertex.weight = Long.parseLong(weight); - Map edgeMap = new HashMap(); + Map edgeMap = new HashMap(); if (edges.length() > 0){ for (String edgeTuple: edges.split(",")){ String[] edgeTokens = edgeTuple.split(":"); - edgeMap.put(Long.parseLong(edgeTokens[0]), Long.parseLong(edgeTokens[1])); + edgeMap.put(edgeTokens[0], Long.parseLong(edgeTokens[1])); } } vertex.edges = edgeMap;