diff --git a/reportportal_client/aio/client.py b/reportportal_client/aio/client.py index 61d3d8d7..506c1e73 100644 --- a/reportportal_client/aio/client.py +++ b/reportportal_client/aio/client.py @@ -906,3 +906,209 @@ def clone(self) -> 'ScheduledRPClient': if current_item: cloned._add_current_item(current_item) return cloned + + +class BatchedRPClient(RPClient): + __client: _AsyncRPClient + _item_stack: _LifoQueue + __loop: Optional[asyncio.AbstractEventLoop] + __thread: Optional[threading.Thread] + __task_list: List[asyncio.Task] + launch_uuid: Optional[asyncio.Task] + use_own_launch: bool + step_reporter: StepReporter + + def __init__(self, endpoint: str, project: str, *, launch_uuid: Optional[asyncio.Task] = None, + client: Optional[_AsyncRPClient] = None, loop: Optional[asyncio.AbstractEventLoop] = None, + **kwargs: Any) -> None: + set_current(self) + self.step_reporter = StepReporter(self) + self._item_stack = _LifoQueue() + if client: + self.__client = client + else: + self.__client = _AsyncRPClient(endpoint, project, **kwargs) + if launch_uuid: + self.launch_uuid = launch_uuid + self.use_own_launch = False + else: + self.use_own_launch = True + + self.__task_list = [] + self.__loop = asyncio.new_event_loop() + + def run_tasks(self) -> None: + tasks = self.__task_list + if len(tasks) <= 0: + return + self.__task_list = [] + self.__loop.run_until_complete(asyncio.gather(*tasks)) + + def create_task(self, coro: Any) -> asyncio.Task: + result = self.__loop.create_task(coro) + self.__task_list.append(result) + if len(self.__task_list) >= 10: + self.run_tasks() + return result + + def finish_tasks(self): + self.run_tasks() + + async def __empty_line(self): + return "" + + def start_launch(self, + name: str, + start_time: str, + description: Optional[str] = None, + attributes: Optional[Union[List, Dict]] = None, + rerun: bool = False, + rerun_of: Optional[str] = None, + **kwargs) -> asyncio.Task: + if not self.use_own_launch: + return self.launch_uuid + launch_uuid_coro = self.__client.start_launch(name, start_time, description=description, + attributes=attributes, rerun=rerun, rerun_of=rerun_of, + **kwargs) + self.launch_uuid = self.create_task(launch_uuid_coro) + return self.launch_uuid + + def start_test_item(self, + name: str, + start_time: str, + item_type: str, + *, + description: Optional[str] = None, + attributes: Optional[List[Dict]] = None, + parameters: Optional[Dict] = None, + parent_item_id: Optional[asyncio.Task] = None, + has_stats: bool = True, + code_ref: Optional[str] = None, + retry: bool = False, + test_case_id: Optional[str] = None, + **kwargs: Any) -> asyncio.Task: + + item_id_coro = self.__client.start_test_item(self.launch_uuid, name, start_time, item_type, + description=description, attributes=attributes, + parameters=parameters, parent_item_id=parent_item_id, + has_stats=has_stats, code_ref=code_ref, retry=retry, + test_case_id=test_case_id, **kwargs) + item_id_task = self.create_task(item_id_coro) + self._add_current_item(item_id_task) + return item_id_task + + def finish_test_item(self, + item_id: asyncio.Task, + end_time: str, + *, + status: str = None, + issue: Optional[Issue] = None, + attributes: Optional[Union[List, Dict]] = None, + description: str = None, + retry: bool = False, + **kwargs: Any) -> asyncio.Task: + result_coro = self.__client.finish_test_item(self.launch_uuid, item_id, end_time, status=status, + issue=issue, attributes=attributes, + description=description, + retry=retry, **kwargs) + result_task = self.create_task(result_coro) + self._remove_current_item() + return result_task + + def finish_launch(self, + end_time: str, + status: str = None, + attributes: Optional[Union[List, Dict]] = None, + **kwargs: Any) -> asyncio.Task: + if self.use_own_launch: + result_coro = self.__client.finish_launch(self.launch_uuid, end_time, status=status, + attributes=attributes, **kwargs) + else: + result_coro = self.create_task(self.__empty_line()) + + result_task = self.create_task(result_coro) + self.finish_tasks() + return result_task + + def update_test_item(self, + item_uuid: asyncio.Task, + attributes: Optional[Union[List, Dict]] = None, + description: Optional[str] = None) -> asyncio.Task: + result_coro = self.__client.update_test_item(item_uuid, attributes=attributes, + description=description) + result_task = self.create_task(result_coro) + return result_task + + def _add_current_item(self, item: asyncio.Task) -> None: + """Add the last item from the self._items queue.""" + self._item_stack.put(item) + + def _remove_current_item(self) -> asyncio.Task: + """Remove the last item from the self._items queue.""" + return self._item_stack.get() + + async def __empty_dict(self): + return {} + + async def __none_value(self): + return + + def current_item(self) -> asyncio.Task: + """Retrieve the last item reported by the client.""" + return self._item_stack.last() + + def get_launch_info(self) -> asyncio.Task: + if not self.launch_uuid: + return self.create_task(self.__empty_dict()) + result_coro = self.__client.get_launch_info(self.launch_uuid) + result_task = self.create_task(result_coro) + return result_task + + def get_item_id_by_uuid(self, item_uuid_future: asyncio.Task) -> asyncio.Task: + result_coro = self.__client.get_item_id_by_uuid(item_uuid_future) + result_task = self.create_task(result_coro) + return result_task + + def get_launch_ui_id(self) -> asyncio.Task: + if not self.launch_uuid: + return self.create_task(self.__none_value()) + result_coro = self.__client.get_launch_ui_id(self.launch_uuid) + result_task = self.create_task(result_coro) + return result_task + + def get_launch_ui_url(self) -> asyncio.Task: + if not self.launch_uuid: + return self.create_task(self.__none_value()) + result_coro = self.__client.get_launch_ui_url(self.launch_uuid) + result_task = self.create_task(result_coro) + return result_task + + def get_project_settings(self) -> asyncio.Task: + result_coro = self.__client.get_project_settings() + result_task = self.create_task(result_coro) + return result_task + + def log(self, time: str, message: str, level: Optional[Union[int, str]] = None, + attachment: Optional[Dict] = None, item_id: Optional[str] = None) -> None: + # TODO: implement logging + return None + + def clone(self) -> 'ScheduledRPClient': + """Clone the client object, set current Item ID as cloned item ID. + + :returns: Cloned client object + :rtype: ScheduledRPClient + """ + cloned_client = self.__client.clone() + # noinspection PyTypeChecker + cloned = ScheduledRPClient( + endpoint=None, + project=None, + launch_uuid=self.launch_uuid, + client=cloned_client, + loop=self.__loop + ) + current_item = self.current_item() + if current_item: + cloned._add_current_item(current_item) + return cloned \ No newline at end of file