Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
HardNorth committed Sep 12, 2023
1 parent 868da81 commit d6a4870
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 75 deletions.
8 changes: 8 additions & 0 deletions reportportal_client/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License

from reportportal_client.aio.tasks import Task, BatchedTask, BatchedTaskFactory

__all__ = [
'Task',
'BatchedTask',
'BatchedTaskFactory'
]
89 changes: 14 additions & 75 deletions reportportal_client/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
import threading
import time
import warnings
from asyncio import Future
from os import getenv
from queue import LifoQueue
from typing import Union, Tuple, List, Dict, Any, Optional, TextIO, Coroutine, TypeVar, Generic, Generator, \
Awaitable
from typing import Union, Tuple, List, Dict, Any, Optional, TextIO, Coroutine, TypeVar

import aiohttp
import certifi

# noinspection PyProtectedMember
from reportportal_client._local import set_current
from reportportal_client.aio import Task, BatchedTaskFactory
from reportportal_client.core.rp_issues import Issue
from reportportal_client.core.rp_requests import (LaunchStartRequest, AsyncHttpRequest, AsyncItemStartRequest,
AsyncItemFinishRequest, LaunchFinishRequest)
Expand All @@ -53,67 +52,7 @@
TASK_TIMEOUT: int = 60
SHUTDOWN_TIMEOUT: int = 120


T = TypeVar('T')


class Task(Generic[T], asyncio.Task, metaclass=AbstractBaseClass):
__metaclass__ = AbstractBaseClass

def __init__(
self,
coro: Union[Generator[Future[object], None, T], Awaitable[T]],
*,
loop: asyncio.AbstractEventLoop,
name: Optional[str] = None
) -> None:
super().__init__(coro, loop=loop, name=name)

@abstractmethod
def blocking_result(self) -> T:
raise NotImplementedError('"blocking_result" method is not implemented!')


class BatchedTask(Generic[T], Task[T]):
__loop: asyncio.AbstractEventLoop
__thread: threading.Thread

def __init__(
self,
coro: Union[Generator[Future[object], None, T], Awaitable[T]],
*,
loop: asyncio.AbstractEventLoop,
name: Optional[str] = None,
thread: threading.Thread
) -> None:
super().__init__(coro, loop=loop, name=name)
self.__loop = loop
self.__thread = thread

def blocking_result(self) -> T:
if self.done():
return self.result()
if self.__thread is not threading.current_thread():
warnings.warn("The method was called from different thread which was used to create the"
"task, unexpected behavior is possible during the execution.", RuntimeWarning,
stacklevel=3)
return self.__loop.run_until_complete(self)


class _BatchedTaskFactory:
__loop: asyncio.AbstractEventLoop
__thread: threading.Thread

def __init__(self, loop: asyncio.AbstractEventLoop, thread: threading.Thread):
self.__loop = loop
self.__thread = thread

def __call__(
self,
loop: asyncio.AbstractEventLoop,
factory: Union[Coroutine[Any, Any, T], Generator[Any, None, T]]
) -> Task[T]:
return BatchedTask(factory, loop=self.__loop, thread=self.__thread)
_T = TypeVar('_T')


