Skip to content

Commit

Permalink
Update pydocs. And import fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
HardNorth committed Sep 26, 2023
1 parent 5fc8b64 commit 27d78f8
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 67 deletions.
18 changes: 10 additions & 8 deletions reportportal_client/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@
# See the License for the specific language governing permissions and
# limitations under the License

from reportportal_client.aio.tasks import (Task, TriggerTaskList, BatchedTask, BatchedTaskFactory,
from reportportal_client.aio.tasks import (Task, TriggerTaskBatcher, BatchedTask, BatchedTaskFactory,
ThreadedTask, ThreadedTaskFactory, BlockingOperationError,
BackgroundTaskList, DEFAULT_TASK_TRIGGER_NUM,
BackgroundTaskBatcher, DEFAULT_TASK_TRIGGER_NUM,
DEFAULT_TASK_TRIGGER_INTERVAL)

DEFAULT_TASK_TIMEOUT: float = 60.0
DEFAULT_SHUTDOWN_TIMEOUT: float = 120.0
from reportportal_client.aio.client import (ThreadedRPClient, BatchedRPClient, AsyncRPClient,
DEFAULT_TASK_TIMEOUT, DEFAULT_SHUTDOWN_TIMEOUT)

__all__ = [
'Task',
'TriggerTaskList',
'BackgroundTaskList',
'TriggerTaskBatcher',
'BackgroundTaskBatcher',
'BatchedTask',
'BatchedTaskFactory',
'ThreadedTask',
Expand All @@ -31,5 +30,8 @@
'DEFAULT_TASK_TIMEOUT',
'DEFAULT_SHUTDOWN_TIMEOUT',
'DEFAULT_TASK_TRIGGER_NUM',
'DEFAULT_TASK_TRIGGER_INTERVAL'
'DEFAULT_TASK_TRIGGER_INTERVAL',
'AsyncRPClient',
'BatchedRPClient',
'ThreadedRPClient'
]
25 changes: 14 additions & 11 deletions reportportal_client/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
from reportportal_client import RP
# noinspection PyProtectedMember
from reportportal_client._local import set_current
from reportportal_client.aio import (Task, BatchedTaskFactory, ThreadedTaskFactory, DEFAULT_TASK_TIMEOUT,
DEFAULT_SHUTDOWN_TIMEOUT, DEFAULT_TASK_TRIGGER_NUM, TriggerTaskList,
DEFAULT_TASK_TRIGGER_INTERVAL, BackgroundTaskList)
from reportportal_client.aio.tasks import (Task, BatchedTaskFactory, ThreadedTaskFactory, TriggerTaskBatcher,
BackgroundTaskBatcher, DEFAULT_TASK_TRIGGER_NUM,
DEFAULT_TASK_TRIGGER_INTERVAL)
from reportportal_client.aio.http import RetryingClientSession
from reportportal_client.core.rp_issues import Issue
from reportportal_client.core.rp_requests import (LaunchStartRequest, AsyncHttpRequest, AsyncItemStartRequest,
Expand All @@ -54,6 +54,9 @@

_T = TypeVar('_T')

DEFAULT_TASK_TIMEOUT: float = 60.0
DEFAULT_SHUTDOWN_TIMEOUT: float = 120.0


class Client:
api_v1: str
Expand Down Expand Up @@ -882,7 +885,7 @@ async def _close(self):

class ThreadedRPClient(_RPClient):
_loop: Optional[asyncio.AbstractEventLoop]
__task_list: BackgroundTaskList[Task[_T]]
__task_list: BackgroundTaskBatcher[Task[_T]]
__task_mutex: threading.RLock
__thread: Optional[threading.Thread]

Expand All @@ -894,21 +897,21 @@ def __init__(
launch_uuid: Optional[Task[str]] = None,
client: Optional[Client] = None,
log_batcher: Optional[LogBatcher] = None,
task_list: Optional[BackgroundTaskList[Task[_T]]] = None,
task_list: Optional[BackgroundTaskBatcher[Task[_T]]] = None,
task_mutex: Optional[threading.RLock] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
**kwargs: Any
) -> None:
super().__init__(endpoint, project, launch_uuid=launch_uuid, client=client, log_batcher=log_batcher,
**kwargs)
self.__task_list = task_list or BackgroundTaskList()
self.__task_list = task_list or BackgroundTaskBatcher()
self.__task_mutex = task_mutex or threading.RLock()
self.__thread = None
if loop:
self._loop = loop
else:
self._loop = asyncio.new_event_loop()
self._loop.set_task_factory(ThreadedTaskFactory(self._loop, self._task_timeout))
self._loop.set_task_factory(ThreadedTaskFactory(self._task_timeout))
self.__heartbeat()
self.__thread = threading.Thread(target=self._loop.run_forever, name='RP-Async-Client',
daemon=True)
Expand Down Expand Up @@ -966,7 +969,7 @@ def clone(self) -> 'ThreadedRPClient':

class BatchedRPClient(_RPClient):
_loop: asyncio.AbstractEventLoop
__task_list: TriggerTaskList[Task[_T]]
__task_list: TriggerTaskBatcher[Task[_T]]
__task_mutex: threading.RLock
__last_run_time: float
__trigger_num: int
Expand All @@ -979,7 +982,7 @@ def __init__(
launch_uuid: Optional[Task[str]] = None,
client: Optional[Client] = None,
log_batcher: Optional[LogBatcher] = None,
task_list: Optional[TriggerTaskList] = None,
task_list: Optional[TriggerTaskBatcher] = None,
task_mutex: Optional[threading.RLock] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
trigger_num: int = DEFAULT_TASK_TRIGGER_NUM,
Expand All @@ -988,14 +991,14 @@ def __init__(
) -> None:
super().__init__(endpoint, project, launch_uuid=launch_uuid, client=client, log_batcher=log_batcher,
**kwargs)
self.__task_list = task_list or TriggerTaskList()
self.__task_list = task_list or TriggerTaskBatcher()
self.__task_mutex = task_mutex or threading.RLock()
self.__last_run_time = datetime.time()
if loop:
self._loop = loop
else:
self._loop = asyncio.new_event_loop()
self._loop.set_task_factory(BatchedTaskFactory(self._loop))
self._loop.set_task_factory(BatchedTaskFactory())
self.__trigger_num = trigger_num
self.__trigger_interval = trigger_interval

Expand Down
140 changes: 108 additions & 32 deletions reportportal_client/aio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# See the License for the specific language governing permissions and
# limitations under the License

"""This module contains customized asynchronous Tasks and Task Factories for the ReportPortal client."""

import asyncio
import sys
import time
Expand All @@ -30,6 +32,12 @@ class BlockingOperationError(RuntimeError):


class Task(Generic[_T], asyncio.Task, metaclass=AbstractBaseClass):
"""Base class for ReportPortal client tasks.
Its main function to provide interface of 'blocking_result' method which is used to block current Thread
until the result computation.
"""

__metaclass__ = AbstractBaseClass

def __init__(
Expand All @@ -39,14 +47,44 @@ def __init__(
loop: asyncio.AbstractEventLoop,
name: Optional[str] = None
) -> None:
"""Initialize an instance of the Task.
:param coro: Future, Coroutine or a Generator of these objects, which will be executed
:param loop: Event Loop which will be used to execute the Task
:param name: the name of the task
"""
super().__init__(coro, loop=loop, name=name)

@abstractmethod
def blocking_result(self) -> _T:
"""Block current Thread and wait for the task result.
:return: execution result or raise an error
"""
raise NotImplementedError('"blocking_result" method is not implemented!')

def __repr__(self) -> str:
"""Return the result's repr function output if the Task is completed, or the Task's if not.
:return: canonical string representation of the result or the current Task
"""
if self.done():
return repr(self.result())
return super().__repr__()

def __str__(self):
"""Return the result's str function output if the Task is completed, or the Task's if not.
:return: string object from the result or from the current Task
"""
if self.done():
return str(self.result())
return super().__str__()


class BatchedTask(Generic[_T], Task[_T]):
"""Represents a Task which uses the current Thread to execute itself."""

__loop: asyncio.AbstractEventLoop

def __init__(
Expand All @@ -56,26 +94,28 @@ def __init__(
loop: asyncio.AbstractEventLoop,
name: Optional[str] = None
) -> None:
"""Initialize an instance of the Task.
:param coro: Future, Coroutine or a Generator of these objects, which will be executed
:param loop: Event Loop which will be used to execute the Task
:param name: the name of the task
"""
super().__init__(coro, loop=loop, name=name)
self.__loop = loop

def blocking_result(self) -> _T:
"""Use current Thread to execute the Task and return the result if not yet executed.
:return: execution result or raise an error, or return immediately if already executed
"""
if self.done():
return self.result()
return self.__loop.run_until_complete(self)

def __repr__(self) -> str:
if self.done():
return repr(self.result())
return super().__repr__()

def __str__(self):
if self.done():
return str(self.result())
return super().__str__()


class ThreadedTask(Generic[_T], Task[_T]):
"""Represents a Task which runs is a separate Thread."""

__loop: asyncio.AbstractEventLoop
__wait_timeout: float

Expand All @@ -87,11 +127,21 @@ def __init__(
loop: asyncio.AbstractEventLoop,
name: Optional[str] = None
) -> None:
"""Initialize an instance of the Task.
:param coro: Future, Coroutine or a Generator of these objects, which will be executed
:param loop: Event Loop which will be used to execute the Task
:param name: the name of the task
"""
super().__init__(coro, loop=loop, name=name)
self.__loop = loop
self.__wait_timeout = wait_timeout

def blocking_result(self) -> _T:
"""Pause current Thread until the Task completion and return the result if not yet executed.
:return: execution result or raise an error, or return immediately if already executed
"""
if self.done():
return self.result()
if not self.__loop.is_running() or self.__loop.is_closed():
Expand All @@ -104,38 +154,32 @@ def blocking_result(self) -> _T:
raise BlockingOperationError('Timed out waiting for the task execution')
return self.result()

def __repr__(self) -> str:
if self.done():
return repr(self.result())
return super().__repr__()

def __str__(self):
if self.done():
return str(self.result())
return super().__str__()


class BatchedTaskFactory:
__loop: asyncio.AbstractEventLoop

def __init__(self, loop: asyncio.AbstractEventLoop):
self.__loop = loop
"""Factory protocol which creates Batched Tasks."""

def __call__(
self,
loop: asyncio.AbstractEventLoop,
factory: Union[Coroutine[Any, Any, _T], Generator[Any, None, _T]],
**_
) -> Task[_T]:
return BatchedTask(factory, loop=self.__loop)
"""Create Batched Task in appropriate Event Loop.
:param loop: Event Loop which will be used to execute the Task
:param factory: Future, Coroutine or a Generator of these objects, which will be executed
"""
return BatchedTask(factory, loop=loop)


class ThreadedTaskFactory:
__loop: asyncio.AbstractEventLoop
__wait_timeout: float

def __init__(self, loop: asyncio.AbstractEventLoop, wait_timeout: float):
self.__loop = loop
def __init__(self, wait_timeout: float):
"""Initialize an instance of the Factory.
:param wait_timeout: Task wait timeout in case of blocking result get
"""
self.__wait_timeout = wait_timeout

def __call__(
Expand All @@ -144,18 +188,30 @@ def __call__(
factory: Union[Coroutine[Any, Any, _T], Generator[Any, None, _T]],
**_
) -> Task[_T]:
return ThreadedTask(factory, self.__wait_timeout, loop=self.__loop)
"""Create Threaded Task in appropriate Event Loop.
:param loop: Event Loop which will be used to execute the Task
:param factory: Future, Coroutine or a Generator of these objects, which will be executed
"""
return ThreadedTask(factory, self.__wait_timeout, loop=loop)


class TriggerTaskList(Generic[_T]):
class TriggerTaskBatcher(Generic[_T]):
"""Batching class which compile its batches by object number or by passed time."""

__task_list: List[_T]
__last_run_time: float
__trigger_num: int
__trigger_interval: float

def __init__(self,
trigger_num: int = DEFAULT_TASK_TRIGGER_NUM,
trigger_interval: float = DEFAULT_TASK_TRIGGER_INTERVAL):
trigger_interval: float = DEFAULT_TASK_TRIGGER_INTERVAL) -> None:
"""Initialize an instance of the Batcher.
:param trigger_num: object number threshold which triggers batch return and reset
:param trigger_interval: amount of time after which return and reset batch
"""
self.__task_list = []
self.__last_run_time = time.time()
self.__trigger_num = trigger_num
Expand All @@ -173,23 +229,35 @@ def __ready_to_run(self) -> bool:
return False

def append(self, value: _T) -> Optional[List[_T]]:
"""Add an object to internal batch and return the batch if it's triggered.
:param value: an object to add to the batch
:return: a batch or None
"""
self.__task_list.append(value)
if self.__ready_to_run():
tasks = self.__task_list
self.__task_list = []
return tasks

def flush(self) -> Optional[List[_T]]:
"""Immediately return everything what's left in the internal batch.
:return: a batch or None
"""
if len(self.__task_list) > 0:
tasks = self.__task_list
self.__task_list = []
return tasks


class BackgroundTaskList(Generic[_T]):
class BackgroundTaskBatcher(Generic[_T]):
"""Batching class which collects Tasks into internal batch and removes when they complete."""

__task_list: List[_T]

def __init__(self):
"""Initialize an instance of the Batcher."""
self.__task_list = []

def __remove_finished(self):
Expand All @@ -201,10 +269,18 @@ def __remove_finished(self):
self.__task_list = self.__task_list[i + 1:]

def append(self, value: _T) -> None:
"""Add an object to internal batch.
:param value: an object to add to the batch
"""
self.__remove_finished()
self.__task_list.append(value)

def flush(self) -> Optional[List[_T]]:
"""Immediately return everything what's left unfinished in the internal batch.
:return: a batch or None
"""
self.__remove_finished()
if len(self.__task_list) > 0:
tasks = self.__task_list
Expand Down
Loading

0 comments on commit 27d78f8

Please sign in to comment.