Skip to content

Commit

Permalink
Merge pull request #132 from nacos-group/develop
Browse files Browse the repository at this point in the history
Deal with metadata format problems caused by Dubbo version #102
  • Loading branch information
paderlol authored Jan 10, 2020
2 parents d6f98bb + f97e6a1 commit 4a6df3b
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 117 deletions.
2 changes: 1 addition & 1 deletion nacossync-console/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.3.4</version>
<version>0.3.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion nacossync-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.3.4</version>
<version>0.3.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
Expand Down
4 changes: 2 additions & 2 deletions nacossync-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.3.4</version>
<version>0.3.5</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand All @@ -39,7 +39,7 @@
<dependency>
<groupId>com.alibaba.nacossync</groupId>
<artifactId>nacossync-worker</artifactId>
<version>0.3.4</version>
<version>0.3.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
4 changes: 2 additions & 2 deletions nacossync-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.3.4</version>
<version>0.3.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nacossync-worker</artifactId>
<version>0.3.4</version>
<version>0.3.5</version>
<properties>
<zookeeper.version>3.4.9</zookeeper.version>
<curator.version>4.1.0</curator.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,6 @@
*/
package com.alibaba.nacossync.extension.impl;

import static com.alibaba.nacossync.util.DubboConstants.CATALOG_KEY;
import static com.alibaba.nacossync.util.DubboConstants.GROUP_KEY;
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_IP_KEY;
import static com.alibaba.nacossync.util.DubboConstants.INSTANCE_PORT_KEY;
import static com.alibaba.nacossync.util.DubboConstants.INTERFACE_KEY;
import static com.alibaba.nacossync.util.DubboConstants.PROTOCOL_KEY;
import static com.alibaba.nacossync.util.DubboConstants.VERSION_KEY;
import static com.alibaba.nacossync.util.DubboConstants.WEIGHT_KEY;
import static com.alibaba.nacossync.util.StringUtils.convertDubboProvidersPath;
import static com.alibaba.nacossync.util.StringUtils.parseIpAndPortString;
import static com.alibaba.nacossync.util.StringUtils.parseQueryString;

import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
Expand All @@ -36,20 +24,23 @@
import com.alibaba.nacossync.extension.holder.ZookeeperServerHolder;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.google.common.base.Joiner;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.CloseableUtils;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

import static com.alibaba.nacossync.util.DubboConstants.*;
import static com.alibaba.nacossync.util.StringUtils.*;

