Skip to content

Commit

Permalink
Add batched client implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
HardNorth committed Sep 9, 2023
1 parent c89b566 commit 04f957c
Showing 1 changed file with 206 additions and 0 deletions.
206 changes: 206 additions & 0 deletions reportportal_client/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,3 +906,209 @@ def clone(self) -> 'ScheduledRPClient':
if current_item:
cloned._add_current_item(current_item)
return cloned


class BatchedRPClient(RPClient):
__client: _AsyncRPClient
_item_stack: _LifoQueue
__loop: Optional[asyncio.AbstractEventLoop]
__thread: Optional[threading.Thread]
__task_list: List[asyncio.Task]
launch_uuid: Optional[asyncio.Task]
use_own_launch: bool
step_reporter: StepReporter

def __init__(self, endpoint: str, project: str, *, launch_uuid: Optional[asyncio.Task] = None,
client: Optional[_AsyncRPClient] = None, loop: Optional[asyncio.AbstractEventLoop] = None,
**kwargs: Any) -> None:
set_current(self)
self.step_reporter = StepReporter(self)
self._item_stack = _LifoQueue()
if client:
self.__client = client
else:
self.__client = _AsyncRPClient(endpoint, project, **kwargs)
if launch_uuid:
self.launch_uuid = launch_uuid
self.use_own_launch = False
else:
self.use_own_launch = True

self.__task_list = []
self.__loop = asyncio.new_event_loop()

def run_tasks(self) -> None:
tasks = self.__task_list
if len(tasks) <= 0:
return
self.__task_list = []
self.__loop.run_until_complete(asyncio.gather(*tasks))

def create_task(self, coro: Any) -> asyncio.Task:
result = self.__loop.create_task(coro)
self.__task_list.append(result)
if len(self.__task_list) >= 10:
self.run_tasks()
return result

def finish_tasks(self):
self.run_tasks()

async def __empty_line(self):
return ""

def start_launch(self,
name: str,
start_time: str,
description: Optional[str] = None,
attributes: Optional[Union[List, Dict]] = None,
rerun: bool = False,
rerun_of: Optional[str] = None,
**kwargs) -> asyncio.Task:
if not self.use_own_launch:
return self.launch_uuid
launch_uuid_coro = self.__client.start_launch(name, start_time, description=description,
attributes=attributes, rerun=rerun, rerun_of=rerun_of,
**kwargs)
self.launch_uuid = self.create_task(launch_uuid_coro)
return self.launch_uuid

def start_test_item(self,
name: str,
start_time: str,
item_type: str,
*,
description: Optional[str] = None,
attributes: Optional[List[Dict]] = None,
parameters: Optional[Dict] = None,
parent_item_id: Optional[asyncio.Task] = None,
has_stats: bool = True,
code_ref: Optional[str] = None,
retry: bool = False,
test_case_id: Optional[str] = None,
**kwargs: Any) -> asyncio.Task:

item_id_coro = self.__client.start_test_item(self.launch_uuid, name, start_time, item_type,
description=description, attributes=attributes,
parameters=parameters, parent_item_id=parent_item_id,
has_stats=has_stats, code_ref=code_ref, retry=retry,
test_case_id=test_case_id, **kwargs)
item_id_task = self.create_task(item_id_coro)
self._add_current_item(item_id_task)
return item_id_task

def finish_test_item(self,
item_id: asyncio.Task,
end_time: str,
*,
status: str = None,
issue: Optional[Issue] = None,
attributes: Optional[Union[List, Dict]] = None,
description: str = None,
retry: bool = False,
**kwargs: Any) -> asyncio.Task:
result_coro = self.__client.finish_test_item(self.launch_uuid, item_id, end_time, status=status,
issue=issue, attributes=attributes,
description=description,
retry=retry, **kwargs)
result_task = self.create_task(result_coro)
self._remove_current_item()
return result_task

def finish_launch(self,
end_time: str,
status: str = None,
attributes: Optional[Union[List, Dict]] = None,
**kwargs: Any) -> asyncio.Task:
if self.use_own_launch:
result_coro = self.__client.finish_launch(self.launch_uuid, end_time, status=status,
attributes=attributes, **kwargs)
else:
result_coro = self.create_task(self.__empty_line())

result_task = self.create_task(result_coro)
self.finish_tasks()
return result_task

def update_test_item(self,
item_uuid: asyncio.Task,
attributes: Optional[Union[List, Dict]] = None,
description: Optional[str] = None) -> asyncio.Task:
result_coro = self.__client.update_test_item(item_uuid, attributes=attributes,
description=description)
result_task = self.create_task(result_coro)
return result_task

def _add_current_item(self, item: asyncio.Task) -> None:
"""Add the last item from the self._items queue."""
self._item_stack.put(item)

def _remove_current_item(self) -> asyncio.Task:
"""Remove the last item from the self._items queue."""
return self._item_stack.get()

async def __empty_dict(self):
return {}

async def __none_value(self):
return

def current_item(self) -> asyncio.Task:
"""Retrieve the last item reported by the client."""
return self._item_stack.last()

def get_launch_info(self) -> asyncio.Task:
if not self.launch_uuid:
return self.create_task(self.__empty_dict())
result_coro = self.__client.get_launch_info(self.launch_uuid)
result_task = self.create_task(result_coro)
return result_task

def get_item_id_by_uuid(self, item_uuid_future: asyncio.Task) -> asyncio.Task:
result_coro = self.__client.get_item_id_by_uuid(item_uuid_future)
result_task = self.create_task(result_coro)
return result_task

def get_launch_ui_id(self) -> asyncio.Task:
if not self.launch_uuid:
return self.create_task(self.__none_value())
result_coro = self.__client.get_launch_ui_id(self.launch_uuid)
result_task = self.create_task(result_coro)
return result_task

def get_launch_ui_url(self) -> asyncio.Task:
if not self.launch_uuid:
return self.create_task(self.__none_value())
result_coro = self.__client.get_launch_ui_url(self.launch_uuid)
result_task = self.create_task(result_coro)
return result_task

def get_project_settings(self) -> asyncio.Task:
result_coro = self.__client.get_project_settings()
result_task = self.create_task(result_coro)
return result_task

def log(self, time: str, message: str, level: Optional[Union[int, str]] = None,
attachment: Optional[Dict] = None, item_id: Optional[str] = None) -> None:
# TODO: implement logging
return None

def clone(self) -> 'ScheduledRPClient':
"""Clone the client object, set current Item ID as cloned item ID.
:returns: Cloned client object
:rtype: ScheduledRPClient
"""
cloned_client = self.__client.clone()
# noinspection PyTypeChecker
cloned = ScheduledRPClient(
endpoint=None,
project=None,
launch_uuid=self.launch_uuid,
client=cloned_client,
loop=self.__loop
)
current_item = self.current_item()
if current_item:
cloned._add_current_item(current_item)
return cloned

0 comments on commit 04f957c

Please sign in to comment.