
[RFC] Refactor the master using Golang. #1374

workingloong opened this issue Dec 4, 2024 · 0 comments

workingloong commented Dec 4, 2024

Background

Currently, the DLRover master is implemented in Python. Compared with Golang, this has several disadvantages:

  • The k8s Python client has inherent limitations and lags behind the Go (and Java) clients. For instance, it lacks an informer implementation (see Enhance/Replace k8s python client. #1291).
  • More Golang packages are available for custom resource definitions (CRDs) such as volcano/PodGroup and kubeflow/TrainingJob. In Python, by contrast, we can only hold the CRD in a dictionary, and the readability of dictionary-based parsing is poor, as the following abridged code shows (a Go sketch of the typed alternative appears after this list):
    job = self._retry_to_get_job(k8s_client)
    self.job_uuid = self._get_job_uuid(job)
    if "distributionStrategy" in job["spec"]:
        self.distribution_strategy = job["spec"]["distributionStrategy"]
    limit_config = job["spec"].get("resourceLimits", {})
    self.resource_limits.cpu = convert_cpu_to_decimal(
        limit_config.get("cpu", DefaultResourceLimits.CPU_LIMIT)
    )
    self.resource_limits.memory = convert_memory_to_byte(
        limit_config.get("memory", DefaultResourceLimits.MEMORY_LIMIT)
    )
    self.resource_limits.gpu_num = int(
        limit_config.get("gpu", DefaultResourceLimits.GPU_LIMIT)
    )
    self.optimize_mode = job["spec"].get(
        "optimizeMode", OptimizeMode.SINGLE_JOB
    )
    for replica, spec in job["spec"]["replicaSpecs"].items():
        if replica == NodeType.WORKER:
            restart_policy = spec["template"]["spec"].get(
                "restartPolicy", ""
            )
            self.relaunch_always = restart_policy == "Always"
        priority = spec.get("priority", "")
        num = int(spec.get("replicas", 0))
        container = spec["template"]["spec"]["containers"][0]
        resources = container.get("resources", {})
        requests = resources.get("requests", {})
        cpu = convert_cpu_to_decimal(requests.get("cpu", 0))
        if "memory" in requests:
            memory = convert_memory_to_mb(requests["memory"])
        else:
            memory = 0
        gpu_type = None
        gpu_num = 0
        for k, v in requests.items():
            if "nvidia.com" in k:
                gpu_type = k
                gpu_num = int(v)
        group_resource = NodeGroupResource(
            num,
            NodeResource(
                cpu=cpu,
                memory=memory,
                gpu_type=gpu_type,
                gpu_num=gpu_num,
                priority=priority,
            ),
        )
        restart_count = int(spec.get("restartCount", 3))
        auto_scale = parse_bool(str(spec.get("autoScale", "true")))
        restart_timeout = int(spec.get("restartTimeout", 0))
        critical_nodes = spec.get("criticalNodes", "")
        self.node_args[replica] = NodeArgs(
            # ... (truncated)
  • Due to the Global Interpreter Lock (GIL), Python's execution efficiency is lower than Golang's. With Golang, we can use goroutines to accelerate launching thousands of Pods (see the scheduler sketch after the Elastic Scheduling list below). We will also replace gRPC with an HTTP service, since HTTP is more broadly compatible and the Go HTTP implementation is more efficient than Python's.
  • The master will take on an increasing amount of data-analysis work, such as fault diagnosis, where execution speed is crucial.
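
To make the comparison above concrete, here is a minimal Go sketch of what the typed alternative could look like. It assumes a hypothetical ElasticJobSpec struct for the ElasticJob CRD and uses the standard client-go shared informer that the Python client lacks; names such as ElasticJobSpec, ReplicaSpec, parseWorkerSpec, and watchPods are illustrative and do not exist in the current code base.

    package master

    import (
        "fmt"
        "time"

        corev1 "k8s.io/api/core/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
    )

    // Hypothetical typed spec for the ElasticJob CRD. With a typed struct, the
    // dictionary lookups in the Python snippet above become plain field accesses.
    type ResourceLimits struct {
        CPU    string `json:"cpu,omitempty"`
        Memory string `json:"memory,omitempty"`
        GPU    string `json:"gpu,omitempty"`
    }

    type ReplicaSpec struct {
        Replicas     int32                  `json:"replicas,omitempty"`
        Priority     string                 `json:"priority,omitempty"`
        RestartCount int32                  `json:"restartCount,omitempty"`
        Template     corev1.PodTemplateSpec `json:"template,omitempty"`
    }

    type ElasticJobSpec struct {
        DistributionStrategy string                 `json:"distributionStrategy,omitempty"`
        ResourceLimits       ResourceLimits         `json:"resourceLimits,omitempty"`
        ReplicaSpecs         map[string]ReplicaSpec `json:"replicaSpecs,omitempty"`
    }

    // parseWorkerSpec reads the worker replica spec through typed fields instead
    // of nested dictionary lookups.
    func parseWorkerSpec(spec *ElasticJobSpec) {
        worker, ok := spec.ReplicaSpecs["worker"]
        if !ok {
            return
        }
        requests := worker.Template.Spec.Containers[0].Resources.Requests
        fmt.Printf("workers=%d cpu=%s memory=%s\n",
            worker.Replicas, requests.Cpu(), requests.Memory())
    }

    // watchPods sketches the informer pattern missing from the Python client:
    // a shared local cache plus event callbacks instead of periodic list calls.
    func watchPods(clientset kubernetes.Interface, namespace string) {
        factory := informers.NewSharedInformerFactoryWithOptions(
            clientset, 30*time.Second, informers.WithNamespace(namespace))
        podInformer := factory.Core().V1().Pods().Informer()
        podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { fmt.Println("pod added") },
            DeleteFunc: func(obj interface{}) { fmt.Println("pod deleted") },
        })
        stopCh := make(chan struct{})
        factory.Start(stopCh)
        factory.WaitForCacheSync(stopCh)
    }

With such typed structs, the field names are checked at compile time, and the CRD spec no longer needs to be navigated key by key.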

Design

In the first stage, the Golang master will only support PyTorch AllReduce jobs and will provide an elastic training service as well as elastic scheduling.

Elastic Training Service

  • Node Discovery: The node discovery mechanism gathers information about the live nodes on which dlrover-run has started. It then assigns each node its node rank and the total number of nodes. Furthermore, it can sort the node ranks based on the switch topology of the nodes to optimize communication traffic (a minimal Go sketch follows this list).
  • Training Metrics Collector: This collector gathers the runtime metrics of each node. These metrics include the CPU and GPU workload, TCP and RDMA traffic, and the training profiling data obtained from the training frameworks.
  • Training Detector: The training detector checks whether any exceptions occur during training, such as hangs or breakdowns. If an exception does happen, it can identify the root cause, for example whether a node's GPU or network has broken down.
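
Below is a minimal Go sketch of the rank-assignment part of node discovery described above. The NodeMeta type and the switch-label field are assumptions for illustration; the actual topology source (e.g. node labels or a network probe) is not decided by this RFC.

    package master

    import "sort"

    // NodeMeta is an illustrative record reported by dlrover-run on each node.
    type NodeMeta struct {
        Host        string
        SwitchLabel string // e.g. the switch or topology block the node sits in
    }

    // AssignRanks sorts the live nodes by their switch label so that nodes under
    // the same switch get adjacent ranks, then returns host -> rank along with
    // the total node count that is pushed back to every node.
    func AssignRanks(nodes []NodeMeta) (map[string]int, int) {
        sorted := make([]NodeMeta, len(nodes))
        copy(sorted, nodes)
        sort.Slice(sorted, func(i, j int) bool {
            if sorted[i].SwitchLabel != sorted[j].SwitchLabel {
                return sorted[i].SwitchLabel < sorted[j].SwitchLabel
            }
            return sorted[i].Host < sorted[j].Host
        })
        ranks := make(map[string]int, len(sorted))
        for rank, n := range sorted {
            ranks[n.Host] = rank
        }
        return ranks, len(sorted)
    }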

Elastic Scheduling

  • Auto Scaler: The auto scaler generates a scaling plan containing the resource configuration of the nodes, including details such as the number of nodes and the GPU type. If a node fails, the auto scaler can create a new plan that removes the failed node and launches a new one in the cluster.
  • Node Scheduler: The scheduler launches or removes nodes according to the scaling plan. Different schedulers can be implemented to support various CRDs. For example, we can implement an elastic scheduler that launches nodes one by one, similar to the DLRover Python scheduler shown below (a goroutine-based Go sketch follows this list):
    def _periodic_create_pod(self):
        logger.info("Start the thread to create Pod.")
        with ThreadPoolExecutor(max_workers=4) as executor:
            while self._started:
                while self._create_node_queue:
                    executor.submit(
                        self._create_pod_from_queue,
                        self._create_node_queue.popleft(),
                    )
                time.sleep(3)

    def _create_pod_from_queue(self, node_from_queue=None):
        """
        Notice: we must ensure the sync operation of getting the node happens
        before the async execution, so we pass 'node_from_queue' as a parameter
        instead of popping the element inside this function, to avoid invalid
        async execution calls.

        Args:
            node_from_queue (Node): the Node instance to create a Pod for.
        """
        if node_from_queue is None:
            return True
        succeed = False
        if self._check_cluster_ready_for_pod(node_from_queue):
            pod = self._create_pod(node_from_queue)
            succeed = self._k8s_client.create_pod(pod)
        if not succeed:
            self._create_node_queue.appendleft(node_from_queue)
        else:
            # create the service for the succeeded pod
            if not self._create_service_for_pod(node_from_queue):
                self._create_node_queue.appendleft(node_from_queue)
        return succeed

    We can also implement a scheduler that generates a volcano/PodGroup to support Volcano gang scheduling.
  • Node Watcher: The watcher monitors node events from the cluster and reports them to the auto scaler and the training detector.
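
As a rough counterpart to the Python scheduler quoted above, here is a hedged Go sketch of the goroutine-based launching pattern that the Background section argues for: a bounded pool of goroutines drains a queue of pending nodes and creates their Pods concurrently, re-enqueuing a node when creation fails. NodeSpec, createPod, and the channel-based queue are illustrative and do not correspond to existing DLRover Go code.

    package master

    import (
        "context"
        "log"
        "sync"
    )

    // NodeSpec is an illustrative description of a node to be launched.
    type NodeSpec struct {
        Name string
    }

    // Scheduler drains a queue of pending nodes and creates their Pods with a
    // bounded number of concurrent workers, replacing the Python thread pool.
    type Scheduler struct {
        pending   chan NodeSpec
        createPod func(ctx context.Context, n NodeSpec) error // e.g. a client-go Pods().Create call
    }

    // Run starts `workers` goroutines; each blocks on the queue and retries a
    // node by re-enqueuing it when Pod creation fails.
    func (s *Scheduler) Run(ctx context.Context, workers int) {
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for {
                    select {
                    case <-ctx.Done():
                        return
                    case node := <-s.pending:
                        if err := s.createPod(ctx, node); err != nil {
                            log.Printf("create pod for %s failed: %v, re-enqueue", node.Name, err)
                            select {
                            case s.pending <- node:
                            case <-ctx.Done():
                                return
                            }
                        }
                    }
                }
            }()
        }
        wg.Wait()
    }

An elastic scheduler variant would feed nodes into the queue one by one, while a gang-scheduling variant would instead assemble a volcano/PodGroup before submitting the Pods.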