/**
* @author paderlol
* @version 1.0
Expand All @@ -63,7 +54,7 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {
private MetricsManager metricsManager;

/**
* Listener cache of Zookeeper format taskId -> PathChildrenCache instance
* Listener cache of Zookeeper format taskId -> PathChildrenCache instance
*/
private Map<String, PathChildrenCache> pathChildrenCacheMap = new ConcurrentHashMap<>();
/**
Expand All @@ -79,7 +70,7 @@ public class ZookeeperSyncToNacosServiceImpl implements SyncService {

@Autowired
public ZookeeperSyncToNacosServiceImpl(ZookeeperServerHolder zookeeperServerHolder,
NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) {
NacosServerHolder nacosServerHolder, SkyWalkerCacheServices skyWalkerCacheServices) {
this.zookeeperServerHolder = zookeeperServerHolder;
this.nacosServerHolder = nacosServerHolder;
this.skyWalkerCacheServices = skyWalkerCacheServices;
Expand All @@ -93,67 +84,57 @@ public boolean sync(TaskDO taskDO) {
}

PathChildrenCache pathChildrenCache = getPathCache(taskDO);
NamingService destNamingService = nacosServerHolder
.get(taskDO.getDestClusterId(), null);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
List<ChildData> currentData = pathChildrenCache.getCurrentData();
for (ChildData childData : currentData) {
String path = childData.getPath();
Map<String, String> queryParam = parseQueryString(childData.getPath());
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
destNamingService.registerInstance(
getServiceNameFromCache(taskDO.getTaskId(), queryParam),
instance);
destNamingService.registerInstance(getServiceNameFromCache(taskDO.getTaskId(), queryParam),
instance);
}
}
Objects.requireNonNull(pathChildrenCache).getListenable()
.addListener((client, event) -> {
try {

String path = event.getData().getPath();
Map<String, String> queryParam = parseQueryString(path);

if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
Instance instance = buildSyncInstance(queryParam, ipAndPortParam,
taskDO);
switch (event.getType()) {
case CHILD_ADDED:
destNamingService.registerInstance(
getServiceNameFromCache(taskDO.getTaskId(),
queryParam), instance);
break;
case CHILD_UPDATED:

destNamingService.registerInstance(
getServiceNameFromCache(taskDO.getTaskId(),
queryParam), instance);
break;
case CHILD_REMOVED:

destNamingService.deregisterInstance(
getServiceNameFromCache(taskDO.getTaskId(),
queryParam),
ipAndPortParam.get(INSTANCE_IP_KEY),
Integer.parseInt(
ipAndPortParam.get(INSTANCE_PORT_KEY)));

break;
default:
break;
}
}
} catch (Exception e) {
log.error("event process from zookeeper to nacos was failed, taskId:{}",
taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
Objects.requireNonNull(pathChildrenCache).getListenable().addListener((client, event) -> {
try {

String path = event.getData().getPath();
Map<String, String> queryParam = parseQueryString(path);

if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
switch (event.getType()) {
case CHILD_ADDED:
destNamingService.registerInstance(
getServiceNameFromCache(taskDO.getTaskId(), queryParam), instance);
break;
case CHILD_UPDATED:

destNamingService.registerInstance(
getServiceNameFromCache(taskDO.getTaskId(), queryParam), instance);
break;
case CHILD_REMOVED:

destNamingService.deregisterInstance(
getServiceNameFromCache(taskDO.getTaskId(), queryParam),
ipAndPortParam.get(INSTANCE_IP_KEY),
Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
nacosServiceNameMap.remove(taskDO.getTaskId());
break;
default:
break;
}
}
} catch (Exception e) {
log.error("event process from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
}

});
});
} catch (Exception e) {
log.error("sync task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(),
e);
log.error("sync task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
Expand All @@ -165,26 +146,22 @@ public boolean delete(TaskDO taskDO) {
try {

CloseableUtils.closeQuietly(pathChildrenCacheMap.get(taskDO.getTaskId()));
NamingService destNamingService = nacosServerHolder
.get(taskDO.getDestClusterId(), null);
if(nacosServiceNameMap.containsKey(taskDO.getTaskId())){
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
if (nacosServiceNameMap.containsKey(taskDO.getTaskId())) {
List<Instance> allInstances =
destNamingService
.getAllInstances(nacosServiceNameMap.get(taskDO.getTaskId()));
destNamingService.getAllInstances(nacosServiceNameMap.get(taskDO.getTaskId()));
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService
.deregisterInstance(instance.getServiceName(), instance.getIp(),
instance.getPort());
destNamingService.deregisterInstance(instance.getServiceName(), instance.getIp(),
instance.getPort());
}
nacosServiceNameMap.remove(taskDO.getTaskId());

}
}

} catch (Exception e) {
log.error("delete task from zookeeper to nacos was failed, taskId:{}",
taskDO.getTaskId(), e);
log.error("delete task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
Expand All @@ -198,33 +175,27 @@ protected PathChildrenCache getPathCache(TaskDO taskDO) {
return pathChildrenCacheMap.computeIfAbsent(taskDO.getTaskId(), (key) -> {
try {
PathChildrenCache pathChildrenCache =
new PathChildrenCache(
zookeeperServerHolder.get(taskDO.getSourceClusterId(), ""),
convertDubboProvidersPath(taskDO.getServiceName()), false);
new PathChildrenCache(zookeeperServerHolder.get(taskDO.getSourceClusterId(), ""),
convertDubboProvidersPath(taskDO.getServiceName()), false);
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
return pathChildrenCache;
} catch (Exception e) {
log.error("zookeeper path children cache start failed, taskId:{}",
taskDO.getTaskId(), e);
log.error("zookeeper path children cache start failed, taskId:{}", taskDO.getTaskId(), e);
return null;
}
});

}



/**
* The instance information that needs to be synchronized is matched based on the dubbo version
* and the grouping name
* The instance information that needs to be synchronized is matched based on the dubbo version and the grouping
* name
*/
protected boolean isMatch(TaskDO taskDO, Map<String, String> queryParam) {
Predicate<TaskDO> isVersionEq =
(task) -> StringUtils.isBlank(taskDO.getVersion()) || StringUtils
.equals(task.getVersion(), queryParam.get(VERSION_KEY));
Predicate<TaskDO> isGroupEq =
(task) -> StringUtils.isBlank(taskDO.getGroupName()) || StringUtils
.equals(task.getGroupName(), queryParam.get(GROUP_KEY));
Predicate<TaskDO> isVersionEq = (task) -> StringUtils.isBlank(taskDO.getVersion())
|| StringUtils.equals(task.getVersion(), queryParam.get(VERSION_KEY));
Predicate<TaskDO> isGroupEq = (task) -> StringUtils.isBlank(taskDO.getGroupName())
|| StringUtils.equals(task.getGroupName(), queryParam.get(GROUP_KEY));
return isVersionEq.and(isGroupEq).test(taskDO);
}

