Skip to content

Commit

Permalink
fix batch register
Browse files Browse the repository at this point in the history
  • Loading branch information
123 committed Oct 27, 2023
1 parent e35ffa5 commit cf8d244
Showing 1 changed file with 13 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingServ

int level = clusterAccessService.findClusterLevel(taskDO.getSourceClusterId());
if (CollectionUtils.isNotEmpty(sourceInstances) && sourceInstances.get(0).isEphemeral()) {
//处理实例的批量数据同步,批量注册接口需要传递全量服务实例
//处理实例的批量数据同步,批量注册接口需要传递直接注册到源集群的全量实例
handlerEphemeralInstance(taskDO, destNamingService, sourceInstances, level);
} else if (CollectionUtils.isEmpty(sourceInstances)) {
//如果当前源集群是空的 ,那么注销目标集群中来自当前源集群的同步实例
Expand All @@ -297,37 +297,24 @@ private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingServ
private void handlerEphemeralInstance(TaskDO taskDO, NamingService destNamingService,
List<Instance> sourceInstances, int level) throws NacosException {

//nacos2批量注册接口是全量更新,所以要构建全量实例列表
List<Instance> fullSyncInstances = new ArrayList<>();
//构建源集群需要同步的全量实例列表(nacos2批量注册接口采用全量更新方式)
List<Instance> sourceClusterSyncInstances = new ArrayList<>();

//直接注册到源集群的实例:排除从其他集群同步过来的实例,以避免A->B->A循环同步实例
List<Instance> needSourceClusterSyncInstances = sourceInstances.stream()
//直接注册到源集群的全量实例列表:排除从其他集群同步过来的实例,以避免A->B->A循环同步实例
List<Instance> sourceClusterDirectlyRegisteredInstances = sourceInstances.stream()
.filter(instance -> !isSyncInstance(instance)).collect(Collectors.toList());;

for (Instance instance : needSourceClusterSyncInstances) {
for (Instance instance : sourceClusterDirectlyRegisteredInstances) {
Instance sourceClusterSyncInstance = buildSyncInstance(instance, taskDO);
log.debug("从源集群同步到目标集群的实例:{}", sourceClusterSyncInstance);
fullSyncInstances.add(sourceClusterSyncInstance);
log.debug("需要从源集群同步到目标集群的实例:{}", sourceClusterSyncInstance);
sourceClusterSyncInstances.add(sourceClusterSyncInstance);
}

List<Instance> destAllInstances = destNamingService.getAllInstances(taskDO.getServiceName(),
getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true);

//直接注册到目标集群的实例以及其他集群同步到目标集群的实例(排除从源集群同步的实例)
List<Instance> needAddedToFullSyncInstances = destAllInstances.stream()
.filter(instance -> !isSyncFromCluster(instance, taskDO.getSourceClusterId())).collect(Collectors.toList());

for (Instance instance : needAddedToFullSyncInstances) {
removeUnwantedParamsForBatchOperation(instance);
log.debug("保留直接注册到目标集群的实例以及其他集群同步到目标集群的实例:{}", instance);
fullSyncInstances.add(instance);
}

if (CollectionUtils.isNotEmpty(fullSyncInstances)) {
if (CollectionUtils.isNotEmpty(sourceClusterSyncInstances)) {
//批量注册
log.debug("全量批量注册: {}", taskDO);
log.debug("直接注册到源集群的全量实例批量同步到目标集群: {}", taskDO);
destNamingService.batchRegisterInstance(taskDO.getServiceName(), getGroupNameOrDefault(taskDO.getGroupName()),
fullSyncInstances);
sourceClusterSyncInstances);
} else {
//注销目标集群中来自当前源集群的同步实例
processDeRegisterInstances(taskDO, destNamingService);
Expand All @@ -342,14 +329,6 @@ private boolean isSyncInstance(Instance instance) {
return false;
}

private boolean isSyncFromCluster(Instance instance, String sourceClusterId) {
if (instance.getMetadata() != null) {
String sourceClusterKey = instance.getMetadata().get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY);
return sourceClusterKey != null && sourceClusterKey.equals(sourceClusterId);
}
return false;
}

/**
* 当源集群需要同步的实例个数为0时,目标集群如果还有源集群同步的实例,执行反注册
*
Expand All @@ -364,11 +343,11 @@ private void processDeRegisterInstances(TaskDO taskDO, NamingService destNamingS
if (CollectionUtils.isEmpty(destInstances)) {
return;
}
// 筛选当前源集群同步的实例
// 筛选当前源集群的同步实例
List<Instance> newDestInstance = deRegisterFilter(destInstances, taskDO.getSourceClusterId());
if (CollectionUtils.isNotEmpty(newDestInstance)) {
//批量反注册当前源集群同步的实例
log.debug("批量反注册: {}", taskDO);
log.debug("批量反注册来自源集群的同步实例: {}", taskDO);
destNamingService.batchDeregisterInstance(taskDO.getServiceName(),
getGroupNameOrDefault(taskDO.getGroupName()), newDestInstance);
}
Expand Down

0 comments on commit cf8d244

Please sign in to comment.