Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support batchRegisterInstance of nacos2.x for develop branch #336

Open
wants to merge 24 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7cb94d5
support ephemeral instances synchronization
zrlw Oct 30, 2023
98daf16
fix handlerEphemeralInstance and handlerPersistenceInstance
zrlw Oct 30, 2023
bf0223b
fix comment
zrlw Oct 31, 2023
fe94b4f
remove tailing white spaces
zrlw Oct 31, 2023
2edd6a6
support dubbo application level instance synchronization
zrlw Nov 2, 2023
4a5210c
fix comment
zrlw Nov 2, 2023
74b3670
fix interface name extraction logic
zrlw Nov 2, 2023
8a1c86d
support consul,eureka,zookeeper sync to nacos2.x
zrlw Nov 5, 2023
7de3a2a
optimize delete function
zrlw Nov 5, 2023
ca11fa1
fix nacosServiceNameMap remove logic
zrlw Nov 5, 2023
ec16731
fix register instance variable at processEvent
zrlw Nov 6, 2023
17ca7e1
optimize revision metadata synchronization
zrlw Nov 9, 2023
14e73fa
get interface names by querying metadata center
zrlw Nov 16, 2023
b1c4106
fix queryMappingMetaData
zrlw Nov 16, 2023
2810031
change mapping metadata search mode to accurate
zrlw Nov 17, 2023
1f04860
optimize logic to reduce metadata sync times
zrlw Nov 19, 2023
ea869b3
fix mapping metadata cache
zrlw Nov 19, 2023
2339d27
add cluster layer to mapping metadata
zrlw Nov 19, 2023
b06a761
fix cluster mapping metadata cache
zrlw Nov 19, 2023
e688b68
update NacosClient version to 2.3.0
zrlw Dec 1, 2023
b9dee4e
update lombok version to be compatible with jdk17+
zrlw May 15, 2024
520e7f7
collect interfaces by querying mapping metadata if revision not existed
zrlw May 15, 2024
07ebf11
Merge remote-tracking branch 'origin/develop'
zrlw May 15, 2024
59e0c31
optimize method delete of NacosSyncToNacosServiceImpl
zrlw May 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ The path to the target file:

``` xml

nacos-sync/nacossync-distribution/target/nacos-sync-0.4.8.tar.gz
nacos-sync/nacossync-distribution/target/nacos-sync-0.4.9-batchRegister.tar.gz

```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Layout extends React.Component {
<Header />
<Row className="layout">
<Col fixedSpan="9" className="nav-bar">
<h1 className="title">Nacos-Sync 0.4.8</h1>
<h1 className="title">Nacos-Sync 0.4.9-batchRegister</h1>
<Menu />
</Col>
<Col className="main-panel">{this.props.children}</Col>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ private List<Predicate> getPredicates(Root<ClusterDO> root, CriteriaBuilder crit
public int findClusterLevel(String sourceClusterId){
ClusterDO clusterDO = clusterRepository.findByClusterId(sourceClusterId);
if (clusterDO != null) {
return clusterDO.getClusterLevel();
Integer level = clusterDO.getClusterLevel();
if (level == null) {
//此字段未设置时取默认0按普通集群处理(目前控制台新增集群的代码并未设置此字段)
return 0;
}
return level;
}
return -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,30 @@
package com.alibaba.nacossync.extension.holder;

import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.client.naming.NacosNamingService;
import com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate;
import com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy;
import com.alibaba.nacossync.constant.SkyWalkerConstants;
import com.alibaba.nacossync.dao.ClusterAccessService;
import com.alibaba.nacossync.dao.TaskAccessService;
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.google.common.base.Joiner;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ReflectionUtils;

/**
* @author paderlol
Expand All @@ -44,7 +50,11 @@ public class NacosServerHolder extends AbstractServerHolderImpl<NamingService> {

private final TaskAccessService taskAccessService;

private static ConcurrentHashMap<String,NamingService> globalNameService = new ConcurrentHashMap<>(16);
private static ConcurrentHashMap<String,NamingService> globalNamingService = new ConcurrentHashMap<>(16);

private static ConcurrentHashMap<String,NamingHttpClientProxy> globalNamingHttpProxy = new ConcurrentHashMap<>(16);

private static ConcurrentHashMap<String,ConfigService> globalConfigService = new ConcurrentHashMap<>(16);

public NacosServerHolder(ClusterAccessService clusterAccessService, TaskAccessService taskAccessService) {
this.clusterAccessService = clusterAccessService;
Expand All @@ -68,29 +78,68 @@ NamingService createServer(String clusterId, Supplier<String> serverAddressSuppl
String serverList = Joiner.on(",").join(allClusterConnectKey);
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
properties.setProperty(PropertyKeyConst.NAMESPACE, Optional.ofNullable(clusterDO.getNamespace()).orElse(
Strings.EMPTY));
Optional.ofNullable(clusterDO.getUserName()).ifPresent(value ->
properties.setProperty(PropertyKeyConst.USERNAME, value)
);

Optional.ofNullable(clusterDO.getPassword()).ifPresent(value ->
properties.setProperty(PropertyKeyConst.PASSWORD, value)
);
NamingService namingService = NamingFactory.createNamingService(properties);
globalNameService.put(clusterId,namingService);
return namingService;

// configService不能设置namespace,否则获取不到dubbo服务元数据
globalConfigService.computeIfAbsent(newClusterId, id -> {
try {
return ConfigFactory.createConfigService(properties);
} catch (NacosException e) {
log.error("start config service fail,clusterId:{}", id, e);
return null;
}
});

properties.setProperty(PropertyKeyConst.NAMESPACE, Optional.ofNullable(clusterDO.getNamespace()).orElse(
Strings.EMPTY));
return globalNamingService.computeIfAbsent(newClusterId, id -> {
try {
NacosNamingService namingService = (NacosNamingService) NamingFactory.createNamingService(properties);

// clientProxy
final Field clientProxyField = ReflectionUtils.findField(NacosNamingService.class, "clientProxy");
assert clientProxyField != null;
ReflectionUtils.makeAccessible(clientProxyField);
NamingClientProxyDelegate clientProxy = (NamingClientProxyDelegate) ReflectionUtils.getField(clientProxyField, namingService);

// httpClientProxy
final Field httpClientProxyField = ReflectionUtils.findField(NamingClientProxyDelegate.class, "httpClientProxy");
assert httpClientProxyField != null;
ReflectionUtils.makeAccessible(httpClientProxyField);
NamingHttpClientProxy httpClientProxy = (NamingHttpClientProxy) ReflectionUtils.getField(httpClientProxyField, clientProxy);
globalNamingHttpProxy.put(id, httpClientProxy);

return namingService;
} catch (NacosException e) {
log.error("start naming service fail,clusterId:{}", id, e);
return null;
}
});
}

/**
* Get NamingService for different clients
* @param clusterId clusterId
* @return Returns Naming Service objects for different clusters
*/
public NamingService getNameService(String clusterId){
return globalNameService.get(clusterId);
public NamingService getNamingService(String clusterId){
return globalNamingService.get(clusterId);
}


public NamingHttpClientProxy getNamingHttpProxy(String clusterId){
return globalNamingHttpProxy.get(clusterId);
}

public ConfigService getConfigService(String clusterId) {
return globalConfigService.get(clusterId);
}

public NamingService getSourceNamingService(String taskId, String sourceClusterId) {
String key = taskId + sourceClusterId;
return serviceMap.computeIfAbsent(key, k->{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.ClusterTypeEnum;
import com.alibaba.nacossync.constant.MetricsStatisticsType;
Expand All @@ -32,6 +33,8 @@
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.health.model.HealthService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -77,16 +80,20 @@ public boolean delete(TaskDO taskDO) {
specialSyncEventBus.unsubscribe(taskDO);

NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId());
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()));
String servideName = taskDO.getServiceName();
String groupName = NacosUtils.getGroupNameOrDefault(taskDO.getGroupName());
List<Instance> allInstances = destNamingService.getAllInstances(servideName, groupName);
List<Instance> needDeregisterInstances = new ArrayList<>();
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {

destNamingService.deregisterInstance(taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort());
NacosSyncToNacosServiceImpl.removeUnwantedAttrsForNacosRedo(instance);
log.debug("需要反注册的实例: {}", instance);
needDeregisterInstances.add(instance);
}
}

if (CollectionUtils.isNotEmpty(needDeregisterInstances)) {
NacosSyncToNacosServiceImpl.doDeregisterInstance(taskDO, destNamingService, servideName, groupName, needDeregisterInstances);
}
} catch (Exception e) {
log.error("delete task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
Expand Down Expand Up @@ -117,26 +124,47 @@ public boolean sync(TaskDO taskDO, Integer index) {

private void cleanAllOldInstance(TaskDO taskDO, NamingService destNamingService, Set<String> instanceKeys)
throws NacosException {
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
String serviceName = taskDO.getServiceName();
String groupName = NacosUtils.getGroupNameOrDefault(taskDO.getGroupName());
List<Instance> allInstances = destNamingService.getAllInstances(serviceName, groupName);
List<Instance> needDeregisterInstances = new ArrayList<>();
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)
&& !instanceKeys.contains(composeInstanceKey(instance.getIp(), instance.getPort()))) {

destNamingService.deregisterInstance(taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort());
NacosSyncToNacosServiceImpl.removeUnwantedAttrsForNacosRedo(instance);
log.debug("需要反注册的实例: {}", instance);
needDeregisterInstances.add(instance);
}
}
if (CollectionUtils.isNotEmpty(needDeregisterInstances)) {
NacosSyncToNacosServiceImpl.doDeregisterInstance(taskDO, destNamingService, serviceName, groupName, needDeregisterInstances);
}
}

private void overrideAllInstance(TaskDO taskDO, NamingService destNamingService,
List<HealthService> healthServiceList, Set<String> instanceKeys) throws NacosException {
String serviceName = taskDO.getServiceName();
String groupName = NacosUtils.getGroupNameOrDefault(taskDO.getGroupName());
List<Instance> needRegisterInstances = new ArrayList<>();
for (HealthService healthService : healthServiceList) {
if (needSync(ConsulUtils.transferMetadata(healthService.getService().getTags()))) {
destNamingService.registerInstance(taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()),
buildSyncInstance(healthService, taskDO));
Instance syncInstance = buildSyncInstance(healthService, taskDO);
log.debug("需要从源集群同步到目标集群的实例:{}", syncInstance);
needRegisterInstances.add(syncInstance);
instanceKeys.add(composeInstanceKey(healthService.getService().getAddress(),
healthService.getService().getPort()));
healthService.getService().getPort()));
}
}
if (CollectionUtils.isNotEmpty(needRegisterInstances)) {
if (needRegisterInstances.get(0).isEphemeral()) {
//批量注册
log.debug("将源集群指定service的临时实例全量同步到目标集群: {}", taskDO);
destNamingService.batchRegisterInstance(serviceName, groupName, needRegisterInstances);
} else {
for (Instance instance : needRegisterInstances) {
log.debug("从源集群同步到目标集群的持久实例:{}", instance);
destNamingService.registerInstance(serviceName, groupName, instance);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.alibaba.nacossync.util.NacosUtils;
import com.netflix.appinfo.InstanceInfo;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -118,13 +120,29 @@ public boolean sync(TaskDO taskDO,Integer index) {

private void addValidInstance(TaskDO taskDO, NamingService destNamingService, List<InstanceInfo> eurekaInstances)
throws NacosException {
String serviceName = taskDO.getServiceName();
String groupName = NacosUtils.getGroupNameOrDefault(taskDO.getGroupName());
List<Instance> needRegisterInstances = new ArrayList<>();
for (InstanceInfo instance : eurekaInstances) {
if (needSync(instance.getMetadata())) {
log.info("Add service instance from Eureka, serviceName={}, Ip={}, port={}",
instance.getAppName(), instance.getIPAddr(), instance.getPort());
destNamingService.registerInstance(taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), buildSyncInstance(instance,
taskDO));
instance.getAppName(), instance.getIPAddr(), instance.getPort());
Instance syncInstance = buildSyncInstance(instance, taskDO);
log.debug("需要从源集群同步到目标集群的实例:{}", syncInstance);
needRegisterInstances.add(syncInstance);
}
}
if (CollectionUtils.isEmpty(needRegisterInstances)) {
return;
}
if (needRegisterInstances.get(0).isEphemeral()) {
// 批量注册
log.debug("将源集群指定service的临时实例全量同步到目标集群: {}", taskDO);
destNamingService.batchRegisterInstance(serviceName, groupName, needRegisterInstances);
} else {
for (Instance instance : needRegisterInstances) {
log.debug("从源集群同步到目标集群的持久实例:{}", instance);
destNamingService.registerInstance(serviceName, groupName, instance);
}
}
}
Expand All @@ -135,26 +153,40 @@ private void deleteAllInstanceFromEureka(TaskDO taskDO, NamingService destNaming
if (CollectionUtils.isEmpty(eurekaInstances)) {
return;
}
List<Instance> needDeregisterInstances = new ArrayList<>();
for (InstanceInfo instance : eurekaInstances) {
if (needSync(instance.getMetadata())) {
log.info("Delete service instance from Eureka, serviceName={}, Ip={}, port={}",
instance.getAppName(), instance.getIPAddr(), instance.getPort());
destNamingService.deregisterInstance(taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), buildSyncInstance(instance, taskDO));
Instance needDeregisterInstance = buildSyncInstance(instance, taskDO);
log.debug("需要反注册的实例: {}", needDeregisterInstance);
needDeregisterInstances.add(needDeregisterInstance);
}
}
if (CollectionUtils.isEmpty(needDeregisterInstances)) {
return;
}
NacosSyncToNacosServiceImpl.doDeregisterInstance(taskDO, destNamingService, taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), needDeregisterInstances);
}

private void removeInvalidInstance(TaskDO taskDO, NamingService destNamingService,
List<InstanceInfo> eurekaInstances, List<Instance> nacosInstances) throws NacosException {
List<Instance> needDeregisterInstances = new ArrayList<>();
for (Instance instance : nacosInstances) {
if (!isExistInEurekaInstance(eurekaInstances, instance) && needDelete(instance.getMetadata(), taskDO)) {
log.info("Remove invalid service instance from Nacos, serviceName={}, Ip={}, port={}",
instance.getServiceName(), instance.getIp(), instance.getPort());
destNamingService.deregisterInstance(taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort());
NacosSyncToNacosServiceImpl.removeUnwantedAttrsForNacosRedo(instance);
log.debug("需要反注册的实例: {}", instance);
needDeregisterInstances.add(instance);
}
}
if (CollectionUtils.isEmpty(needDeregisterInstances)) {
return;
}
NacosSyncToNacosServiceImpl.doDeregisterInstance(taskDO, destNamingService, taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), needDeregisterInstances);
}