class _LifoQueue(LifoQueue):
Expand Down Expand Up @@ -755,7 +694,7 @@ class ThreadedRPClient(RPClient):
_item_stack: _LifoQueue
__loop: Optional[asyncio.AbstractEventLoop]
__thread: Optional[threading.Thread]
__task_list: List[Task[T]]
__task_list: List[Task[_T]]
self_loop: bool
self_thread: bool
launch_uuid: Optional[Task[str]]
Expand Down Expand Up @@ -787,7 +726,7 @@ def __init__(self, endpoint: str, project: str, *, launch_uuid: Optional[Task[st
self.__loop = asyncio.new_event_loop()
self.self_loop = True

def create_task(self, coro: Coroutine[Any, Any, T]) -> Task[T]:
def create_task(self, coro: Coroutine[Any, Any, _T]) -> Task[_T]:
loop = self.__loop
result = loop.create_task(coro)
self.__task_list.append(result)
Expand Down Expand Up @@ -899,15 +838,15 @@ def update_test_item(self,
result_task = self.create_task(result_coro)
return result_task

def _add_current_item(self, item: Task[T]) -> None:
def _add_current_item(self, item: Task[_T]) -> None:
"""Add the last item from the self._items queue."""
self._item_stack.put(item)

def _remove_current_item(self) -> Task[T]:
def _remove_current_item(self) -> Task[_T]:
"""Remove the last item from the self._items queue."""
return self._item_stack.get()

def current_item(self) -> Task[T]:
def current_item(self) -> Task[_T]:
"""Retrieve the last item reported by the client."""
return self._item_stack.last()

Expand Down Expand Up @@ -978,7 +917,7 @@ class BatchedRPClient(RPClient):
__client: _AsyncRPClient
_item_stack: _LifoQueue
__loop: asyncio.AbstractEventLoop
__task_list: List[Task[T]]
__task_list: List[Task[_T]]
__task_mutex: threading.Lock
__last_run_time: float
__thread: threading.Thread
Expand Down Expand Up @@ -1006,7 +945,7 @@ def __init__(self, endpoint: str, project: str, *, launch_uuid: Optional[Task[st
self.__last_run_time = time.time()
self.__loop = asyncio.new_event_loop()
self.__thread = threading.current_thread()
self.__loop.set_task_factory(_BatchedTaskFactory(self.__loop, self.__thread))
self.__loop.set_task_factory(BatchedTaskFactory(self.__loop, self.__thread))

def __ready_to_run(self) -> bool:
current_time = time.time()
Expand All @@ -1018,7 +957,7 @@ def __ready_to_run(self) -> bool:
return True
return False

def create_task(self, coro: Coroutine[Any, Any, T]) -> Task[T]:
def create_task(self, coro: Coroutine[Any, Any, _T]) -> Task[_T]:
result = self.__loop.create_task(coro)
tasks = None
with self.__task_mutex:
Expand Down Expand Up @@ -1124,15 +1063,15 @@ def update_test_item(self,
result_task = self.create_task(result_coro)
return result_task

def _add_current_item(self, item: Task[T]) -> None:
def _add_current_item(self, item: Task[_T]) -> None:
"""Add the last item from the self._items queue."""
self._item_stack.put(item)

def _remove_current_item(self) -> Task[T]:
def _remove_current_item(self) -> Task[_T]:
"""Remove the last item from the self._items queue."""
return self._item_stack.get()

def current_item(self) -> Task[T]:
def current_item(self) -> Task[_T]:
"""Retrieve the last item reported by the client."""
return self._item_stack.last()

Expand Down
82 changes: 82 additions & 0 deletions reportportal_client/aio/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright (c) 2023 EPAM Systems
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License

import asyncio
import threading
import warnings
from abc import abstractmethod
from asyncio import Future
from typing import TypeVar, Generic, Union, Generator, Awaitable, Optional, Coroutine, Any

from reportportal_client.static.abstract import AbstractBaseClass

_T = TypeVar('_T')


class Task(Generic[_T], asyncio.Task, metaclass=AbstractBaseClass):
__metaclass__ = AbstractBaseClass

def __init__(
self,
coro: Union[Generator[Future[object], None, _T], Awaitable[_T]],
*,
loop: asyncio.AbstractEventLoop,
name: Optional[str] = None
) -> None:
super().__init__(coro, loop=loop, name=name)

@abstractmethod
def blocking_result(self) -> _T:
raise NotImplementedError('"blocking_result" method is not implemented!')


class BatchedTask(Generic[_T], Task[_T]):
__loop: asyncio.AbstractEventLoop
__thread: threading.Thread

def __init__(
self,
coro: Union[Generator[Future[object], None, _T], Awaitable[_T]],
*,
loop: asyncio.AbstractEventLoop,
name: Optional[str] = None,
thread: threading.Thread
) -> None:
super().__init__(coro, loop=loop, name=name)
self.__loop = loop
self.__thread = thread

def blocking_result(self) -> _T:
if self.done():
return self.result()
if self.__thread is not threading.current_thread():
warnings.warn("The method was called from different thread which was used to create the"
"task, unexpected behavior is possible during the execution.", RuntimeWarning,
stacklevel=3)
return self.__loop.run_until_complete(self)


class BatchedTaskFactory:
__loop: asyncio.AbstractEventLoop
__thread: threading.Thread

def __init__(self, loop: asyncio.AbstractEventLoop, thread: threading.Thread):
self.__loop = loop
self.__thread = thread

def __call__(
self,
loop: asyncio.AbstractEventLoop,
factory: Union[Coroutine[Any, Any, _T], Generator[Any, None, _T]]
) -> Task[_T]:
return BatchedTask(factory, loop=self.__loop, thread=self.__thread)

0 comments on commit d6a4870

Please sign in to comment.