Skip to content

Commit

Permalink
Fix known issue (#1353)
Browse files Browse the repository at this point in the history
* fix known issue

* fix known issue

* lint
  • Loading branch information
BalaBalaYi authored Nov 21, 2024
1 parent cdfae74 commit b3bf606
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 24 deletions.
19 changes: 17 additions & 2 deletions dlrover/python/diagnosis/common/diagnosis_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,24 @@ def to_json(self):
data = {k.lstrip("_"): v for k, v in self.__dict__.items()}
return json.dumps(data)

def is_needed(self):
return (
not self.is_expired()
and self._action_type != DiagnosisActionType.NONE
)

@classmethod
def from_json(cls, json_data):
return cls(**json.loads(json_data))

def __repr__(self):
return (
f"action_type:{self._action_type};"
f"instance:{self._instance};"
f"timestamp:{self._timestamp};"
f"expired_time_period:{self._expired_time_period}"
)


class NoAction(DiagnosisAction):
def __init__(self):
Expand Down Expand Up @@ -187,8 +201,9 @@ def add_action(self, new_action: DiagnosisAction):
self._actions[instance] = Queue(maxsize=10)
ins_actions = self._actions[instance]
try:
ins_actions.put(new_action, timeout=3)
logger.info(f"New diagnosis action {new_action}")
if new_action.is_needed():
ins_actions.put(new_action, timeout=3)
logger.info(f"New diagnosis action {new_action}")
except queue.Full:
logger.warning(
f"Diagnosis actions for {instance} is full, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dlrover.python.common.log import default_logger as logger
from dlrover.python.diagnosis.common.constants import EnvConfigKey
from dlrover.python.diagnosis.datacollector.data_collector import DataCollector
from dlrover.python.util.common_util import is_port_in_use


class XpuTimerMetricsCollector(DataCollector):
Expand Down Expand Up @@ -63,4 +64,6 @@ def _preprocess_metrics(self, metric_str):
return ""

def is_enabled(self) -> bool:
return self._metric_endpoint is not None
return self._metric_endpoint is not None and is_port_in_use(
self._metric_port
)
5 changes: 1 addition & 4 deletions dlrover/python/master/node/dist_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,10 +866,7 @@ def _should_relaunch(self, node: Node, status_change_flow: NodeStateFlow):
msg = "Disable relaunch"
elif node.exit_reason == NodeExitReason.OOM:
mem = node.config_resource.memory
if (
node.is_resource_scalable()
and mem >= NodeResourceLimit.MAX_MEMORY
):
if mem >= NodeResourceLimit.MAX_MEMORY:
should_relaunch = False
logger.warning(
f"The memory of node {mem} is beyond the limit "
Expand Down
10 changes: 2 additions & 8 deletions dlrover/python/master/resource/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,14 +554,8 @@ def _get_free_gpu_node(self):

def adjust_oom_resource(self, node: Node):
"""Adjust the resource configuration for OOM nodes"""

if node.config_resource.memory > NodeResourceLimit.MAX_MEMORY:
# no memory extension if the current value > the default max
pass
else:
node.config_resource.memory = min(
node.config_resource.memory * 2, NodeResourceLimit.MAX_MEMORY
)
# no adjustment for now(for allreduce type)
pass

def get_config_resource(self):
job_config = JobResource()
Expand Down
3 changes: 3 additions & 0 deletions dlrover/python/tests/test_common_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ class CommonUtilTest(unittest.TestCase):
def test_get_dlrover_version(self):
self.assertIsNotNone(cu.get_dlrover_version())
self.assertNotEqual(cu.get_dlrover_version(), "Unknown")

def test_is_port_in_use(self):
self.assertFalse(cu.is_port_in_use(65530))
4 changes: 4 additions & 0 deletions dlrover/python/tests/test_diagnosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def test_action_basic(self):
self.assertEqual(
basic_action._instance, DiagnosisConstant.LOCAL_INSTANCE
)
self.assertFalse(basic_action.is_needed())

event_action = EventAction(
"info", "job", "test", "test123", {"k1": "v1"}
Expand All @@ -53,6 +54,7 @@ def test_action_basic(self):
self.assertEqual(event_action.event_action, "test")
self.assertEqual(event_action.event_msg, "test123")
self.assertEqual(event_action.event_labels, {"k1": "v1"})
self.assertTrue(event_action.is_needed())

node_relaunch_action = NodeAction(
node_id=1,
Expand All @@ -68,6 +70,7 @@ def test_action_basic(self):
self.assertEqual(node_relaunch_action.node_id, 1)
self.assertEqual(node_relaunch_action.node_status, NodeStatus.FAILED)
self.assertEqual(node_relaunch_action.reason, "hang")
self.assertTrue(event_action.is_needed())

node_relaunch_action = NodeAction(
node_id=1,
Expand All @@ -79,6 +82,7 @@ def test_action_basic(self):
node_relaunch_action.action_type,
DiagnosisActionType.RESTART_WORKER,
)
self.assertTrue(event_action.is_needed())

def test_action_queue(self):
action_queue = DiagnosisActionQueue()
Expand Down
5 changes: 5 additions & 0 deletions dlrover/python/tests/test_diagnosis_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

import os
import socket
import unittest
from unittest.mock import patch

Expand Down Expand Up @@ -66,8 +67,12 @@ def test_xpu_timer_metric_collector(self):
self.assertFalse(collector.is_enabled())

env_utils.set_env(EnvConfigKey.XPU_TIMER_PORT, 18889)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("localhost", 18889))
sock.listen(1)
collector = XpuTimerMetricsCollector()
self.assertTrue(collector.is_enabled())
sock.close()

self.assertEqual(collector.collect_data(), "")

Expand Down
10 changes: 1 addition & 9 deletions dlrover/python/tests/test_resource_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,7 @@ def test_adjust_oom_resource(self):
optimizer = AllreduceJobResourceOptimizer(worker_resource, "test-job")
node = Node("worker", 1, config_resource=NodeResource(1, 3276))
optimizer.adjust_oom_resource(node)
self.assertEqual(node.config_resource.memory, 6552)

node = Node("worker", 1, config_resource=NodeResource(1, 32768))
optimizer.adjust_oom_resource(node)
self.assertEqual(node.config_resource.memory, 65536)

node = Node("worker", 1, config_resource=NodeResource(1, 32769))
optimizer.adjust_oom_resource(node)
self.assertEqual(node.config_resource.memory, 65536)
self.assertEqual(node.config_resource.memory, 3276)

node = Node("worker", 1, config_resource=NodeResource(1, 65538))
optimizer.adjust_oom_resource(node)
Expand Down
11 changes: 11 additions & 0 deletions dlrover/python/util/common_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import importlib.metadata
import re
import socket

import dlrover.python.util.file_util as fu

Expand Down Expand Up @@ -57,3 +58,13 @@ def get_installed_version(package_name):
return version
except importlib.metadata.PackageNotFoundError:
return None


def is_port_in_use(port=0) -> bool:
"""
Check if the port is in use.
"""

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
result = sock.connect_ex(("localhost", int(port)))
return result == 0

0 comments on commit b3bf606

Please sign in to comment.