Expand All @@ -234,22 +205,20 @@ protected boolean isMatch(TaskDO taskDO, Map<String, String> queryParam) {
* @param queryParam dubbo metadata
* @param ipAndPortMap dubbo ip and address
*/
protected Instance buildSyncInstance(Map<String, String> queryParam,
Map<String, String> ipAndPortMap,
TaskDO taskDO) {
protected Instance buildSyncInstance(Map<String, String> queryParam, Map<String, String> ipAndPortMap,
TaskDO taskDO) {
Instance temp = new Instance();
temp.setIp(ipAndPortMap.get(INSTANCE_IP_KEY));
temp.setPort(Integer.parseInt(ipAndPortMap.get(INSTANCE_PORT_KEY)));
temp.setServiceName(getServiceNameFromCache(taskDO.getTaskId(), queryParam));
temp.setWeight(Double.valueOf(
queryParam.get(WEIGHT_KEY) == null ? "1.0" : queryParam.get(WEIGHT_KEY)));
temp.setWeight(Double.valueOf(queryParam.get(WEIGHT_KEY) == null ? "1.0" : queryParam.get(WEIGHT_KEY)));
temp.setHealthy(true);

Map<String, String> metaData = new HashMap<>(queryParam);
metaData.put(PROTOCOL_KEY, ipAndPortMap.get(PROTOCOL_KEY));
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
metaData.put(SkyWalkerConstants.SOURCE_CLUSTERID_KEY, taskDO.getSourceClusterId());
temp.setMetadata(metaData);
return temp;
Expand All @@ -262,10 +231,7 @@ protected Instance buildSyncInstance(Map<String, String> queryParam,
* @param queryParam dubbo metadata
*/
protected String getServiceNameFromCache(String taskId, Map<String, String> queryParam) {
return nacosServiceNameMap
.computeIfAbsent(taskId, (key) -> Joiner.on(":").skipNulls().join(CATALOG_KEY,
queryParam.get(INTERFACE_KEY), queryParam.get(VERSION_KEY),
queryParam.get(GROUP_KEY)));
return nacosServiceNameMap.computeIfAbsent(taskId, (key) -> createServiceName(queryParam));
}

}
Loading

0 comments on commit 4a6df3b

Please sign in to comment.