diff --git a/.jenkins/run-splits.sh b/.jenkins/run-splits.sh
index de48d6e056..048e86a815 100755
--- a/.jenkins/run-splits.sh
+++ b/.jenkins/run-splits.sh
@@ -12,4 +12,4 @@ fi
echo "Running tests: $tests"
jacoco=$2
echo "Using jacoco: $jacoco"
-time mvn -fae -Dsurefire.excludedGroups=com.gentics.mesh.test.category.FailingTests,com.gentics.mesh.test.category.ClusterTests -Dmaven.javadoc.skip=true -Dskip.cluster.tests=true -Dmaven.test.failure.ignore=true -Dmesh.container.image.prefix=docker.apa-it.at/ -B -e -pl '!demo/default,!doc,!performance-tests' test -Dtest=$tests -DfailIfNoTests=false -Djacoco.skip=$jacoco | ts "$3 [%Y-%m-%d %H:%M:%S]"
+time mvn -fae -Dsurefire.excludedGroups=com.gentics.mesh.test.category.FailingTests,com.gentics.mesh.test.category.ClusterTests -Dmaven.javadoc.skip=true -Dskip.cluster.tests=true -Dskip.vertx-hazelcast=true -Dmaven.test.failure.ignore=true -Dmesh.container.image.prefix=docker.apa-it.at/ -B -e -pl '!demo/default,!doc,!performance-tests' test -Dtest=$tests -DfailIfNoTests=false -Djacoco.skip=$jacoco | ts "$3 [%Y-%m-%d %H:%M:%S]"
diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index 62aee4b201..b075c81f2d 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -29,6 +29,9 @@ icon:plus[] Core: The OrientDB database as been updated to version 3.2.10.
icon:plus[] Rest: The new endpoints `/api/v2/.../rolePermissions` allow getting, granting and revoking permissions on entities for multiple roles in a single request.
+icon:plus[] Core: The core Vert.x library was updated to version `4.3.2`.
+
+
[[v1.9.3]]
== 1.9.3 (22.09.2022)
diff --git a/Jenkinsfile.split b/Jenkinsfile.split
index 4f82a6d79e..a6b2114c88 100644
--- a/Jenkinsfile.split
+++ b/Jenkinsfile.split
@@ -62,8 +62,9 @@ stage("Setup Build Environment") {
echo "Setup of GPG"
sh "gpg --no-tty --batch --import /mnt/credentials/gpg/gpg-public-key.asc"
sh "gpg --no-tty --batch --import /mnt/credentials/gpg/gpg-secret-key.asc"
- withCredentials([usernamePassword(credentialsId: 'gentics.gpg', usernameVariable: 'gpgKeyName', passwordVariable: 'gpgKeyPass')]) {
- sh "mvn -ff -B -U -Dmaven.javadoc.skip=true -Dskip.test-plugins=false -Dskip.cluster.tests=true -Dgpg.skip=false -DskipTests ${extraFlags} clean install"
+ withCredentials([usernamePassword(credentialsId: 'repo.gentics.com', usernameVariable: 'repoUsername', passwordVariable: 'repoPassword'),
+ usernamePassword(credentialsId: 'gentics.gpg', usernameVariable: 'gpgKeyName', passwordVariable: 'gpgKeyPass')]) {
+ sh "mvn -ff -B -U -Dmaven.javadoc.skip=true -Dskip.test-plugins=false -Dskip.cluster.tests=true -Dskip.vertx-hazelcast=true -Dgpg.skip=false -DskipTests ${extraFlags} clean install"
}
} else {
echo "Omitted since we don't execute tests"
@@ -116,7 +117,7 @@ stage("Setup Build Environment") {
} else {
sshagent(["git"]) {
try {
- sh "mvn -fae -Dsurefire.excludedGroups=com.gentics.mesh.test.category.FailingTests,com.gentics.mesh.test.category.ClusterTests -Dmaven.javadoc.skip=true -Dskip.cluster.tests=true -Dmaven.test.failure.ignore=true -Dmesh.container.image.prefix=docker.apa-it.at/ -B -U -e -pl '!ferma,!demo/default,!doc,!performance-tests' clean install"
+ sh "mvn -fae -Dsurefire.excludedGroups=com.gentics.mesh.test.category.FailingTests,com.gentics.mesh.test.category.ClusterTests -Dmaven.javadoc.skip=true -Dskip.cluster.tests=true -Dskip.vertx-hazelcast=true -Dmaven.test.failure.ignore=true -Dmesh.container.image.prefix=docker.apa-it.at/ -B -U -e -pl '!ferma,!demo/default,!doc,!performance-tests' clean install"
} finally {
step([$class: 'JUnitResultArchiver', testResults: '**/target/surefire-reports/*.xml'])
}
@@ -131,7 +132,7 @@ stage("Setup Build Environment") {
if (Boolean.valueOf(params.runUnstableTests)) {
sshagent(["git"]) {
try {
- sh "mvn -fae -Dsurefire.groups=com.gentics.mesh.test.category.FailingTests -Dmaven.javadoc.skip=true -Dskip.cluster.tests=true -Dmaven.test.failure.ignore=true -Dmesh.container.image.prefix=docker.apa-it.at/ -B -e -pl '!ferma,!demo/default,!doc,!performance-tests' test -DfailIfNoTests=false"
+ sh "mvn -fae -Dsurefire.groups=com.gentics.mesh.test.category.FailingTests -Dmaven.javadoc.skip=true -Dskip.cluster.tests=true -Dskip.vertx-hazelcast=true -Dmaven.test.failure.ignore=true -Dmesh.container.image.prefix=docker.apa-it.at/ -B -e -pl '!ferma,!demo/default,!doc,!performance-tests' test -DfailIfNoTests=false"
} finally {
step([$class: 'JUnitResultArchiver', testResults: '**/target/surefire-reports/*.xml'])
}
@@ -218,7 +219,7 @@ stage("Setup Build Environment") {
node("mesh-performance-worker-11") {
try {
unstash 'project'
- sh "mvn -B -U clean package -pl '!doc,!demo/default,!server' -Dskip.unit.tests=true -Dskip.cluster.tests=true -Dskip.performance.tests=false -Dmaven.test.failure.ignore=true -Dmesh.container.image.prefix=docker.apa-it.at/"
+ sh "mvn -B -U clean package -pl '!doc,!demo/default,!server' -Dskip.unit.tests=true -Dskip.cluster.tests=true -Dskip.vertx-hazelcast=true -Dskip.performance.tests=false -Dmaven.test.failure.ignore=true -Dmesh.container.image.prefix=docker.apa-it.at/"
} finally {
step([$class: 'JUnitResultArchiver', testResults: '**/target/*.performance.xml'])
deleteDir()
diff --git a/api/pom.xml b/api/pom.xml
index 6997f2d03d..5711ba35c6 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -47,6 +47,10 @@
commons-clicommons-cli
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ com.github.stefanbirkner
diff --git a/bom/pom.xml b/bom/pom.xml
index 24ce87ad86..1dbeb9281d 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -29,7 +29,7 @@
3.12.82.13.22.13.2
- 4.1.72.Final
+ 4.1.78.Final1.262.16.831.30
@@ -497,6 +497,13 @@
2.6.0
+
+
+ com.gentics.mesh
+ hazelcast3-cluster-manager
+ ${project.version}
+
+
com.gentics.mesh
diff --git a/common/src/main/java/com/gentics/mesh/auth/handler/MeshAnonymousAuthHandler.java b/common/src/main/java/com/gentics/mesh/auth/handler/MeshAnonymousAuthHandler.java
index 452e9bcbfc..f4332af5ac 100644
--- a/common/src/main/java/com/gentics/mesh/auth/handler/MeshAnonymousAuthHandler.java
+++ b/common/src/main/java/com/gentics/mesh/auth/handler/MeshAnonymousAuthHandler.java
@@ -15,18 +15,18 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServerRequest;
-import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
+import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
-import io.vertx.ext.web.handler.impl.AuthHandlerImpl;
+import io.vertx.ext.web.handler.impl.AuthenticationHandlerImpl;
/**
* Auth handler which will deal with anonymous auth handling. This handler will only auth the user if anonymous auth is enabled and the request does not contain
* any auth header.
*/
@Singleton
-public class MeshAnonymousAuthHandler extends AuthHandlerImpl implements MeshAuthHandler {
+public class MeshAnonymousAuthHandler extends AuthenticationHandlerImpl implements MeshAuthHandler {
public static final String ANONYMOUS_USERNAME = "anonymous";
@@ -45,7 +45,7 @@ public MeshAnonymousAuthHandler(MeshJWTAuthProvider authProvider, MeshOptions op
}
@Override
- public void parseCredentials(RoutingContext arg0, Handler> arg1) {
+ public void authenticate(RoutingContext routingContext, Handler> handler) {
// Not needed for this handler
}
diff --git a/common/src/main/java/com/gentics/mesh/auth/handler/MeshAuthHandler.java b/common/src/main/java/com/gentics/mesh/auth/handler/MeshAuthHandler.java
index fba96a4b81..c3ce95feb2 100644
--- a/common/src/main/java/com/gentics/mesh/auth/handler/MeshAuthHandler.java
+++ b/common/src/main/java/com/gentics/mesh/auth/handler/MeshAuthHandler.java
@@ -2,12 +2,12 @@
import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
-import io.vertx.ext.web.handler.AuthHandler;
+import io.vertx.ext.web.handler.AuthenticationHandler;
/**
- * Common interface all all custom Gentics Mesh Auth handlers.
+ * Common interface all custom Gentics Mesh Auth handlers.
*/
-public interface MeshAuthHandler extends AuthHandler {
+public interface MeshAuthHandler extends AuthenticationHandler {
/**
* Finish the request with code 401.
diff --git a/common/src/main/java/com/gentics/mesh/auth/handler/MeshJWTAuthHandler.java b/common/src/main/java/com/gentics/mesh/auth/handler/MeshJWTAuthHandler.java
index 0a418171ca..263d9dfe38 100644
--- a/common/src/main/java/com/gentics/mesh/auth/handler/MeshJWTAuthHandler.java
+++ b/common/src/main/java/com/gentics/mesh/auth/handler/MeshJWTAuthHandler.java
@@ -16,24 +16,23 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
+import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpServerRequest;
-import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.auth.User;
-import io.vertx.ext.web.Cookie;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.JWTAuthHandler;
-import io.vertx.ext.web.handler.impl.AuthHandlerImpl;
+import io.vertx.ext.web.handler.impl.AuthenticationHandlerImpl;
/**
- * This class extends the Vert.x AuthHandler, so that it also works when the token is set as a cookie.
+ * This class extends the Vert.x AuthenticationHandlerImpl, so that it also works when the token is set as a cookie.
*
* Central authentication handler for mesh. All requests to secured resources must pass this handler.
*/
@Singleton
-public class MeshJWTAuthHandler extends AuthHandlerImpl implements JWTAuthHandler, MeshAuthHandler {
+public class MeshJWTAuthHandler extends AuthenticationHandlerImpl implements JWTAuthHandler, MeshAuthHandler {
private static final Logger log = LoggerFactory.getLogger(MeshJWTAuthHandler.class);
@@ -56,24 +55,6 @@ public MeshJWTAuthHandler(MeshJWTAuthProvider authProvider, MeshOptions meshOpti
options = new JsonObject();
}
- @Override
- public JWTAuthHandler setAudience(List audience) {
- options.put("audience", new JsonArray(audience));
- return this;
- }
-
- @Override
- public JWTAuthHandler setIssuer(String issuer) {
- options.put("issuer", issuer);
- return this;
- }
-
- @Override
- public JWTAuthHandler setIgnoreExpiration(boolean ignoreExpiration) {
- options.put("ignoreExpiration", ignoreExpiration);
- return this;
- }
-
@Override
public void handle(RoutingContext context) {
handle(context, false);
@@ -97,7 +78,7 @@ public void handle(RoutingContext context, boolean ignoreDecodeErrors) {
}
@Override
- public void parseCredentials(RoutingContext arg0, Handler> arg1) {
+ public void authenticate(RoutingContext routingContext, Handler> handler) {
// Not needed for this handler
}
@@ -155,7 +136,7 @@ private void handleJWTAuth(RoutingContext context, boolean ignoreDecodeErrors) {
}
// 4. Authenticate the found token using JWT
- JsonObject authInfo = new JsonObject().put("jwt", token).put("options", options);
+ JsonObject authInfo = new JsonObject().put("token", token).put("options", options);
authProvider.authenticateJWT(authInfo, res -> {
// Authentication was successful.
@@ -189,4 +170,18 @@ private void handleJWTAuth(RoutingContext context, boolean ignoreDecodeErrors) {
});
}
+ @Override
+ public JWTAuthHandler scopeDelimiter(String s) {
+ return this;
+ }
+
+ @Override
+ public JWTAuthHandler withScope(String s) {
+ return this;
+ }
+
+ @Override
+ public JWTAuthHandler withScopes(List list) {
+ return this;
+ }
}
diff --git a/common/src/main/java/com/gentics/mesh/auth/provider/MeshJWTAuthProvider.java b/common/src/main/java/com/gentics/mesh/auth/provider/MeshJWTAuthProvider.java
index 32341cb96f..c882a544f5 100644
--- a/common/src/main/java/com/gentics/mesh/auth/provider/MeshJWTAuthProvider.java
+++ b/common/src/main/java/com/gentics/mesh/auth/provider/MeshJWTAuthProvider.java
@@ -8,6 +8,9 @@
import javax.inject.Inject;
import javax.inject.Singleton;
+import com.gentics.mesh.auth.handler.MeshJWTAuthHandler;
+import io.vertx.core.http.Cookie;
+import io.vertx.ext.auth.authentication.AuthenticationProvider;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
@@ -36,7 +39,6 @@
import io.vertx.ext.auth.User;
import io.vertx.ext.auth.jwt.JWTAuth;
import io.vertx.ext.auth.jwt.JWTAuthOptions;
-import io.vertx.ext.web.Cookie;
/**
* Central mesh authentication provider which will handle JWT.
@@ -46,7 +48,7 @@
*
*/
@Singleton
-public class MeshJWTAuthProvider implements AuthProvider, JWTAuth {
+public class MeshJWTAuthProvider implements AuthenticationProvider, JWTAuth {
private static final Logger log = LoggerFactory.getLogger(MeshJWTAuthProvider.class);
@@ -131,6 +133,11 @@ public String generateToken(JsonObject claims, JWTOptions options) {
throw new NotImplementedException();
}
+ @Override
+ public String generateToken(JsonObject jsonObject) {
+ throw new NotImplementedException();
+ }
+
/**
* Authenticates the user and returns a JWToken if successful.
*
diff --git a/common/src/main/java/com/gentics/mesh/cache/impl/EventAwareCacheFactory.java b/common/src/main/java/com/gentics/mesh/cache/impl/EventAwareCacheFactory.java
index ea78e4a1f9..5dab415727 100644
--- a/common/src/main/java/com/gentics/mesh/cache/impl/EventAwareCacheFactory.java
+++ b/common/src/main/java/com/gentics/mesh/cache/impl/EventAwareCacheFactory.java
@@ -5,23 +5,21 @@
import com.gentics.mesh.cache.EventAwareCache;
import com.gentics.mesh.etc.config.MeshOptions;
+import com.gentics.mesh.event.EventBusStore;
import com.gentics.mesh.metric.MetricsService;
-import io.vertx.core.Vertx;
-
/**
* Factory for {@link EventAwareCache} instances.
*/
@Singleton
public class EventAwareCacheFactory {
-
- private final Vertx vertx;
+ private final EventBusStore eventBusStore;
private final MeshOptions meshOptions;
private final MetricsService metricsService;
@Inject
- public EventAwareCacheFactory(Vertx vertx, MeshOptions meshOptions, MetricsService metricsService) {
- this.vertx = vertx;
+ public EventAwareCacheFactory(EventBusStore eventBusStore, MeshOptions meshOptions, MetricsService metricsService) {
+ this.eventBusStore = eventBusStore;
this.meshOptions = meshOptions;
this.metricsService = metricsService;
}
@@ -35,7 +33,7 @@ public EventAwareCacheFactory(Vertx vertx, MeshOptions meshOptions, MetricsServi
*/
public EventAwareCacheImpl.Builder builder() {
return new EventAwareCacheImpl.Builder()
- .vertx(vertx)
+ .eventBusStore(eventBusStore)
.meshOptions(meshOptions)
.setMetricsService(metricsService);
}
diff --git a/common/src/main/java/com/gentics/mesh/cache/impl/EventAwareCacheImpl.java b/common/src/main/java/com/gentics/mesh/cache/impl/EventAwareCacheImpl.java
index 214e89f5f7..650d2d8dcc 100644
--- a/common/src/main/java/com/gentics/mesh/cache/impl/EventAwareCacheImpl.java
+++ b/common/src/main/java/com/gentics/mesh/cache/impl/EventAwareCacheImpl.java
@@ -11,6 +11,7 @@
import com.gentics.mesh.cache.EventAwareCache;
import com.gentics.mesh.core.rest.MeshEvent;
import com.gentics.mesh.etc.config.MeshOptions;
+import com.gentics.mesh.event.EventBusStore;
import com.gentics.mesh.metric.CachingMetric;
import com.gentics.mesh.metric.MetricsService;
import com.github.benmanes.caffeine.cache.Cache;
@@ -18,8 +19,8 @@
import io.micrometer.core.instrument.Counter;
import io.reactivex.Observable;
+import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Predicate;
-import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
@@ -36,8 +37,6 @@ public class EventAwareCacheImpl implements EventAwareCache {
private final Cache cache;
- private final Vertx vertx;
-
private final MeshOptions options;
private final Predicate> filter;
@@ -51,10 +50,11 @@ public class EventAwareCacheImpl implements EventAwareCache {
private final Counter missCounter;
private final Counter hitCounter;
- public EventAwareCacheImpl(String name, long maxSize, Duration expireAfter, Duration expireAfterAccess, Vertx vertx, MeshOptions options, MetricsService metricsService,
+ private Disposable eventSubscription;
+
+ public EventAwareCacheImpl(String name, long maxSize, Duration expireAfter, Duration expireAfterAccess, EventBusStore eventBusStore, MeshOptions options, MetricsService metricsService,
Predicate> filter,
BiConsumer, EventAwareCache> onNext, MeshEvent... events) {
- this.vertx = vertx;
this.options = options;
Caffeine
+
- io.vertx
- vertx-hazelcast
+ com.gentics.mesh
+ hazelcast3-cluster-manager
@@ -145,8 +146,7 @@
-
-
+
com.hazelcasthazelcast-kubernetes
diff --git a/core/src/main/java/com/gentics/mesh/auth/MeshBasicAuthLoginHandler.java b/core/src/main/java/com/gentics/mesh/auth/MeshBasicAuthLoginHandler.java
index c657d00172..b1401ca022 100644
--- a/core/src/main/java/com/gentics/mesh/auth/MeshBasicAuthLoginHandler.java
+++ b/core/src/main/java/com/gentics/mesh/auth/MeshBasicAuthLoginHandler.java
@@ -7,11 +7,9 @@
import io.vertx.core.Handler;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
-import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
-import io.vertx.ext.web.handler.AuthHandler;
-import io.vertx.ext.web.handler.impl.AuthHandlerImpl;
+import io.vertx.ext.web.handler.impl.AuthenticationHandlerImpl;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -23,7 +21,7 @@
* The {@link #handle(RoutingContext)} method is overriden in order to support the {@link MeshJWTAuthProvider}.
*/
@Singleton
-public class MeshBasicAuthLoginHandler extends AuthHandlerImpl {
+public class MeshBasicAuthLoginHandler extends AuthenticationHandlerImpl {
final String realm;
@@ -37,18 +35,12 @@ public MeshBasicAuthLoginHandler(MeshJWTAuthProvider authProvider) {
}
private void authorizeUser(RoutingContext ctx, User user) {
- authorize(user, authZ -> {
- if (authZ.failed()) {
- ctx.fail(authZ.cause());
- return;
- }
- // success, allowed to continue
- ctx.next();
- });
+ // authorization is done with roles
+ ctx.next();
}
@Override
- public void parseCredentials(RoutingContext context, Handler> handler) {
+ public void authenticate(RoutingContext routingContext, Handler> handler) {
// Not needed
}
diff --git a/core/src/main/java/com/gentics/mesh/cache/PermissionCacheImpl.java b/core/src/main/java/com/gentics/mesh/cache/PermissionCacheImpl.java
index aaae711588..1ce9b560e4 100644
--- a/core/src/main/java/com/gentics/mesh/cache/PermissionCacheImpl.java
+++ b/core/src/main/java/com/gentics/mesh/cache/PermissionCacheImpl.java
@@ -18,7 +18,7 @@
import com.gentics.mesh.core.rest.MeshEvent;
import com.gentics.mesh.etc.config.MeshOptions;
-import io.vertx.core.Vertx;
+import com.gentics.mesh.event.EventBusStore;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
@@ -30,7 +30,7 @@ public class PermissionCacheImpl extends AbstractMeshCache, EnumSet> uniqueMap = Collections.synchronizedMap(new HashMap<>());
@Inject
- public PermissionCacheImpl(EventAwareCacheFactory factory, Vertx vertx, CacheRegistry registry, MeshOptions options) {
+ public PermissionCacheImpl(EventAwareCacheFactory factory, EventBusStore eventBusStore, CacheRegistry registry, MeshOptions options) {
super(createCache(factory), registry, CACHE_SIZE);
- this.vertx = vertx;
+ this.eventBusStore = eventBusStore;
this.options = options;
}
@@ -104,9 +104,9 @@ private String createCacheKey(Object userId, Object elementId) {
public void clear(boolean notify) {
// Invalidate locally
cache.invalidate();
- if (notify && options.getClusterOptions().isEnabled()) {
+ if (notify && options.getClusterOptions().isEnabled() && eventBusStore.current() != null) {
// Send the event to inform other to purge the stored permissions
- vertx.eventBus().publish(CLEAR_PERMISSION_STORE.address, null);
+ eventBusStore.current().publish(CLEAR_PERMISSION_STORE.address, null);
// log.error("Can't distribute cache clear event. Maybe Vert.x is stopping / starting right now");
}
}
diff --git a/core/src/main/java/com/gentics/mesh/cli/AbstractBootstrapInitializer.java b/core/src/main/java/com/gentics/mesh/cli/AbstractBootstrapInitializer.java
index 4251600e9a..36a7362a84 100644
--- a/core/src/main/java/com/gentics/mesh/cli/AbstractBootstrapInitializer.java
+++ b/core/src/main/java/com/gentics/mesh/cli/AbstractBootstrapInitializer.java
@@ -19,6 +19,7 @@
import javax.inject.Inject;
+import com.gentics.mesh.event.EventBusStore;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
@@ -163,6 +164,9 @@ public abstract class AbstractBootstrapInitializer implements BootstrapInitializ
@Inject
public EventBusLivenessManager eventbusLiveness;
+ @Inject
+ public EventBusStore eventBusStore;
+
// TODO: Changing the role name or deleting the role would cause code that utilizes this field to break.
// This is however a rare case.
protected HibRole anonymousRole;
@@ -535,7 +539,6 @@ protected String getLocalIpForRoutedRemoteIP(String destination) {
*/
public void initVertx(MeshOptions options) {
VertxOptions vertxOptions = new VertxOptions();
- vertxOptions.getEventBusOptions().setClustered(options.getClusterOptions().isEnabled());
vertxOptions.setWorkerPoolSize(options.getVertxOptions().getWorkerPoolSize());
vertxOptions.setEventLoopPoolSize(options.getVertxOptions().getEventPoolSize());
@@ -549,7 +552,7 @@ public void initVertx(MeshOptions options) {
vertxOptions.setPreferNativeTransport(true);
System.setProperty("vertx.cacheDirBase", options.getTempDirectory());
Vertx vertx = null;
- if (vertxOptions.getEventBusOptions().isClustered()) {
+ if (options.getClusterOptions().isEnabled()) {
log.info("Creating clustered Vert.x instance");
vertx = createClusteredVertx(options, vertxOptions);
} else {
@@ -563,6 +566,7 @@ public void initVertx(MeshOptions options) {
}
this.vertx = vertx;
+ this.eventBusStore.setEventBus(vertx.eventBus());
}
/**
@@ -993,7 +997,7 @@ protected Vertx createClusteredVertx(MeshOptions options, VertxOptions vertxOpti
try {
return fut.get(getClusteredVertxInitializationTimeoutInSeconds(), SECONDS);
} catch (Exception e) {
- throw new RuntimeException("Error while creating clusterd Vert.x instance");
+ throw new RuntimeException("Error while creating clusterd Vert.x instance", e);
}
}
diff --git a/core/src/main/java/com/gentics/mesh/cli/MeshImpl.java b/core/src/main/java/com/gentics/mesh/cli/MeshImpl.java
index fa4de8fbdb..226847f018 100644
--- a/core/src/main/java/com/gentics/mesh/cli/MeshImpl.java
+++ b/core/src/main/java/com/gentics/mesh/cli/MeshImpl.java
@@ -15,6 +15,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import io.vertx.core.http.*;
import org.apache.commons.lang3.StringUtils;
import com.gentics.mesh.Mesh;
@@ -29,8 +30,6 @@
import io.reactivex.Completable;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
-import io.vertx.core.http.HttpClientOptions;
-import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.impl.launcher.commands.VersionCommand;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
@@ -205,45 +204,59 @@ private void checkSystemRequirements() {
public void invokeUpdateCheck() {
String currentVersion = Mesh.getPlainVersion();
log.info("Checking for updates..");
- HttpClientRequest request = getVertx().createHttpClient(new HttpClientOptions().setSsl(true).setTrustAll(false)).get(443, "getmesh.io",
- "/api/updatecheck?v=" + Mesh.getPlainVersion(), rh -> {
- int code = rh.statusCode();
- if (code < 200 || code >= 299) {
- log.error("Update check failed with status code {" + code + "}");
- } else {
- rh.bodyHandler(bh -> {
- JsonObject info = bh.toJsonObject();
- String latestVersion = info.getString("latest");
-
- if (currentVersion.contains("-SNAPSHOT")) {
- log.warn("You are using a SNAPSHOT version {" + currentVersion
- + "}. This is potentially dangerous because this version has never been officially released.");
- log.info("The latest version of Gentics Mesh is {" + latestVersion + "}");
- } else {
- int result = VersionUtil.compareVersions(latestVersion, currentVersion);
- if (result == 0) {
- log.info("Great! You are using the latest version");
- } else if (result > 0) {
- log.warn("Your Gentics Mesh version is outdated. You are using {" + currentVersion + "} but version {"
- + latestVersion + "} is available.");
- }
- }
- });
- }
- });
- MultiMap headers = request.headers();
- headers.set("content-type", "application/json");
- String hostname = getHostname();
- if (!isEmpty(hostname)) {
- headers.set("X-Hostname", hostname);
- }
- request.exceptionHandler(err -> {
- log.info("Failed to check for updates.");
- log.debug("Reason for failed update check", err);
- });
- request.end();
+ RequestOptions requestOptions = new RequestOptions();
+ requestOptions.setMethod(HttpMethod.GET);
+ requestOptions.setSsl(true);
+ requestOptions.setHost("getmesh.io/api/updatecheck?v=" + Mesh.getPlainVersion());
+ getVertx().createHttpClient(new HttpClientOptions().setSsl(true).setTrustAll(false))
+ .request(HttpMethod.GET, 443, "getmesh.io", "/api/updatecheck?v=" + Mesh.getPlainVersion(), ar -> {
+ if (ar.succeeded()) {
+ HttpClientRequest req = ar.result();
+
+ MultiMap headers = req.headers();
+ headers.set("content-type", "application/json");
+ String hostname = getHostname();
+ if (!isEmpty(hostname)) {
+ headers.set("X-Hostname", hostname);
+ }
+ req.send(ar2 -> {
+ if (ar2.succeeded()) {
+ HttpClientResponse response = ar2.result();
+ int code = response.statusCode();
+ if (code < 200 || code >= 299) {
+ log.error("Update check failed with status code {" + code + "}");
+ } else {
+ response.bodyHandler(bh -> {
+ JsonObject info = bh.toJsonObject();
+ String latestVersion = info.getString("latest");
+
+ if (currentVersion.contains("-SNAPSHOT")) {
+ log.warn("You are using a SNAPSHOT version {" + currentVersion
+ + "}. This is potentially dangerous because this version has never been officially released.");
+ log.info("The latest version of Gentics Mesh is {" + latestVersion + "}");
+ } else {
+ int result = VersionUtil.compareVersions(latestVersion, currentVersion);
+ if (result == 0) {
+ log.info("Great! You are using the latest version");
+ } else if (result > 0) {
+ log.warn("Your Gentics Mesh version is outdated. You are using {" + currentVersion + "} but version {"
+ + latestVersion + "} is available.");
+ }
+ }
+ });
+ }
+ } else {
+ log.info("Failed to check for updates.");
+ log.debug("Reason for failed update check", ar2.cause());
+ }
+ });
+ } else {
+ log.info("Failed to check for updates.");
+ log.debug("Reason for failed update check", ar.cause());
+ }
+ });
}
/**
diff --git a/core/src/main/java/com/gentics/mesh/core/data/impl/MeshAuthUserImpl.java b/core/src/main/java/com/gentics/mesh/core/data/impl/MeshAuthUserImpl.java
index c2b0983e14..e95442547a 100644
--- a/core/src/main/java/com/gentics/mesh/core/data/impl/MeshAuthUserImpl.java
+++ b/core/src/main/java/com/gentics/mesh/core/data/impl/MeshAuthUserImpl.java
@@ -2,6 +2,7 @@
import java.util.Objects;
+import io.vertx.ext.auth.authorization.Authorization;
import org.apache.commons.lang.NotImplementedException;
import com.gentics.mesh.core.data.dao.UserDao;
@@ -97,7 +98,17 @@ public void setAuthProvider(AuthProvider authProvider) {
}
@Override
- public User isAuthorized(String authority, Handler> resultHandler) {
+ public User merge(User user) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public JsonObject attributes() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public User isAuthorized(Authorization authorization, Handler> handler) {
throw new NotImplementedException("Please use the MeshAuthUserImpl method instead.");
}
diff --git a/core/src/main/java/com/gentics/mesh/core/endpoint/eventbus/EventbusEndpoint.java b/core/src/main/java/com/gentics/mesh/core/endpoint/eventbus/EventbusEndpoint.java
index 307cb00997..6d1d7021f2 100644
--- a/core/src/main/java/com/gentics/mesh/core/endpoint/eventbus/EventbusEndpoint.java
+++ b/core/src/main/java/com/gentics/mesh/core/endpoint/eventbus/EventbusEndpoint.java
@@ -13,6 +13,7 @@
import io.vertx.ext.auth.User;
import io.vertx.ext.bridge.BridgeEventType;
import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import io.vertx.ext.web.handler.sockjs.SockJSHandlerOptions;
@@ -47,13 +48,17 @@ public void registerEndPoints() {
}
private void addEventBusHandler() {
- SockJSHandler handler = null;
- if (localRouter != null) {
+ secureAll();
+ InternalEndpointRoute endpoint = createRoute();
+ endpoint.setRAMLPath("/");
+ endpoint.description("This endpoint provides a sockjs compliant websocket which can be used to interface with the vert.x eventbus.");
+
+ if (!isRamlGeneratorContext()) {
SockJSHandlerOptions sockJSoptions = new SockJSHandlerOptions().setHeartbeatInterval(2000);
- handler = SockJSHandler.create(vertx, sockJSoptions);
+ SockJSHandler handler = SockJSHandler.create(vertx, sockJSoptions);
SockJSBridgeOptions bridgeOptions = new SockJSBridgeOptions();
for (MeshEvent event : MeshEvent.publicEvents()) {
- // TODO ensure that clients can't fire internal mesh events.
+ // TODO ensure that clients can't fire internal mesh events.
bridgeOptions.addInboundPermitted(new PermittedOptions().setAddress(event.address));
bridgeOptions.addOutboundPermitted(new PermittedOptions().setAddress(event.address));
}
@@ -61,7 +66,7 @@ private void addEventBusHandler() {
bridgeOptions.addInboundPermitted(new PermittedOptions().setAddressRegex("custom.*"));
bridgeOptions.addOutboundPermitted(new PermittedOptions().setAddressRegex("custom.*"));
- handler.bridge(bridgeOptions, event -> {
+ Router brigdeRoute = handler.bridge(bridgeOptions, event -> {
if (log.isDebugEnabled()) {
if (event.type() == BridgeEventType.SOCKET_CREATED) {
// TODO maybe it would be useful to send a reply to the user.
@@ -76,14 +81,16 @@ private void addEventBusHandler() {
log.debug("Eventbridge creation. User was authenticated: " + isAuthenticated);
event.complete(isAuthenticated);
});
- }
-
- secureAll();
- InternalEndpointRoute endpoint = createRoute();
- endpoint.setRAMLPath("/");
- endpoint.description("This endpoint provides a sockjs complient websocket which can be used to interface with the vert.x eventbus.");
- endpoint.path("/*").handler(handler);
+ endpoint.path("/*").subRouter(brigdeRoute);
+ }
}
+ /**
+ * Returns whether the method is called from during the documentation generation context.
+ * @return
+ */
+ private boolean isRamlGeneratorContext() {
+ return localRouter == null;
+ }
}
diff --git a/core/src/main/java/com/gentics/mesh/core/endpoint/node/BinaryUploadHandlerImpl.java b/core/src/main/java/com/gentics/mesh/core/endpoint/node/BinaryUploadHandlerImpl.java
index a9df72a7ec..c346422591 100644
--- a/core/src/main/java/com/gentics/mesh/core/endpoint/node/BinaryUploadHandlerImpl.java
+++ b/core/src/main/java/com/gentics/mesh/core/endpoint/node/BinaryUploadHandlerImpl.java
@@ -162,7 +162,7 @@ public void handleUpdateField(InternalActionContext ac, String nodeUuid, String
throw error(BAD_REQUEST, "upload_error_no_version");
}
- Set fileUploads = ac.getFileUploads();
+ List fileUploads = ac.getFileUploads();
if (fileUploads.isEmpty()) {
throw error(BAD_REQUEST, "node_error_no_binarydata_found");
}
diff --git a/core/src/main/java/com/gentics/mesh/core/endpoint/node/S3BinaryMetadataExtractionHandlerImpl.java b/core/src/main/java/com/gentics/mesh/core/endpoint/node/S3BinaryMetadataExtractionHandlerImpl.java
index ace5cf888d..0a3eb1efcf 100644
--- a/core/src/main/java/com/gentics/mesh/core/endpoint/node/S3BinaryMetadataExtractionHandlerImpl.java
+++ b/core/src/main/java/com/gentics/mesh/core/endpoint/node/S3BinaryMetadataExtractionHandlerImpl.java
@@ -164,6 +164,11 @@ public String charSet() {
// TODO Auto-generated method stub
return "UTF-8";
}
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
};
ctx.setFileUpload(fileUpload);
ctx.setS3ObjectKey(nodeUuid + "/" + fieldName);
diff --git a/core/src/main/java/com/gentics/mesh/core/endpoint/user/UserTokenAuthHandler.java b/core/src/main/java/com/gentics/mesh/core/endpoint/user/UserTokenAuthHandler.java
index 29e319d41a..f98f27c389 100644
--- a/core/src/main/java/com/gentics/mesh/core/endpoint/user/UserTokenAuthHandler.java
+++ b/core/src/main/java/com/gentics/mesh/core/endpoint/user/UserTokenAuthHandler.java
@@ -21,9 +21,9 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
-import io.vertx.core.json.JsonObject;
+import io.vertx.ext.auth.User;
import io.vertx.ext.web.RoutingContext;
-import io.vertx.ext.web.handler.impl.AuthHandlerImpl;
+import io.vertx.ext.web.handler.impl.AuthenticationHandlerImpl;
/**
* The user token authentication handler grants access to routes by validating the provides token query parameter value.
@@ -33,7 +33,7 @@
* this handler fails the {@link MeshJWTAuthHandler} should try to extract the JWT token from the cookie and load the correct user.
*/
@Singleton
-public class UserTokenAuthHandler extends AuthHandlerImpl {
+public class UserTokenAuthHandler extends AuthenticationHandlerImpl {
public static final int DEFAULT_MAX_TOKEN_AGE_IN_MINS = 30;
private Database db;
@@ -45,7 +45,7 @@ public UserTokenAuthHandler(MeshJWTAuthProvider authProvider, Database db) {
}
@Override
- public void parseCredentials(RoutingContext context, Handler> handler) {
+ public void authenticate(RoutingContext routingContext, Handler> handler) {
// Not needed
}
diff --git a/core/src/main/java/com/gentics/mesh/rest/MeshLocalClientImpl.java b/core/src/main/java/com/gentics/mesh/rest/MeshLocalClientImpl.java
index bc623d54d3..2702272a10 100644
--- a/core/src/main/java/com/gentics/mesh/rest/MeshLocalClientImpl.java
+++ b/core/src/main/java/com/gentics/mesh/rest/MeshLocalClientImpl.java
@@ -1211,6 +1211,11 @@ public String charSet() {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
});
fieldAPIHandler.handleUpdateField(ac, nodeUuid, fieldKey, attributes);
diff --git a/core/src/main/java/com/gentics/mesh/rest/MeshLocalRequestImpl.java b/core/src/main/java/com/gentics/mesh/rest/MeshLocalRequestImpl.java
index a55091b0ec..932a532e8b 100644
--- a/core/src/main/java/com/gentics/mesh/rest/MeshLocalRequestImpl.java
+++ b/core/src/main/java/com/gentics/mesh/rest/MeshLocalRequestImpl.java
@@ -3,6 +3,7 @@
import java.util.List;
import java.util.Map;
+import io.vertx.reactivex.SingleHelper;
import org.apache.commons.lang.NotImplementedException;
import com.gentics.mesh.rest.client.MeshRequest;
@@ -28,7 +29,7 @@ public MeshLocalRequestImpl(Future future) {
@Override
public Single toSingle() {
- return new io.vertx.reactivex.core.Future(future).rxOnComplete();
+ return SingleHelper.toSingle(future::onComplete);
}
@Override
diff --git a/databases/orientdb/pom.xml b/databases/orientdb/pom.xml
index 0b1bf29732..aa85cc4d81 100644
--- a/databases/orientdb/pom.xml
+++ b/databases/orientdb/pom.xml
@@ -49,8 +49,8 @@
test
- io.vertx
- vertx-hazelcast
+ com.gentics.mesh
+ hazelcast3-cluster-manager
diff --git a/databases/orientdb/src/main/java/com/syncleus/ferma/ext/orientdb3/OrientDBTx.java b/databases/orientdb/src/main/java/com/syncleus/ferma/ext/orientdb3/OrientDBTx.java
index 1055e2191f..af8a94c9ae 100644
--- a/databases/orientdb/src/main/java/com/syncleus/ferma/ext/orientdb3/OrientDBTx.java
+++ b/databases/orientdb/src/main/java/com/syncleus/ferma/ext/orientdb3/OrientDBTx.java
@@ -5,6 +5,7 @@
import javax.inject.Inject;
+import dagger.Lazy;
import org.springframework.security.crypto.password.PasswordEncoder;
import com.gentics.mesh.Mesh;
@@ -93,7 +94,11 @@ public class OrientDBTx extends AbstractTx {
private final CommonTxData txData;
private final ContextDataRegistry contextDataRegistry;
private final OrientDBDaoCollection daos;
- private final CacheCollection caches;
+ /**
+ * We provide a lazy instance, otherwise we risk prematurely subscribing to the event bus in certain bootstrapping
+ * scenarios (mesh clustered + init cluster flag set to true)
+ */
+ private final Lazy caches;
private final SecurityUtils security;
private final Binaries binaries;
private final S3Binaries s3binaries;
@@ -102,9 +107,9 @@ public class OrientDBTx extends AbstractTx {
@Inject
public OrientDBTx(OrientDBMeshOptions options, Database db, OrientDBBootstrapInitializer boot,
- OrientDBDaoCollection daos, CacheCollection caches, SecurityUtils security, OrientStorage provider,
- TypeResolver typeResolver, MetricsService metrics, PermissionRoots permissionRoots,
- ContextDataRegistry contextDataRegistry, S3Binaries s3binaries, Binaries binaries, CommonTxData txData) {
+ OrientDBDaoCollection daos, Lazy caches, SecurityUtils security, OrientStorage provider,
+ TypeResolver typeResolver, MetricsService metrics, PermissionRoots permissionRoots,
+ ContextDataRegistry contextDataRegistry, S3Binaries s3binaries, Binaries binaries, CommonTxData txData) {
this.db = db;
this.boot = boot;
this.typeResolver = typeResolver;
@@ -324,7 +329,7 @@ public S3Binaries s3binaries() {
@Override
public PermissionCache permissionCache() {
- return caches.permissionCache();
+ return caches.get().permissionCache();
}
@Override
diff --git a/distributed-coordinator/pom.xml b/distributed-coordinator/pom.xml
index 8cc4442e10..706dc7bcd5 100644
--- a/distributed-coordinator/pom.xml
+++ b/distributed-coordinator/pom.xml
@@ -38,6 +38,11 @@
io.vertxvertx-web
+
+ io.vertx
+ vertx-http-proxy
+ ${vertx.version}
+ com.hazelcasthazelcast
diff --git a/distributed-coordinator/src/main/java/com/gentics/mesh/distributed/coordinator/MasterElector.java b/distributed-coordinator/src/main/java/com/gentics/mesh/distributed/coordinator/MasterElector.java
index c55781341a..a462a2fb1d 100644
--- a/distributed-coordinator/src/main/java/com/gentics/mesh/distributed/coordinator/MasterElector.java
+++ b/distributed-coordinator/src/main/java/com/gentics/mesh/distributed/coordinator/MasterElector.java
@@ -106,7 +106,7 @@ public void start() {
/**
* Check whether the instance that runs this code is the elected master.
- *
+ *
* @return
*/
public boolean isMaster() {
@@ -134,7 +134,7 @@ public void setMaster() {
/**
* Each instance in the cluster will call the elect master method when the structure of the cluster changes. The master election runs in a locked manner and
* is terminated as soon as one node in the cluster got elected.
- *
+ *
* @return Elected member
*/
private void electMaster() {
@@ -145,8 +145,8 @@ private void electMaster() {
try {
log.info("Locked for master election");
Optional foundMaster = cluster.getMembers().stream()
- .filter(m -> isMaster(m))
- .findFirst();
+ .filter(m -> isMaster(m))
+ .findFirst();
boolean hasMaster = foundMaster.isPresent();
boolean isElectible = isElectable(localMember());
if (!hasMaster && isElectible) {
@@ -156,8 +156,8 @@ private void electMaster() {
localMember().setBooleanAttribute(MASTER, true);
log.info("Cluster node was elected as new master");
} else if (cluster.getMembers().stream()
- .filter(m -> isMaster(m))
- .count() > 1) {
+ .filter(m -> isMaster(m))
+ .count() > 1) {
log.info("Detected multiple masters in the cluster, giving up the master flag");
giveUpMasterFlag();
}
@@ -169,7 +169,7 @@ private void electMaster() {
/**
* Check whether the member is allowed to be elected as master
- *
+ *
* @param m
* @return
*/
@@ -236,16 +236,16 @@ public void memberAdded(MembershipEvent membershipEvent) {
hazelcast.get().getLifecycleService().addLifecycleListener(event -> {
log.info(String.format("Lifecycle state changed to %s", event.getState()));
switch (event.getState()) {
- case MERGING:
- merging = true;
- break;
- case MERGED:
- // when the instance merged into a cluster, we need to elect a new master (to avoid multimaster situations)
- merging = false;
- electMaster();
- break;
- default:
- break;
+ case MERGING:
+ merging = true;
+ break;
+ case MERGED:
+ // when the instance merged into a cluster, we need to elect a new master (to avoid multimaster situations)
+ merging = false;
+ electMaster();
+ break;
+ default:
+ break;
}
});
@@ -273,8 +273,8 @@ public void memberAdded(MembershipEvent membershipEvent) {
protected void findCurrentMaster() {
Cluster cluster = hazelcast.get().getCluster();
Optional master = cluster.getMembers().stream()
- .filter(m -> isMaster(m))
- .findFirst();
+ .filter(m -> isMaster(m))
+ .findFirst();
if (master.isPresent()) {
masterMember = master.get();
log.info("Updated master member {" + masterMember.getStringAttribute(MESH_NODE_NAME_ATTR) + "}");
@@ -296,7 +296,7 @@ private void giveUpMasterFlag() {
/**
* Check whether the given member currently is the master
- *
+ *
* @param member
* member
* @return true for the master
@@ -307,7 +307,7 @@ private static boolean isMaster(Member member) {
/**
* Check whether the given instance is the local instance.
- *
+ *
* @param member
* @return
*/
@@ -317,7 +317,7 @@ public boolean isLocal(Member member) {
/**
* Return the hazelcast member for this instance.
- *
+ *
* @return
*/
public Member localMember() {
@@ -348,7 +348,7 @@ public MasterServer getMasterMember() {
/**
* Let the handler accept the object from the given message, if the message was not published from the local node
- *
+ *
* @param msg
* message
* @param handler
@@ -363,7 +363,7 @@ private void executeIfNotFromLocal(Message msg, Consumer handler) {
/**
* Let the handler accept the object from the given message, if the message was published from the local node
- *
+ *
* @param msg
* message
* @param handler
@@ -378,7 +378,7 @@ private void executeIfFromLocal(Message msg, Consumer handler) {
/**
* Check whether the local member is electable.
- *
+ *
* @return
*/
public boolean isElectable() {
diff --git a/distributed-coordinator/src/main/java/com/gentics/mesh/distributed/coordinator/proxy/ClusterEnabledRequestDelegatorImpl.java b/distributed-coordinator/src/main/java/com/gentics/mesh/distributed/coordinator/proxy/ClusterEnabledRequestDelegatorImpl.java
index 50e923345f..ebd3274862 100644
--- a/distributed-coordinator/src/main/java/com/gentics/mesh/distributed/coordinator/proxy/ClusterEnabledRequestDelegatorImpl.java
+++ b/distributed-coordinator/src/main/java/com/gentics/mesh/distributed/coordinator/proxy/ClusterEnabledRequestDelegatorImpl.java
@@ -26,8 +26,8 @@
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
-import io.vertx.core.streams.Pump;
import io.vertx.ext.web.RoutingContext;
+import io.vertx.httpproxy.ProxyRequest;
/**
* @see RequestDelegator
@@ -133,44 +133,31 @@ public boolean canWrite() {
@Override
public void redirectToMaster(RoutingContext rc) {
HttpServerRequest request = rc.request();
- String requestURI = request.uri();
- HttpMethod method = request.method();
- HttpServerResponse response = rc.response();
MasterServer master = coordinator.getMasterMember();
String host = master.getHost();
int port = master.getPort();
-
if (log.isDebugEnabled()) {
log.debug("Forwarding request to master {" + master.toString() + "}");
}
- @SuppressWarnings("deprecation")
- HttpClientRequest forwardRequest = httpClient.request(method, port, host, requestURI, forwardResponse -> {
- response.setChunked(true);
- response.setStatusCode(forwardResponse.statusCode());
- response.putHeader(MESH_FORWARDED_FROM_HEADER, master.getName());
- forwardHeaders(response, forwardResponse);
- printHeaders("Forward response headers", response.headers());
- Pump.pump(forwardResponse, response)
- .setWriteQueueMaxSize(8192)
- .start();
- forwardResponse.endHandler(v -> response.end());
- });
-
- forwardHeaders(request, forwardRequest);
- forwardRequest.putHeader(MESH_DIRECT_HEADER, "true");
- forwardRequest.setChunked(true);
-
- if (request.isEnded()) {
- log.warn("Request to be proxied is already read");
- proxyEndHandler(forwardRequest, rc.getBody());
- } else {
- request.exceptionHandler(e -> log.error("Could not forward request to Mesh: {}", e, e.getMessage()))
- .endHandler(v -> proxyEndHandler(forwardRequest, null));
- Pump.pump(request, forwardRequest)
- .setWriteQueueMaxSize(8192)
- .start();
- }
+ ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request);
+ proxyRequest.putHeader(MESH_DIRECT_HEADER, "true");
+
+ httpClient.request(proxyRequest.getMethod(), port, host, proxyRequest.getURI())
+ .compose(proxyRequest::send)
+ .onSuccess(proxyResponse -> {
+ // Send the proxy response
+ proxyResponse.putHeader(MESH_FORWARDED_FROM_HEADER, master.getName());
+ proxyResponse.send();
+ })
+ .onFailure(err -> {
+ // Release the request
+ proxyRequest.release();
+
+ // Send error
+ request.response().setStatusCode(500)
+ .send();
+ });
}
@Override
diff --git a/hazelcast3-cluster-manager/README.md b/hazelcast3-cluster-manager/README.md
new file mode 100644
index 0000000000..35eb30fd27
--- /dev/null
+++ b/hazelcast3-cluster-manager/README.md
@@ -0,0 +1,6 @@
+# Hazelcast 3 Cluster Manager
+
+Fork of [Vert.x Hazelcast](https://github.com/vert-x3/vertx-hazelcast) implementing Vert.x 4 Cluster SPI using Hazelcast 3.
+
+This was created since Vert.x Hazelcast cluster manager supports only Hazelcast 4, and OrientDB doesn't support
+Hazelcast 4 at the moment of writing.
diff --git a/hazelcast3-cluster-manager/pom.xml b/hazelcast3-cluster-manager/pom.xml
new file mode 100644
index 0000000000..8fa4101d07
--- /dev/null
+++ b/hazelcast3-cluster-manager/pom.xml
@@ -0,0 +1,243 @@
+
+
+ 4.0.0
+
+ mesh
+ com.gentics.mesh
+ 1.10.0-SNAPSHOT
+
+
+ hazelcast3-cluster-manager
+ Vert.x Hazelcast Cluster Manager
+
+
+ 4.3.2
+ 3.12.2
+ 1.5.1
+ ${skip.vertx-hazelcast}
+ ${project.basedir}/src/main/resources/META-INF/MANIFEST.MF
+
+
+
+
+
+ io.vertx
+ vertx-dependencies
+ ${stack.version}
+ pom
+ import
+
+
+
+
+
+
+
+ io.vertx
+ vertx-core
+
+
+ io.vertx
+ vertx-health-check
+ true
+
+
+ io.vertx
+ vertx-web
+ true
+
+
+ com.hazelcast
+ hazelcast
+ ${hazelcast.version}
+
+
+
+ io.vertx
+ vertx-docgen
+ provided
+
+
+ io.vertx
+ vertx-codegen
+ provided
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ io.vertx
+ vertx-core
+ test-jar
+ test
+
+
+ io.vertx
+ vertx-web
+ test-jar
+ test
+
+
+ io.vertx
+ vertx-service-discovery
+ test
+
+
+ io.vertx
+ vertx-service-proxy
+ test
+
+
+ io.vertx
+ vertx-service-discovery
+ test-jar
+ test
+
+
+ org.assertj
+ assertj-core
+ 3.3.0
+ test
+
+
+ com.jayway.awaitility
+ awaitility
+ 1.7.0
+ test
+
+
+ com.hazelcast
+ hazelcast-client
+ ${hazelcast.version}
+ test
+
+
+ ch.qos.logback
+ logback-classic
+ 1.1.7
+ test
+
+
+
+
+
+
+
+ org.bsc.maven
+ maven-processor-plugin
+
+
+ generate-sources
+
+
+ ${hazelcast.version}
+
+
+
+
+
+
+ maven-surefire-plugin
+
+ false
+
+ PARANOID
+ ${project.build.directory}
+ ${project.version}
+ true
+ io.vertx.core.logging.SLF4JLogDelegateFactory
+ slf4j
+
+
+ -Xmx1200M
+ 1
+ true
+
+
+
+ maven-failsafe-plugin
+ 2.19.1
+
+ false
+
+ PARANOID
+ ${project.build.directory}
+ ${project.version}
+ true
+ io.vertx.core.logging.SLF4JLogDelegateFactory
+ slf4j
+
+
+ -Xmx1200M
+ 1
+ true
+
+
+
+
+
+
+ maven-surefire-plugin
+
+ ${skipUnitTests}
+
+ **/it/**/*Test.java
+
+
+
+
+ maven-failsafe-plugin
+
+
+ lite-members
+
+ integration-test
+ verify
+
+ integration-test
+
+
+ **/it/litemembers/*Test.java
+
+
+
+
+
+
+
+
+
+
+ coverage
+
+
+
+
+ maven-surefire-plugin
+
+ false
+
+ PARANOID
+ ${project.build.directory}
+ ${project.version}
+ true
+
+
+
+ -Xmx1200M
+ 1
+ true
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/hazelcast3-cluster-manager/src/main/java/examples/Examples.java b/hazelcast3-cluster-manager/src/main/java/examples/Examples.java
new file mode 100644
index 0000000000..4911d7a06f
--- /dev/null
+++ b/hazelcast3-cluster-manager/src/main/java/examples/Examples.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2014 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+
+package examples;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.core.HazelcastInstance;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+import io.vertx.core.spi.cluster.ClusterManager;
+import io.vertx.ext.healthchecks.HealthCheckHandler;
+import io.vertx.ext.healthchecks.HealthChecks;
+import io.vertx.ext.healthchecks.Status;
+import io.vertx.ext.web.Router;
+import io.vertx.spi.cluster.hazelcast.ClusterHealthCheck;
+import io.vertx.spi.cluster.hazelcast.ConfigUtil;
+import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
+
+/**
+ * @author Tim Fox
+ */
+public class Examples {
+
+ public void example1() {
+
+ ClusterManager mgr = new HazelcastClusterManager();
+
+ VertxOptions options = new VertxOptions().setClusterManager(mgr);
+
+ Vertx.clusteredVertx(options, res -> {
+ if (res.succeeded()) {
+ Vertx vertx = res.result();
+ } else {
+ // failed!
+ }
+ });
+ }
+
+ public void example2() {
+
+ Config hazelcastConfig = new Config();
+
+ // Now set some stuff on the config (omitted)
+
+ ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
+
+ VertxOptions options = new VertxOptions().setClusterManager(mgr);
+
+ Vertx.clusteredVertx(options, res -> {
+ if (res.succeeded()) {
+ Vertx vertx = res.result();
+ } else {
+ // failed!
+ }
+ });
+ }
+
+ public void customizeDefaultConfig() {
+ Config hazelcastConfig = ConfigUtil.loadConfig();
+
+ hazelcastConfig.getGroupConfig()
+ .setName("my-cluster-name");
+
+ ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
+
+ VertxOptions options = new VertxOptions().setClusterManager(mgr);
+
+ Vertx.clusteredVertx(options, res -> {
+ if (res.succeeded()) {
+ Vertx vertx = res.result();
+ } else {
+ // failed!
+ }
+ });
+ }
+
+ public void example3(HazelcastInstance hazelcastInstance) {
+ ClusterManager mgr = new HazelcastClusterManager(hazelcastInstance);
+ VertxOptions options = new VertxOptions().setClusterManager(mgr);
+ Vertx.clusteredVertx(options, res -> {
+ if (res.succeeded()) {
+ Vertx vertx = res.result();
+ } else {
+ // failed!
+ }
+ });
+ }
+
+ public void healthCheck(Vertx vertx) {
+ Handler> procedure = ClusterHealthCheck.createProcedure(vertx);
+ HealthChecks checks = HealthChecks.create(vertx).register("cluster-health", procedure);
+ }
+
+ public void healthCheckHandler(Vertx vertx, HealthChecks checks) {
+ Router router = Router.router(vertx);
+ router.get("/readiness").handler(HealthCheckHandler.createWithHealthChecks(checks));
+ }
+
+ public void liteMemberConfig() {
+ Config hazelcastConfig = ConfigUtil.loadConfig()
+ .setLiteMember(true);
+
+ ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
+
+ VertxOptions options = new VertxOptions().setClusterManager(mgr);
+
+ Vertx.clusteredVertx(options, res -> {
+ if (res.succeeded()) {
+ Vertx vertx = res.result();
+ } else {
+ // failed!
+ }
+ });
+ }
+}
diff --git a/hazelcast3-cluster-manager/src/main/java/examples/package-info.java b/hazelcast3-cluster-manager/src/main/java/examples/package-info.java
new file mode 100644
index 0000000000..8bb8f7b89b
--- /dev/null
+++ b/hazelcast3-cluster-manager/src/main/java/examples/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+
+/**
+ * @author Julien Viet
+ */
+@Source
+package examples;
+
+import io.vertx.docgen.Source;
\ No newline at end of file
diff --git a/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/ClusterHealthCheck.java b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/ClusterHealthCheck.java
new file mode 100644
index 0000000000..cd982c1221
--- /dev/null
+++ b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/ClusterHealthCheck.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+package io.vertx.spi.cluster.hazelcast;
+
+import com.hazelcast.core.PartitionService;
+import io.vertx.codegen.annotations.VertxGen;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.ext.healthchecks.Status;
+
+import java.util.Objects;
+
+/**
+ * A helper to create Vert.x cluster {@link io.vertx.ext.healthchecks.HealthChecks} procedures.
+ */
+@VertxGen
+public interface ClusterHealthCheck {
+
+ /**
+ * Creates a ready-to-use Vert.x cluster {@link io.vertx.ext.healthchecks.HealthChecks} procedure.
+ *
+ * @param vertx the instance of Vert.x, must not be {@code null}
+ * @return a Vert.x cluster {@link io.vertx.ext.healthchecks.HealthChecks} procedure
+ */
+ static Handler> createProcedure(Vertx vertx) {
+ Objects.requireNonNull(vertx);
+ return future -> {
+ VertxInternal vertxInternal = (VertxInternal) vertx;
+ HazelcastClusterManager clusterManager = (HazelcastClusterManager) vertxInternal.getClusterManager();
+ PartitionService partitionService = clusterManager.getHazelcastInstance().getPartitionService();
+ future.complete(new Status().setOk(partitionService.isClusterSafe()));
+ };
+ }
+}
diff --git a/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/ConfigUtil.java b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/ConfigUtil.java
new file mode 100644
index 0000000000..b4895b4911
--- /dev/null
+++ b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/ConfigUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2018 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.vertx.spi.cluster.hazelcast;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.XmlConfigBuilder;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+
+import java.io.*;
+
+/**
+ * @author Thomas Segismont
+ */
+public class ConfigUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(ConfigUtil.class);
+
+ // Hazelcast config file
+ private static final String DEFAULT_CONFIG_FILE = "default-cluster.xml";
+ private static final String CONFIG_FILE = "cluster.xml";
+
+ /**
+ * Loads Hazelcast config XML and transform it into a {@link Config} object.
+ *
+ * The content is read from:
+ *
+ *
the location denoted by the {@code vertx.hazelcast.config} sysprop, if present, or
+ *
the {@code cluster.xml} file on the classpath, if present, or
+ *
the default config file
+ *
+ *
+ * @return a config object
+ */
+ public static Config loadConfig() {
+ Config cfg = null;
+ try (InputStream is = getConfigStream();
+ InputStream bis = new BufferedInputStream(is)) {
+ cfg = new XmlConfigBuilder(bis).build();
+ } catch (IOException ex) {
+ log.error("Failed to read config", ex);
+ }
+ return cfg;
+ }
+
+ private static InputStream getConfigStream() {
+ InputStream is = getConfigStreamFromSystemProperty();
+ if (is == null) {
+ is = getConfigStreamFromClasspath(CONFIG_FILE, DEFAULT_CONFIG_FILE);
+ }
+ return is;
+ }
+
+ private static InputStream getConfigStreamFromSystemProperty() {
+ String configProp = System.getProperty("vertx.hazelcast.config");
+ InputStream is = null;
+ if (configProp != null) {
+ if (configProp.startsWith("classpath:")) {
+ return getConfigStreamFromClasspath(configProp.substring("classpath:".length()), CONFIG_FILE);
+ }
+ File cfgFile = new File(configProp);
+ if (cfgFile.exists()) {
+ try {
+ is = new FileInputStream(cfgFile);
+ } catch (FileNotFoundException ex) {
+ log.warn("Failed to open file '" + configProp + "' defined in 'vertx.hazelcast.config'. Continuing " +
+ "classpath search for " + CONFIG_FILE);
+ }
+ }
+ }
+ return is;
+ }
+
+ private static InputStream getConfigStreamFromClasspath(String configFile, String defaultConfig) {
+ InputStream is = null;
+ ClassLoader ctxClsLoader = Thread.currentThread().getContextClassLoader();
+ if (ctxClsLoader != null) {
+ is = ctxClsLoader.getResourceAsStream(configFile);
+ }
+ if (is == null) {
+ is = ConfigUtil.class.getClassLoader().getResourceAsStream(configFile);
+ if (is == null) {
+ is = ConfigUtil.class.getClassLoader().getResourceAsStream(defaultConfig);
+ }
+ }
+ return is;
+ }
+
+ private ConfigUtil() {
+ // Utility class
+ }
+}
diff --git a/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java
new file mode 100644
index 0000000000..918431cbfc
--- /dev/null
+++ b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/HazelcastClusterManager.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright (c) 2011-2013 The original author or authors
+ * ------------------------------------------------------
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+
+package io.vertx.spi.cluster.hazelcast;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.core.*;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxException;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import io.vertx.core.shareddata.AsyncMap;
+import io.vertx.core.shareddata.Counter;
+import io.vertx.core.shareddata.Lock;
+import io.vertx.core.spi.cluster.*;
+import io.vertx.spi.cluster.hazelcast.impl.*;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * A cluster manager that uses Hazelcast
+ *
+ * @author Tim Fox
+ */
+public class HazelcastClusterManager implements ClusterManager, MembershipListener, LifecycleListener {
+
+ private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class);
+
+ private static final String LOCK_SEMAPHORE_PREFIX = "__vertx.";
+ private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId";
+
+ private VertxInternal vertx;
+ private NodeSelector nodeSelector;
+
+ private HazelcastInstance hazelcast;
+ private String nodeId;
+ private NodeInfo nodeInfo;
+ private SubsMapHelper subsMapHelper;
+ private IMap nodeInfoMap;
+ private String membershipListenerId;
+ private String lifecycleListenerId;
+ private boolean customHazelcastCluster;
+ private Set nodeIds = new HashSet<>();
+
+ private NodeListener nodeListener;
+ private volatile boolean active;
+
+ private Config conf;
+
+ private ExecutorService lockReleaseExec;
+
+ /**
+ * Constructor - gets config from classpath
+ */
+ public HazelcastClusterManager() {
+ }
+
+ /**
+ * Constructor - config supplied
+ *
+ * @param conf Hazelcast config, not null
+ */
+ public HazelcastClusterManager(Config conf) {
+ Objects.requireNonNull(conf, "The Hazelcast config cannot be null.");
+ this.conf = conf;
+ }
+
+ public HazelcastClusterManager(HazelcastInstance instance) {
+ Objects.requireNonNull(instance, "The Hazelcast instance cannot be null.");
+ hazelcast = instance;
+ customHazelcastCluster = true;
+ }
+
+ @Override
+ public void init(Vertx vertx, NodeSelector nodeSelector) {
+ this.vertx = (VertxInternal) vertx;
+ this.nodeSelector = nodeSelector;
+ }
+
+ @Override
+ public void join(Promise promise) {
+ vertx.executeBlocking(prom -> {
+ if (!active) {
+ active = true;
+
+ lockReleaseExec = Executors.newCachedThreadPool(r -> new Thread(r, "vertx-hazelcast-service-release-lock-thread"));
+
+ // The hazelcast instance has not been passed using the constructor.
+ if (!customHazelcastCluster) {
+ if (conf == null) {
+ conf = loadConfig();
+ if (conf == null) {
+ log.warn("Cannot find cluster configuration on 'vertx.hazelcast.config' system property, on the classpath, " +
+ "or specified programmatically. Using default hazelcast configuration");
+ conf = new Config();
+ }
+ }
+
+ // We have our own shutdown hook and need to ensure ours runs before Hazelcast is shutdown
+ conf.setProperty("hazelcast.shutdownhook.enabled", "false");
+
+ hazelcast = Hazelcast.newHazelcastInstance(conf);
+ }
+
+ Member localMember = hazelcast.getCluster().getLocalMember();
+ nodeId = localMember.getUuid();
+ localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeId);
+ membershipListenerId = hazelcast.getCluster().addMembershipListener(this);
+ lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this);
+
+ subsMapHelper = new SubsMapHelper(vertx, hazelcast, nodeSelector);
+ nodeInfoMap = hazelcast.getMap("__vertx.nodeInfo");
+
+ prom.complete();
+ }
+ }, promise);
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public List getNodes() {
+ List list = new ArrayList<>();
+ for (Member member : hazelcast.getCluster().getMembers()) {
+ String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+ list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid());
+ }
+ return list;
+ }
+
+ @Override
+ public void nodeListener(NodeListener listener) {
+ this.nodeListener = listener;
+ }
+
+ @Override
+ public void setNodeInfo(NodeInfo nodeInfo, Promise promise) {
+ synchronized (this) {
+ this.nodeInfo = nodeInfo;
+ }
+ HazelcastNodeInfo value = new HazelcastNodeInfo(nodeInfo);
+ vertx.executeBlocking(prom -> {
+ nodeInfoMap.put(nodeId, value);
+ prom.complete();
+ }, false, promise);
+ }
+
+ @Override
+ public synchronized NodeInfo getNodeInfo() {
+ return nodeInfo;
+ }
+
+ @Override
+ public void getNodeInfo(String nodeId, Promise promise) {
+ vertx.executeBlocking(prom -> {
+ HazelcastNodeInfo value = nodeInfoMap.get(nodeId);
+ if (value != null) {
+ prom.complete(value.unwrap());
+ } else {
+ promise.fail("Not a member of the cluster");
+ }
+ }, false, promise);
+ }
+
+ @Override
+ public void getAsyncMap(String name, Promise> promise) {
+ promise.complete(new HazelcastAsyncMap<>(vertx, hazelcast.getMap(name)));
+ }
+
+ @Override
+ public Map getSyncMap(String name) {
+ return hazelcast.getMap(name);
+ }
+
+ @Override
+ public void getLockWithTimeout(String name, long timeout, Promise promise) {
+ vertx.executeBlocking(prom -> {
+ ISemaphore iSemaphore = hazelcast.getSemaphore(LOCK_SEMAPHORE_PREFIX + name);
+ boolean locked = false;
+ long remaining = timeout;
+ do {
+ long start = System.nanoTime();
+ try {
+ locked = iSemaphore.tryAcquire(remaining, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // OK continue
+ }
+ remaining = remaining - MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS);
+ } while (!locked && remaining > 0);
+ if (locked) {
+ prom.complete(new HazelcastLock(iSemaphore, lockReleaseExec));
+ } else {
+ throw new VertxException("Timed out waiting to get lock " + name);
+ }
+ }, false, promise);
+ }
+
+ @Override
+ public void getCounter(String name, Promise promise) {
+ promise.complete(new HazelcastCounter(vertx, hazelcast.getAtomicLong(name)));
+ }
+
+ @Override
+ public void leave(Promise promise) {
+ vertx.executeBlocking(prom -> {
+ // We need to synchronized on the cluster manager instance to avoid other call to happen while leaving the
+ // cluster, typically, memberRemoved and memberAdded
+ synchronized (HazelcastClusterManager.this) {
+ if (active) {
+ try {
+ active = false;
+ lockReleaseExec.shutdown();
+ subsMapHelper.close();
+ boolean left = hazelcast.getCluster().removeMembershipListener(membershipListenerId);
+ if (!left) {
+ log.warn("No membership listener");
+ }
+ hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListenerId);
+
+ // Do not shutdown the cluster if we are not the owner.
+ while (!customHazelcastCluster && hazelcast.getLifecycleService().isRunning()) {
+ try {
+ // This can sometimes throw java.util.concurrent.RejectedExecutionException so we retry.
+ hazelcast.getLifecycleService().shutdown();
+ } catch (RejectedExecutionException ignore) {
+ log.debug("Rejected execution of the shutdown operation, retrying");
+ }
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException t) {
+ // Manage the interruption in another handler.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (customHazelcastCluster) {
+ hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE);
+ }
+
+ } catch (Throwable t) {
+ prom.fail(t);
+ }
+ }
+ }
+ prom.complete();
+ }, promise);
+ }
+
+ @Override
+ public synchronized void memberAdded(MembershipEvent membershipEvent) {
+ if (!active) {
+ return;
+ }
+ Member member = membershipEvent.getMember();
+ String attribute = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+ String nid = attribute != null ? attribute : member.getUuid();
+ try {
+ if (nodeListener != null) {
+ nodeIds.add(nid);
+ nodeListener.nodeAdded(nid);
+ }
+ } catch (Throwable t) {
+ log.error("Failed to handle memberAdded", t);
+ }
+ }
+
+ @Override
+ public synchronized void memberRemoved(MembershipEvent membershipEvent) {
+ if (!active) {
+ return;
+ }
+ Member member = membershipEvent.getMember();
+ String attribute = member.getStringAttribute(NODE_ID_ATTRIBUTE);
+ String nid = attribute != null ? attribute : member.getUuid();
+ try {
+ membersRemoved(Collections.singleton(nid));
+ } catch (Throwable t) {
+ log.error("Failed to handle memberRemoved", t);
+ }
+ }
+
+ private synchronized void membersRemoved(Set ids) {
+ cleanSubs(ids);
+ cleanNodeInfos(ids);
+ nodeInfoMap.put(nodeId, new HazelcastNodeInfo(getNodeInfo()));
+ nodeSelector.registrationsLost();
+ republishOwnSubs();
+ if (nodeListener != null) {
+ nodeIds.removeAll(ids);
+ ids.forEach(nodeListener::nodeLeft);
+ }
+ }
+
+ private void cleanSubs(Set ids) {
+ subsMapHelper.removeAllForNodes(ids);
+ }
+
+ private void cleanNodeInfos(Set ids) {
+ ids.forEach(nodeInfoMap::remove);
+ }
+
+ private void republishOwnSubs() {
+ vertx.executeBlocking(prom -> {
+ subsMapHelper.republishOwnSubs();
+ prom.complete();
+ }, false);
+ }
+
+ @Override
+ public synchronized void stateChanged(LifecycleEvent lifecycleEvent) {
+ if (!active) {
+ return;
+ }
+ // Safeguard to make sure members list is OK after a partition merge
+ if (lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) {
+ final List currentNodes = getNodes();
+ Set newNodes = new HashSet<>(currentNodes);
+ newNodes.removeAll(nodeIds);
+ Set removedMembers = new HashSet<>(nodeIds);
+ removedMembers.removeAll(currentNodes);
+ for (String nodeId : newNodes) {
+ nodeListener.nodeAdded(nodeId);
+ }
+ membersRemoved(removedMembers);
+ nodeIds.retainAll(currentNodes);
+ }
+ }
+
+ @Override
+ public boolean isActive() {
+ return active;
+ }
+
+ @Override
+ public void addRegistration(String address, RegistrationInfo registrationInfo, Promise promise) {
+ SubsOpSerializer serializer = SubsOpSerializer.get(vertx.getOrCreateContext());
+ serializer.execute(subsMapHelper::put, address, registrationInfo, promise);
+ }
+
+ @Override
+ public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise promise) {
+ SubsOpSerializer serializer = SubsOpSerializer.get(vertx.getOrCreateContext());
+ serializer.execute(subsMapHelper::remove, address, registrationInfo, promise);
+ }
+
+ @Override
+ public void getRegistrations(String address, Promise> promise) {
+ vertx.executeBlocking(prom -> {
+ prom.complete(subsMapHelper.get(address));
+ }, false, promise);
+ }
+
+ @Override
+ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+ }
+
+ /**
+ * Get the Hazelcast config.
+ *
+ * @return a config object
+ */
+ public Config getConfig() {
+ return conf;
+ }
+
+ /**
+ * Set the Hazelcast config.
+ *
+ * @param config a config object
+ */
+ public void setConfig(Config config) {
+ this.conf = config;
+ }
+
+ /**
+ * Load Hazelcast config XML and transform it into a {@link Config} object.
+ * The content is read from:
+ *
+ *
the location denoted by the {@code vertx.hazelcast.config} sysprop, if present, or
+ *
the {@code cluster.xml} file on the classpath, if present, or
+ *
the default config file
+ *
+ *
+ * The cluster manager uses this method to load the config when the node joins the cluster, if no config was provided upon creation.
+ *
+ *
+ * You may use this method to get a base config and customize it before the node joins the cluster.
+ * In this case, don't forget to invoke {@link #setConfig(Config)} after you applied your changes.
+ *
+ *
+ * @return a config object
+ */
+ public Config loadConfig() {
+ return ConfigUtil.loadConfig();
+ }
+
+ public HazelcastInstance getHazelcastInstance() {
+ return hazelcast;
+ }
+}
diff --git a/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/ConversionUtils.java b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/ConversionUtils.java
new file mode 100644
index 0000000000..0cfe8cd289
--- /dev/null
+++ b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/ConversionUtils.java
@@ -0,0 +1,90 @@
+package io.vertx.spi.cluster.hazelcast.impl;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.shareddata.impl.ClusterSerializable;
+
+import java.io.IOException;
+
+public class ConversionUtils {
+
+ @SuppressWarnings("unchecked")
+ public static T convertParam(T obj) {
+ if (obj instanceof ClusterSerializable) {
+ ClusterSerializable cobj = (ClusterSerializable) obj;
+ return (T) (new DataSerializableHolder(cobj));
+ } else {
+ return obj;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static T convertReturn(Object obj) {
+ if (obj instanceof DataSerializableHolder) {
+ DataSerializableHolder cobj = (DataSerializableHolder) obj;
+ return (T) cobj.clusterSerializable();
+ } else {
+ return (T) obj;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final class DataSerializableHolder implements DataSerializable {
+
+ private ClusterSerializable clusterSerializable;
+
+ public DataSerializableHolder() {
+ }
+
+ private DataSerializableHolder(ClusterSerializable clusterSerializable) {
+ this.clusterSerializable = clusterSerializable;
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput objectDataOutput) throws IOException {
+ objectDataOutput.writeUTF(clusterSerializable.getClass().getName());
+ Buffer buffer = Buffer.buffer();
+ clusterSerializable.writeToBuffer(buffer);
+ byte[] bytes = buffer.getBytes();
+ objectDataOutput.writeInt(bytes.length);
+ objectDataOutput.write(bytes);
+ }
+
+ @Override
+ public void readData(ObjectDataInput objectDataInput) throws IOException {
+ String className = objectDataInput.readUTF();
+ int length = objectDataInput.readInt();
+ byte[] bytes = new byte[length];
+ objectDataInput.readFully(bytes);
+ try {
+ Class> clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
+ clusterSerializable = (ClusterSerializable) clazz.newInstance();
+ clusterSerializable.readFromBuffer(0, Buffer.buffer(bytes));
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to load class " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof DataSerializableHolder)) return false;
+ DataSerializableHolder that = (DataSerializableHolder) o;
+ if (clusterSerializable != null ? !clusterSerializable.equals(that.clusterSerializable) : that.clusterSerializable != null) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return clusterSerializable != null ? clusterSerializable.hashCode() : 0;
+ }
+
+ public ClusterSerializable clusterSerializable() {
+ return clusterSerializable;
+ }
+ }
+}
diff --git a/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/HandlerCallBackAdapter.java b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/HandlerCallBackAdapter.java
new file mode 100644
index 0000000000..e01c35daed
--- /dev/null
+++ b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/HandlerCallBackAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2020 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.vertx.spi.cluster.hazelcast.impl;
+
+import com.hazelcast.core.ExecutionCallback;
+import io.vertx.core.Promise;
+
+/**
+ * @author Jaromir Hamala
+ */
+public class HandlerCallBackAdapter implements ExecutionCallback {
+
+ private final Promise promise;
+
+ public HandlerCallBackAdapter(Promise promise) {
+ this.promise = promise;
+ }
+
+ @Override
+ public void onResponse(V v) {
+ promise.complete(v);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ promise.fail(throwable);
+ }
+}
diff --git a/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/HazelcastAsyncMap.java b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/HazelcastAsyncMap.java
new file mode 100644
index 0000000000..b9a896b75b
--- /dev/null
+++ b/hazelcast3-cluster-manager/src/main/java/io/vertx/spi/cluster/hazelcast/impl/HazelcastAsyncMap.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2011-2013 The original author or authors
+ * ------------------------------------------------------
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+
+package io.vertx.spi.cluster.hazelcast.impl;
+
+import com.hazelcast.core.IMap;
+import io.vertx.core.Future;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.future.PromiseInternal;
+import io.vertx.core.shareddata.AsyncMap;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import static io.vertx.spi.cluster.hazelcast.impl.ConversionUtils.convertParam;
+import static io.vertx.spi.cluster.hazelcast.impl.ConversionUtils.convertReturn;
+
+public class HazelcastAsyncMap implements AsyncMap {
+
+ private final VertxInternal vertx;
+ private final IMap map;
+
+ public HazelcastAsyncMap(VertxInternal vertx, IMap map) {
+ this.vertx = vertx;
+ this.map = map;
+ }
+
+ @Override
+ public Future get(K k) {
+ K kk = convertParam(k);
+ PromiseInternal promise = vertx.promise();
+ map.getAsync(kk).andThen(new HandlerCallBackAdapter<>(promise));
+ return promise.future().map(ConversionUtils::convertReturn);
+ }
+
+ @Override
+ public Future put(K k, V v) {
+ K kk = convertParam(k);
+ V vv = convertParam(v);
+ PromiseInternal promise = vertx.promise();
+ map.setAsync(kk, HazelcastServerID.convertServerID(vv)).andThen(new HandlerCallBackAdapter<>(promise));
+ return promise.future();
+ }
+
+ @Override
+ public Future putIfAbsent(K k, V v) {
+ K kk = convertParam(k);
+ V vv = convertParam(v);
+ return vertx.executeBlocking(fut -> {
+ fut.complete(convertReturn(map.putIfAbsent(kk, HazelcastServerID.convertServerID(vv))));
+ }, false);
+ }
+
+ @Override
+ public Future put(K k, V v, long ttl) {
+ K kk = convertParam(k);
+ V vv = convertParam(v);
+ return vertx.executeBlocking(fut -> {
+ map.set(kk, HazelcastServerID.convertServerID(vv), ttl, TimeUnit.MILLISECONDS);
+ fut.complete();
+ }, false);
+ }
+
+ @Override
+ public Future putIfAbsent(K k, V v, long ttl) {
+ K kk = convertParam(k);
+ V vv = convertParam(v);
+ return vertx.executeBlocking(fut -> fut.complete(convertReturn(map.putIfAbsent(kk, HazelcastServerID.convertServerID(vv),
+ ttl, TimeUnit.MILLISECONDS))), false);
+ }
+
+ @Override
+ public Future remove(K k) {
+ K kk = convertParam(k);
+ PromiseInternal promise = vertx.promise();
+ map.removeAsync(kk).andThen(new HandlerCallBackAdapter<>(promise));
+ return promise.future().map(ConversionUtils::convertReturn);
+ }
+
+ @Override
+ public Future removeIfPresent(K k, V v) {
+ K kk = convertParam(k);
+ V vv = convertParam(v);
+ return vertx.executeBlocking(fut -> fut.complete(map.remove(kk, vv)), false);
+ }
+
+ @Override
+ public Future replace(K k, V v) {
+ K kk = convertParam(k);
+ V vv = convertParam(v);
+ return vertx.executeBlocking(fut -> fut.complete(convertReturn(map.replace(kk, vv))), false);
+ }
+
+ @Override
+ public Future replaceIfPresent(K k, V oldValue, V newValue) {
+ K kk = convertParam(k);
+ V vv = convertParam(oldValue);
+ V vvv = convertParam(newValue);
+ return vertx.executeBlocking(fut -> fut.complete(map.replace(kk, vv, vvv)), false);
+ }
+
+ @Override
+ public Future clear() {
+ return vertx.executeBlocking(fut -> {
+ map.clear();
+ fut.complete();
+ }, false);
+ }
+
+ @Override
+ public Future size() {
+ return vertx.executeBlocking(fut -> fut.complete(map.size()), false);
+ }
+
+ @Override
+ public Future> keys() {
+ return vertx.executeBlocking(fut -> {
+ Set set = new HashSet<>();
+ for (K kk : map.keySet()) {
+ K k = ConversionUtils.convertReturn(kk);
+ set.add(k);
+ }
+ fut.complete(set);
+ }, false);
+ }
+
+ @Override
+ public Future> values() {
+ return vertx.executeBlocking(fut -> {
+ List list = new ArrayList<>();
+ for (V vv : map.values()) {
+ V v = ConversionUtils.convertReturn(vv);
+ list.add(v);
+ }
+ fut.complete(list);
+ }, false);
+ }
+
+ @Override
+ public Future