-
Notifications
You must be signed in to change notification settings - Fork 3
/
asyncio_tools.py
130 lines (98 loc) · 3.6 KB
/
asyncio_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from __future__ import annotations
import asyncio
from functools import cached_property
import typing as t
# Version string for this module.
__VERSION__ = "1.0.0"
class CompoundException(Exception):
    """
    Bundles several exceptions into one raisable exception.

    The original exception instances stay available on ``exceptions``, and
    ``str()`` of this exception yields one combined message describing all
    of them.
    """

    def __init__(self, exceptions: t.List[Exception]):
        # The list is stored as given (not copied).
        self.exceptions = exceptions

    def __str__(self):
        # One "<ClassName>: <message>" entry per wrapped exception.
        details = "; ".join(
            f"{type(error).__name__}: {error!s}" for error in self.exceptions
        )
        return f"CompoundException, {len(self.exceptions)} errors [{details}]"

    @cached_property
    def exception_types(self) -> t.List[t.Type[Exception]]:
        """
        The class of each wrapped exception, in the same order as
        ``exceptions``. Computed on first access and cached afterwards.

        Handy for membership checks such as:

            if TransactionError in compound_exception.exception_types:
                some_transaction_cleanup()
        """
        return [type(error) for error in self.exceptions]
class GatheredResults:
    """
    Wraps a list of results (as produced by ``asyncio.gather`` with
    ``return_exceptions=True``) and splits it into successes and exceptions.

    A result counts as an exception when it is an instance of ``Exception``.
    NOTE: values deriving only from ``BaseException`` (for example
    ``asyncio.CancelledError`` on Python 3.8+) do not match that check and
    are therefore reported among the successes.

    The derived views are ``cached_property`` values: they are computed on
    first access and not refreshed if the underlying list is mutated later.
    """

    # __dict__ is required for cached_property
    __slots__ = ("__results", "__dict__")

    def __init__(self, results: t.List[t.Any]):
        # Stored as given (not copied).
        self.__results = results

    ###########################################################################

    @property
    def results(self) -> t.List[t.Any]:
        """
        The raw results, successes and exceptions mixed, in original order.
        """
        return self.__results

    @property
    def all(self) -> t.List[t.Any]:
        """
        Just a proxy for ``results``.
        """
        return self.__results

    ###########################################################################

    @cached_property
    def exceptions(self) -> t.List[Exception]:
        """
        Returns all exception instances which were returned by asyncio.gather.
        """
        return [i for i in self.results if isinstance(i, Exception)]

    def exceptions_of_type(
        self, exception_type: t.Type[Exception]
    ) -> t.List[Exception]:
        """
        Returns the exception instances which are of the given type
        (subclasses included, as with ``isinstance``).
        """
        return [i for i in self.exceptions if isinstance(i, exception_type)]

    @cached_property
    def exception_types(self) -> t.List[t.Type[Exception]]:
        """
        Returns the exception types which appeared in the response, one entry
        per exception instance (duplicates are kept).
        """
        return [i.__class__ for i in self.exceptions]

    @cached_property
    def exception_count(self) -> int:
        """
        Number of results which are exceptions.
        """
        return len(self.exceptions)

    ###########################################################################

    @cached_property
    def successes(self) -> t.List[t.Any]:
        """
        Returns all values in the response which aren't exceptions.
        """
        return [i for i in self.results if not isinstance(i, Exception)]

    @cached_property
    def success_count(self) -> int:
        """
        Number of results which are not exceptions.
        """
        return len(self.successes)

    ###########################################################################

    def compound_exception(self) -> t.Optional[CompoundException]:
        """
        Create a single exception which combines all of the exceptions.

        A function instead of a property to leave room for some extra args
        in the future.

        Returns ``None`` when there are no exceptions, so check the result
        before raising it:

            exception = gathered_response.compound_exception()
            if exception:
                raise exception
        """
        if not self.exceptions:
            # Matches the declared Optional return type; still falsy, so
            # truthiness checks by callers keep working.
            return None
        return CompoundException(self.exceptions)
async def gather(*coroutines: t.Awaitable[t.Any]) -> GatheredResults:
    """
    A wrapper on top of asyncio.gather which makes handling the results
    easier.

    Each positional argument is passed straight through to
    ``asyncio.gather`` with ``return_exceptions=True``, so exceptions raised
    by the awaitables are collected in the result list instead of being
    raised here.

    :param coroutines: the awaitables to run.
    :returns: a ``GatheredResults`` wrapping the gathered result list.
    """
    results = await asyncio.gather(*coroutines, return_exceptions=True)
    return GatheredResults(results)