Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Jul 16, 2022
1 parent 47397a3 commit 9ad836a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 15 deletions.
32 changes: 20 additions & 12 deletions mars/services/scheduling/supervisor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,7 @@ async def _get_execution_ref(self, band: BandType):

return await mo.actor_ref(SubtaskExecutionActor.default_uid(), address=band[0])

async def set_subtask_result(
self, result: SubtaskResult, band: BandType
):
async def set_subtask_result(self, result: SubtaskResult, band: BandType):
info = self._subtask_infos[result.subtask_id]
subtask_id = info.subtask.subtask_id
notify_task_service = True
Expand Down Expand Up @@ -346,28 +344,38 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list):
async def _submit_subtasks_to_band(self, band: BandType, subtask_ids: List[str]):
execution_ref = await self._get_execution_ref(band)
delays = []
task_stage_count = defaultdict(lambda: 0)

async with redirect_subtask_errors(
self, self._get_subtasks_by_ids(subtask_ids)
):
for subtask_id in subtask_ids:
subtask_info = self._subtask_infos[subtask_id]
subtask = subtask_info.subtask
self._submitted_subtask_count.record(
1,
{
"session_id": self._session_id,
"task_id": subtask.task_id,
"stage_id": subtask.stage_id,
},
)
logger.debug("Start run subtask %s in band %s.", subtask_id, band)
task_stage_count[(subtask.task_id, subtask.stage_id)] += 1
delays.append(
execution_ref.run_subtask.delay(subtask, band[1], self.address)
)
subtask_info.band_futures[band] = asyncio.Future()
subtask_info.start_time = time.time()
self._speculation_execution_scheduler.add_subtask(subtask_info)

for (task_id, stage_id), cnt in task_stage_count.items():
self._submitted_subtask_count.record(
cnt,
{
"session_id": self._session_id,
"task_id": task_id,
"stage_id": stage_id,
},
)

logger.debug(
"Start run %d subtasks %r in band %s.",
len(subtask_ids),
subtask_ids,
band,
)
await execution_ref.run_subtask.batch(*delays, send=False)

async def cancel_subtasks(
Expand Down
8 changes: 6 additions & 2 deletions mars/services/scheduling/supervisor/tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,16 @@ async def task_fun():
result.status = SubtaskStatus.cancelled
result.error = ex
result.traceback = ex.__traceback__
await manager_ref.set_subtask_result.tell(result, (self.address, band_name))
await manager_ref.set_subtask_result.tell(
result, (self.address, band_name)
)
raise
else:
result.status = SubtaskStatus.succeeded
result.execution_end_time = time.time()
await manager_ref.set_subtask_result.tell(result, (self.address, band_name))
await manager_ref.set_subtask_result.tell(
result, (self.address, band_name)
)

self._subtask_aiotasks[subtask.subtask_id][band_name] = asyncio.create_task(
task_fun()
Expand Down
4 changes: 3 additions & 1 deletion mars/services/scheduling/worker/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,9 @@ async def subtask_caller():
manager_ref = await self._get_manager_ref(
subtask.session_id, supervisor_address
)
await manager_ref.set_subtask_result.tell(res, (self.address, band_name))
await manager_ref.set_subtask_result.tell(
res, (self.address, band_name)
)
finally:
self._subtask_info.pop(subtask_id, None)
self._finished_subtask_count.record(1, {"band": self.address})
Expand Down

0 comments on commit 9ad836a

Please sign in to comment.