private boolean isExistInEurekaInstance(List<InstanceInfo> eurekaInstances, Instance nacosInstance) {
Expand All @@ -165,20 +197,26 @@ private boolean isExistInEurekaInstance(List<InstanceInfo> eurekaInstances, Inst

private void deleteAllInstance(TaskDO taskDO, NamingService destNamingService, List<Instance> allInstances)
throws NacosException {
List<Instance> needDeregisterInstances = new ArrayList<>();
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), instance);
NacosSyncToNacosServiceImpl.removeUnwantedAttrsForNacosRedo(instance);
log.debug("需要反注册的实例: {}", instance);
needDeregisterInstances.add(instance);
}

}
if (CollectionUtils.isEmpty(needDeregisterInstances)) {
return;
}
NacosSyncToNacosServiceImpl.doDeregisterInstance(taskDO, destNamingService, taskDO.getServiceName(),
NacosUtils.getGroupNameOrDefault(taskDO.getGroupName()), needDeregisterInstances);
}

private Instance buildSyncInstance(InstanceInfo instance, TaskDO taskDO) {
Instance temp = new Instance();
temp.setIp(instance.getIPAddr());
temp.setPort(instance.getPort());
temp.setServiceName(instance.getAppName());
//查询nacos集群实例返回的serviceName含组名前缀,但Nacos2服务端检查批量注册请求serviceName参数时不能包含组名前缀,因此注册实例到目标集群时不再设置serviceName。
temp.setHealthy(true);

Map<String, String> metaData = new HashMap<>(instance.getMetadata());
Expand Down
Loading