diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImpl.java index 41147b3..2c956f0 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToNacosServiceImpl.java @@ -168,6 +168,7 @@ public boolean delete(TaskDO taskDO) { String taskId = taskDO.getTaskId(); NamingService sourceNamingService = nacosServerHolder.getSourceNamingService(taskId, taskDO.getSourceClusterId()); + String groupName = getGroupNameOrDefault(taskDO.getGroupName()); int level = clusterAccessService.findClusterLevel(taskDO.getSourceClusterId()); if ("ALL".equals(taskDO.getServiceName())) { @@ -177,26 +178,28 @@ public boolean delete(TaskDO taskDO) { } //处理group级别的服务任务删除 - ListView servicesOfServer = sourceNamingService.getServicesOfServer(0, Integer.MAX_VALUE, - getGroupNameOrDefault(taskDO.getGroupName())); + ListView servicesOfServer = sourceNamingService.getServicesOfServer( + 0, Integer.MAX_VALUE, groupName); List serviceNames = servicesOfServer.getData(); for (String serviceName : serviceNames) { String operationKey = taskId + serviceName; skyWalkerCacheServices.removeFinishedTask(operationKey); allSyncTaskMap.remove(operationKey); + sourceNamingService.unsubscribe(serviceName, groupName, listenerMap.remove(operationKey)); + + //popNamingService方法需要传入serviceName属性为实际服务名的TaskDO对象 TaskDO task = new TaskDO(); BeanUtils.copyProperties(taskDO, task); task.setServiceName(serviceName); - task.setOperationId(taskDO.getTaskId() + serviceName); + task.setOperationId(operationKey); NamingService destNamingService = popNamingService(task); - sourceNamingService.unsubscribe(serviceName, getGroupNameOrDefault(task.getGroupName()), - listenerMap.remove(task.getTaskId() + serviceName)); if (Objects.isNull(destNamingService)) { - log.warn("task {} not found NamingService", task.getTaskId() + serviceName); + log.warn("task {} not found NamingService", operationKey); continue; } - List sourceInstances = sourceNamingService.getAllInstances(serviceName, - getGroupNameOrDefault(task.getGroupName()), new ArrayList<>(), false); + + List sourceInstances = sourceNamingService.getAllInstances( + serviceName, groupName, new ArrayList<>(), false); List needDeregisterInstances = new ArrayList<>(); for (Instance instance : sourceInstances) { if (needSync(instance.getMetadata(), level, taskDO.getDestClusterId())) { @@ -206,8 +209,8 @@ public boolean delete(TaskDO taskDO) { } } if (CollectionUtils.isNotEmpty(needDeregisterInstances)) { - doDeregisterInstance(taskDO, destNamingService, serviceName, - getGroupNameOrDefault(task.getGroupName()), needDeregisterInstances); + doDeregisterInstance(taskDO, destNamingService, + serviceName, groupName, needDeregisterInstances); } } } else { @@ -218,29 +221,27 @@ public boolean delete(TaskDO taskDO) { return false; } - NamingService destNamingService = popNamingService(taskDO); String serviceName = taskDO.getServiceName(); - sourceNamingService.unsubscribe(serviceName, getGroupNameOrDefault(taskDO.getGroupName()), - listenerMap.remove(operationId)); + sourceNamingService.unsubscribe(serviceName, groupName, listenerMap.remove(operationId)); + + NamingService destNamingService = popNamingService(taskDO); if (Objects.isNull(destNamingService)) { - log.warn("task {} not found NamingService", taskId + serviceName); + log.warn("task {} not found NamingService", operationId); return false; } - List sourceInstances = sourceNamingService.getAllInstances(serviceName, - getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), false); - + List sourceInstances = sourceNamingService.getAllInstances( + serviceName, groupName, new ArrayList<>(), false); List needDeregisterInstances = new ArrayList<>(); for (Instance instance : sourceInstances) { - if (needSync(instance.getMetadata(), level, taskDO.getDestClusterId())){ + if (needSync(instance.getMetadata(), level, taskDO.getDestClusterId())) { removeUnwantedAttrsForNacosRedo(instance); log.debug("需要反注册的实例: {}", instance); needDeregisterInstances.add(instance); } } if (CollectionUtils.isNotEmpty(needDeregisterInstances)) { - doDeregisterInstance(taskDO, destNamingService, serviceName, - getGroupNameOrDefault(taskDO.getGroupName()), needDeregisterInstances); + doDeregisterInstance(taskDO, destNamingService, serviceName, groupName, needDeregisterInstances); } // 移除任务 skyWalkerCacheServices.removeFinishedTask(operationId);