Skip to content
This repository has been archived by the owner on Oct 8, 2019. It is now read-only.

Commit

Permalink
Implemented late binding of a session groupID (jobID) for Tez
Browse files Browse the repository at this point in the history
  • Loading branch information
myui committed Sep 4, 2014
1 parent 27aefb1 commit c186617
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/main/hivemall/LearnerBaseUDTF.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected PredictionModel createModel(String label) {
protected MixClient configureMixClient(String connectURIs, String label, PredictionModel model) {
assert (connectURIs != null);
assert (model != null);
String jobId = (mixSessionName == null) ? HadoopUtils.getJobId() : mixSessionName;
String jobId = (mixSessionName == null) ? MixClient.DUMMY_JOB_ID : mixSessionName;
if(label != null) {
jobId = jobId + '-' + label;
}
Expand Down
19 changes: 14 additions & 5 deletions src/main/hivemall/mix/client/MixClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import hivemall.mix.MixMessage;
import hivemall.mix.MixMessage.MixEventName;
import hivemall.mix.NodeInfo;
import hivemall.utils.hadoop.HadoopUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -41,12 +42,15 @@
import java.util.HashMap;
import java.util.Map;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.net.ssl.SSLException;

public final class MixClient implements ModelUpdateHandler, Closeable {
public static final String DUMMY_JOB_ID = "__DUMMY_JOB_ID__";

private final MixEventName event;
private final String groupID;
private String groupID;
private final boolean ssl;
private final int mixThreshold;
private final MixRequestRouter router;
Expand All @@ -56,10 +60,7 @@ public final class MixClient implements ModelUpdateHandler, Closeable {
private boolean initialized = false;
private EventLoopGroup workers;

public MixClient(MixEventName event, String groupID, String connectURIs, boolean ssl, int mixThreshold, PredictionModel model) {
assert (event != null);
assert (connectURIs != null);
assert (model != null);
public MixClient(@Nonnull MixEventName event, @CheckForNull String groupID, @Nonnull String connectURIs, boolean ssl, int mixThreshold, @Nonnull PredictionModel model) {
if(groupID == null) {
throw new IllegalArgumentException("groupID is null");
}
Expand Down Expand Up @@ -118,6 +119,7 @@ public boolean onUpdate(Object feature, float weight, float covar, short clock,
}

if(!initialized) {
replaceGroupIDIfRequired();
initialize(); // initialize connections to mix servers
}

Expand All @@ -136,6 +138,13 @@ public boolean onUpdate(Object feature, float weight, float covar, short clock,
return true;
}

private void replaceGroupIDIfRequired() {
if(groupID.startsWith(DUMMY_JOB_ID)) {
String jobId = HadoopUtils.getJobId();
this.groupID = groupID.replace(DUMMY_JOB_ID, jobId);
}
}

@Override
public void close() throws IOException {
if(workers != null) {
Expand Down
Binary file modified target/hivemall-fat.jar
Binary file not shown.
Binary file modified target/hivemall-with-dependencies.jar
Binary file not shown.
Binary file modified target/hivemall.jar
Binary file not shown.

0 comments on commit c186617

Please sign in to comment.