Skip to content

Commit

Permalink
Async RPClient: WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
HardNorth committed Aug 15, 2023
1 parent 5a6da9d commit 6ef4137
Showing 1 changed file with 65 additions and 12 deletions.
77 changes: 65 additions & 12 deletions reportportal_client/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import asyncio
import logging
import threading
from queue import LifoQueue
from typing import Union, Tuple, List, Dict, Any, Optional, TextIO

Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(
self,
endpoint: str,
project: str,
*,
api_key: str = None,
log_batch_size: int = 20,
is_skipped_an_issue: bool = True,
Expand Down Expand Up @@ -127,7 +129,7 @@ async def start_launch(self,
attributes: Optional[Union[List, Dict]] = None,
rerun: bool = False,
rerun_of: Optional[str] = None,
**kwargs) -> Optional[str]:
**kwargs) -> Optional[Union[asyncio.Future, str]]:
pass

async def start_test_item(self,
Expand All @@ -143,8 +145,10 @@ async def start_test_item(self,
code_ref: Optional[str] = None,
retry: bool = False,
test_case_id: Optional[str] = None,
**_: Any) -> Optional[str]:
pass
**_: Any) -> Optional[Union[asyncio.Future, str]]:
parent = parent_item_id
if parent_item_id and asyncio.isfuture(parent_item_id):
parent = await parent_item_id

async def update_test_item(self, item_uuid: Union[asyncio.Future, str],
attributes: Optional[Union[List, Dict]] = None,
Expand Down Expand Up @@ -204,11 +208,12 @@ async def start_test_item(self,
code_ref: Optional[str] = None,
retry: bool = False,
test_case_id: Optional[str] = None,
**_: Any) -> Optional[str]:
**kwargs: Any) -> Optional[Union[asyncio.Future, str]]:
item_id = await super().start_test_item(name, start_time, item_type, description=description,
attributes=attributes, parameters=parameters,
parent_item_id=parent_item_id, has_stats=has_stats,
code_ref=code_ref, retry=retry, test_case_id=test_case_id)
code_ref=code_ref, retry=retry, test_case_id=test_case_id,
**kwargs)
if item_id and item_id is not NOT_FOUND:
super()._add_current_item(item_id)
return item_id
Expand All @@ -224,15 +229,63 @@ async def finish_test_item(self,
retry: bool = False,
**kwargs: Any) -> Optional[str]:
result = await super().finish_test_item(item_id, end_time, status=status, issue=issue,
attributes=attributes, description=description, retry=retry)
attributes=attributes, description=description, retry=retry,
**kwargs)
super()._remove_current_item()
return result


class RPClientSync:

client: _RPClient

def __init__(self, client: _RPClient):
self.client = client
class RPClientSync(_RPClient):
loop: asyncio.AbstractEventLoop
thread: threading.Thread

def __init__(
self,
endpoint: str,
project: str,
*,
api_key: str = None,
log_batch_size: int = 20,
is_skipped_an_issue: bool = True,
verify_ssl: bool = True,
retries: int = None,
max_pool_size: int = 50,
launch_id: str = None,
http_timeout: Union[float, Tuple[float, float]] = (10, 10),
log_batch_payload_size: int = MAX_LOG_BATCH_PAYLOAD_SIZE,
mode: str = 'DEFAULT',
launch_uuid_print: bool = False,
print_output: Optional[TextIO] = None,
**kwargs: Any
) -> None:
super().__init__(endpoint, project, api_key=api_key, log_batch_size=log_batch_size,
is_skipped_an_issue=is_skipped_an_issue, verify_ssl=verify_ssl, retries=retries,
max_pool_size=max_pool_size, launch_id=launch_id, http_timeout=http_timeout,
log_batch_payload_size=log_batch_payload_size, mode=mode,
launch_uuid_print=launch_uuid_print, print_output=print_output, **kwargs)
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(target=self.loop.run_forever(), name='RP-Async-Client', daemon=True)
self.thread.start()

def start_test_item(self,
name: str,
start_time: str,
item_type: str,
*,
description: Optional[str] = None,
attributes: Optional[List[Dict]] = None,
parameters: Optional[Dict] = None,
parent_item_id: Optional[Union[asyncio.Future, str]] = None,
has_stats: bool = True,
code_ref: Optional[str] = None,
retry: bool = False,
test_case_id: Optional[str] = None,
**kwargs: Any) -> Optional[Union[asyncio.Future, str]]:
item_id_coro = super().start_test_item(name, start_time, item_type, description=description,
attributes=attributes, parameters=parameters,
parent_item_id=parent_item_id, has_stats=has_stats,
code_ref=code_ref, retry=retry, test_case_id=test_case_id,
**kwargs)
item_id_task = self.loop.create_task(item_id_coro)
super()._add_current_item(item_id_task)
return item_id_task

0 comments on commit 6ef4137

Please sign in to comment.