Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持Nacos Cluster级别的服务同步,基于Group级别的服务同步增强 #348

Open
iYaDongWang opened this issue Feb 26, 2024 · 1 comment

Comments

@iYaDongWang
Copy link

iYaDongWang commented Feb 26, 2024

nacos-sync默认是Service级别的服务同步,需要在控制台配置ServiceName和GroupName才可以进行服务同步。
pr304之后也支持基于Ns-Group级别的服务同步。
image
通过将ServiceName配置为”ALL“以实现Group级别的同步。
这个pr给了我很大的帮助,我在这个基础上做了一些拓展,让nacos-sync支持Cluster级别的服务同步,只需要在新建同步任务的时候将ServiceName和GroupName均配置为”ALL“即可。

主要改动点在com.alibaba.nacossync.timer.CheckRunningStatusAllThread#run

改动前逻辑:根据GroupName查询服务列表,然后进行同步
改动后逻辑:判断GroupName是否为ALL,如果为false,逻辑和之前一致,根据GroupName查询服务列表,然后进行同步;
如果为true,通过增强后的NacosEnhanceNamingService查询集群下所有服务列表,然后进行同步。
对原有功能无影响。


    public void run() {
        Long startTime = System.currentTimeMillis();
        try {
            // 查找所有ServiceName为”ALL“的任务
            List<TaskDO> taskDOS = taskAccessService.findServiceNameIsNull()
                    .stream().filter(t -> t.getStatus() == null || t.getStatus() == 0)
                    .collect(Collectors.toList());
            if (CollectionUtils.isEmpty(taskDOS)) {
                return;
            }
            for (TaskDO taskDO : taskDOS) {
                // 改动点:根据GroupName查询Group下所有服务 =》 如果GroupName为”ALL“,查询集群下所有服务,根据GroupName进行分组;如果不为”ALL“,则逻辑不变。
                Map<String, List<String>> serviceNameListGroupByGroupName = getServiceNameListGroupByGroupName(taskDO);
                if (serviceNameListGroupByGroupName.isEmpty()) {
                    continue;
                }
                for (Map.Entry<String, List<String>> entry : serviceNameListGroupByGroupName.entrySet()) {
                    taskDO.setGroupName(entry.getKey());
                    List<String> serviceNameList = entry.getValue();

                    //如果是null,证明此时没有处理完成
                    List<String> filterService = serviceNameList.stream()
                            .filter(serviceName -> skyWalkerCacheServices.getFinishedTask(taskDO.getTaskId() + serviceName ) == null)
                            .collect(Collectors.toList());

                    if (CollectionUtils.isEmpty(filterService)) {
                        continue;
                    }

                    // 当删除任务后,此时任务的状态为DELETE,不会执行数据同步
                    if (TaskStatusEnum.SYNC.getCode().equals(taskDO.getTaskStatus())) {
                        fastSyncHelper.syncWithThread(taskDO, filterService);
                    }
                }
            }
        }catch (Exception e) {
            log.warn("CheckRunningStatusThread Exception ", e);
        }
        metricsManager.record(MetricsStatisticsType.DISPATCHER_TASK, System.currentTimeMillis() - startTime);
    }

    private Map<String, List<String>> getServiceNameListGroupByGroupName(TaskDO taskDO) throws NacosException {
        Map<String, List<String>> map = new HashMap<>();
        NamingService namingService = nacosServerHolder.get(taskDO.getSourceClusterId());
        if (SkyWalkerConstants.ALL.equals(taskDO.getGroupName())) {
            // 如果serviceName和GroupName都是ALL,查询集群下所有服务
            NacosEnhanceNamingService enhanceNamingService = new NacosEnhanceNamingService(namingService);
            CatalogServiceResult catalogServiceResult = enhanceNamingService.catalogServices(null, null);
            if (catalogServiceResult == null || catalogServiceResult.getCount() <= 0) {
                return map;
            }
            List<ServiceView> serviceList = catalogServiceResult.getServiceList();
            return serviceList.stream()
                    .collect(Collectors.groupingBy(ServiceView::getGroupName, Collectors.mapping(ServiceView::getName, Collectors.toList())));

        }
        // GroupName不为ALL,查询该Group下的所有服务
        ListView<String> servicesOfServer = namingService.getServicesOfServer(0, Integer.MAX_VALUE,
                taskDO.getGroupName());
        map.put(taskDO.getGroupName(), servicesOfServer.getData());
        return map;
    }
@iYaDongWang
Copy link
Author

iYaDongWang commented Feb 26, 2024

#344

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant