Skip to content

Commit

Permalink
Merge pull request #128 from nacos-group/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
paderlol authored Dec 27, 2019
2 parents 9a79110 + 40c7e09 commit c2a60a8
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 65 deletions.
2 changes: 1 addition & 1 deletion nacossync-console/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.3.3</version>
<version>0.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion nacossync-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.3.3</version>
<version>0.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
Expand Down
5 changes: 2 additions & 3 deletions nacossync-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.3.3</version>
<version>0.3.4</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand All @@ -28,7 +28,6 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>

<dependency>
Expand All @@ -40,7 +39,7 @@
<dependency>
<groupId>com.alibaba.nacossync</groupId>
<artifactId>nacossync-worker</artifactId>
<version>0.3.3</version>
<version>0.3.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
4 changes: 2 additions & 2 deletions nacossync-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
<parent>
<artifactId>nacossync-parent</artifactId>
<groupId>com.alibaba.nacossync</groupId>
<version>0.3.3</version>
<version>0.3.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nacossync-worker</artifactId>
<version>0.3.3</version>
<version>0.3.4</version>
<properties>
<zookeeper.version>3.4.9</zookeeper.version>
<curator.version>4.1.0</curator.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@
*/
package com.alibaba.nacossync.cache;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
Expand All @@ -36,6 +25,16 @@
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.SkyWalkerUtil;
import org.jboss.netty.util.internal.ThreadLocalRandom;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author NacosSync
Expand All @@ -52,7 +51,7 @@ public class SkyWalkerCacheServices {
public String getClusterConnectKey(String clusterId) {
List<String> allClusterConnectKey = getAllClusterConnectKey(clusterId);

Random random = new Random();
Random random = new ThreadLocalRandom();
return allClusterConnectKey.get(random.nextInt(allClusterConnectKey.size()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,38 @@

import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

/**
* @author paderlol
* @date 2018-12-24 22:08
*/
@Slf4j
public abstract class AbstractServerHolder<T> implements Holder {
public abstract class AbstractServerHolderImpl<T> implements Holder {

private final Map<String, T> serviceMap = new ConcurrentHashMap<>();
@Autowired
protected SkyWalkerCacheServices skyWalkerCacheServices;

@Override
public T get(String clusterId, String namespace) {
final String finalNamespace = Optional.ofNullable(namespace).orElse("");
String key = Joiner.on("_").join(clusterId,finalNamespace);
log.info("starting create cluster server,clusterId={}", clusterId);
final String finalNamespace = Optional.ofNullable(namespace).orElse(Strings.EMPTY);
String key = Joiner.on("_").join(clusterId, finalNamespace);

serviceMap.computeIfAbsent(key, clusterKey -> {
try {
return createServer(clusterId,
() -> skyWalkerCacheServices.getClusterConnectKey(clusterId),
finalNamespace);
log.info("Starting create cluster server, clusterId={}", clusterId);
return createServer(clusterId, () -> skyWalkerCacheServices.getClusterConnectKey(clusterId),
finalNamespace);
} catch (Exception e) {
log.error(String.format("clusterId=%s,start server failed", clusterId), e);
log.error(String.format("clusterId=%s, start server failed", clusterId), e);
return null;
}
});
Expand All @@ -52,16 +54,14 @@ public T get(String clusterId, String namespace) {
}

/**
* create real cluster client instance
* Create real cluster client instance
*
* @param clusterId cluster id
* @param serverAddressSupplier server address
* @param namespace name space
* @return cluster client instance
*/
abstract T createServer(String clusterId, Supplier<String> serverAddressSupplier,
String namespace)
throws Exception;

abstract T createServer(String clusterId, Supplier<String> serverAddressSupplier, String namespace)
throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
@Service
@Slf4j
public class ConsulServerHolder extends AbstractServerHolder<ConsulClient> {
public class ConsulServerHolder extends AbstractServerHolderImpl<ConsulClient> {

public static final String HTTP = "http://";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
@Service
@Slf4j
public class EurekaServerHolder extends AbstractServerHolder<EurekaNamingService> {
public class EurekaServerHolder extends AbstractServerHolderImpl<EurekaNamingService> {
@Override
EurekaNamingService createServer(String clusterId, Supplier<String> serverAddressSupplier, String namespace) throws Exception {
RestTemplateTransportClientFactory restTemplateTransportClientFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
package com.alibaba.nacossync.extension.holder;

/**
* cluster client service
* Cluster client service
* @author paderlol
* @date 2018-12-24 21:59
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/
@Service
@Slf4j
public class NacosServerHolder extends AbstractServerHolder<NamingService> {
public class NacosServerHolder extends AbstractServerHolderImpl<NamingService> {

@Override
NamingService createServer(String clusterId, Supplier<String> serverAddressSupplier, String namespace) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
@Service
@Slf4j
public class ZookeeperServerHolder extends AbstractServerHolder<CuratorFramework> {
public class ZookeeperServerHolder extends AbstractServerHolderImpl<CuratorFramework> {


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public boolean delete(TaskDO taskDO) {
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {

destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}
Expand All @@ -94,26 +95,26 @@ public boolean sync(TaskDO taskDO) {
Response<List<HealthService>> response =
consulClient.getHealthServices(taskDO.getServiceName(), true, QueryParams.DEFAULT);
List<HealthService> healthServiceList = response.getValue();
Set<String> instanceKeySet = new HashSet<>();
Set<String> instanceKeys = new HashSet<>();
for (HealthService healthService : healthServiceList) {
if (needSync(ConsulUtils.transferMetadata(healthService.getService().getTags()))) {

destNamingService.registerInstance(taskDO.getServiceName(),
buildSyncInstance(healthService, taskDO));
instanceKeySet.add(composeInstanceKey(healthService.getService().getAddress(),
instanceKeys.add(composeInstanceKey(healthService.getService().getAddress(),
healthService.getService().getPort()));
}
}
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)
&& !instanceKeySet.contains(composeInstanceKey(instance.getIp(), instance.getPort()))) {
&& !instanceKeys.contains(composeInstanceKey(instance.getIp(), instance.getPort()))) {

destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}
specialSyncEventBus.subscribe(taskDO, this::sync);
} catch (Exception e) {
log.error("sync task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
log.error("Sync task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package com.alibaba.nacossync.extension.impl;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
Expand All @@ -29,10 +30,12 @@
import com.netflix.appinfo.InstanceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* eureka
Expand All @@ -44,8 +47,7 @@
@NacosSyncService(sourceCluster = ClusterTypeEnum.EUREKA, destinationCluster = ClusterTypeEnum.NACOS)
public class EurekaSyncToNacosServiceImpl implements SyncService {

@Autowired
private MetricsManager metricsManager;
private final MetricsManager metricsManager;

private final EurekaServerHolder eurekaServerHolder;
private final SkyWalkerCacheServices skyWalkerCacheServices;
Expand All @@ -55,11 +57,14 @@ public class EurekaSyncToNacosServiceImpl implements SyncService {
private final SpecialSyncEventBus specialSyncEventBus;

@Autowired
public EurekaSyncToNacosServiceImpl(EurekaServerHolder eurekaServerHolder, SkyWalkerCacheServices skyWalkerCacheServices, NacosServerHolder nacosServerHolder, SpecialSyncEventBus specialSyncEventBus) {
public EurekaSyncToNacosServiceImpl(EurekaServerHolder eurekaServerHolder,
SkyWalkerCacheServices skyWalkerCacheServices, NacosServerHolder nacosServerHolder,
SpecialSyncEventBus specialSyncEventBus, MetricsManager metricsManager) {
this.eurekaServerHolder = eurekaServerHolder;
this.skyWalkerCacheServices = skyWalkerCacheServices;
this.nacosServerHolder = nacosServerHolder;
this.specialSyncEventBus = specialSyncEventBus;
this.metricsManager = metricsManager;
}

@Override
Expand All @@ -69,14 +74,10 @@ public boolean delete(TaskDO taskDO) {
specialSyncEventBus.unsubscribe(taskDO);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}
deleteAllInstance(taskDO, destNamingService, allInstances);

} catch (Exception e) {
log.error("delete task from eureka to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
log.error("delete a task from eureka to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
Expand All @@ -88,24 +89,27 @@ public boolean sync(TaskDO taskDO) {
try {
EurekaNamingService eurekaNamingService = eurekaServerHolder.get(taskDO.getSourceClusterId(), null);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
List<InstanceInfo> instanceInfos =
eurekaNamingService.getApplications(taskDO.getServiceName());
if (instanceInfos != null) {
List<InstanceInfo> instanceInfos = eurekaNamingService.getApplications(taskDO.getServiceName());
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());

if (Objects.nonNull(instanceInfos)) {
for (InstanceInfo instanceInfo : instanceInfos) {
if (needSync(instanceInfo.getMetadata())) {
if (InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus())) {
if (CollectionUtils.isEmpty(allInstances)
|| isExistInNacosInstance(allInstances, instanceInfo)) {
destNamingService.registerInstance(taskDO.getServiceName(),
buildSyncInstance(instanceInfo, taskDO));
} else {
log.info("Remove invalid service instance from Nacos, serviceName={}, Ip={}, port={}",
instanceInfo.getAppName(), instanceInfo.getIPAddr(), instanceInfo.getPort());
destNamingService.deregisterInstance(instanceInfo.getAppName(), instanceInfo.getIPAddr(),
instanceInfo.getPort());
}
}

}
} else {
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
throw new RuntimeException("trying to connect to the server failed");
deleteAllInstance(taskDO, destNamingService, allInstances);
}
specialSyncEventBus.subscribe(taskDO, this::sync);
} catch (Exception e) {
Expand All @@ -116,15 +120,29 @@ public boolean sync(TaskDO taskDO) {
return true;
}

private boolean isExistInNacosInstance(List<Instance> allInstances, InstanceInfo instanceInfo) {
return allInstances.stream().anyMatch(instance -> instance.getIp().equals(instanceInfo.getIPAddr())
&& instance.getPort() == instanceInfo.getPort());
}

private void deleteAllInstance(TaskDO taskDO, NamingService destNamingService, List<Instance> allInstances)
throws NacosException {
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance);
}

}
}

private Instance buildSyncInstance(InstanceInfo instance, TaskDO taskDO) {
Instance temp = new Instance();
temp.setIp(instance.getIPAddr());
temp.setPort(instance.getPort());
temp.setServiceName(instance.getAppName());
temp.setHealthy(true);

Map<String, String> metaData = new HashMap<>();
metaData.putAll(instance.getMetadata());
Map<String, String> metaData = new HashMap<>(instance.getMetadata());
metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId());
metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY,
skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public boolean delete(TaskDO taskDO) {
}
}
} catch (Exception e) {
log.error("delete task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
log.error("delete a task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
Expand Down
Loading

0 comments on commit c2a60a8

Please sign in to comment.