-
Notifications
You must be signed in to change notification settings - Fork 7
/
tests_twisted.py
135 lines (109 loc) · 3.71 KB
/
tests_twisted.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
With Twisted available, test optional Twisted support in smokesignal
"""
from twisted.internet import reactor, defer, task
from twisted.trial import unittest
from mock import patch
import smokesignal
class TestTwisted(unittest.TestCase):
def _emit(self, expectSuccess=None, expectFailure=None):
"""
Fire a signal and compare results to expectations
"""
assert (not expectSuccess) != (not expectFailure), "pass either expectSuccess or expectFailure"
if expectFailure:
def checkFailure(f):
assert f.type is expectFailure
d = smokesignal.emit('hello', 'world', errback=checkFailure)
else:
d = smokesignal.emit('hello', 'world')
def check(r):
assert len(r) == 1
assert r[0] == expectSuccess
return d.addCallback(check)
def test_max_calls(self):
"""
Do I decrement _max_calls immediately when the function is called,
even when I'm passing through a reactor loop?
Discussion: In a naive implementation, max_calls was handled when the
function was called, but we use maybeDeferred, which passes the call
through a reactor loop.
max_calls needs to be decremented before it passes through the
reactor, lest we call it multiple times per loop because the decrement
hasn't happened yet.
"""
def cb():
return 19
smokesignal.on('19', cb, max_calls=19)
d1 = smokesignal.emit('19')
assert list(smokesignal.receivers['19'])[0]._max_calls == 18
d2 = smokesignal.emit('19')
assert list(smokesignal.receivers['19'])[0]._max_calls == 17
return defer.DeferredList([d1, d2])
def test_synchronous(self):
"""
Blocking callbacks should simply fire the deferred
"""
smokesignal.once('hello', synchronous)
return self._emit(expectSuccess='synchronous done')
def test_asynchronous(self):
"""
Callbacks that return deferred should be resolved
"""
smokesignal.once('hello', asynchronous)
return self._emit(expectSuccess='asynchronous done')
def test_synchronous_failure(self):
"""
Callbacks that raise an exception without returning a deferred should
trigger errbacks.
"""
smokesignal.once('hello', synchronous_failure)
return self._emit(expectFailure=ZeroDivisionError)
def test_asynchronous_failure(self):
"""
Callbacks that raise an exception inside a deferred should trigger
errbacks.
"""
smokesignal.once('hello', asynchronous_failure)
return self._emit(expectFailure=KeyError)
def test_delay(self):
"""
Similar to test_asynchronous but use deferLater
"""
smokesignal.once('hello', delay)
clock = task.Clock()
pCallLater = patch.object(reactor, 'callLater', clock.callLater)
with pCallLater:
r = self._emit(expectSuccess='delay done')
clock.advance(20)
return r
def synchronous(target):
"""
Return synchronously
"""
assert target == 'world'
return 'synchronous done'
def asynchronous(target):
"""
Return async simple value immediately
"""
return defer.succeed('asynchronous done')
def synchronous_failure(target):
"""
Fail synchronously
"""
1/0
def asynchronous_failure(target):
"""
Fail asynchronously
"""
def err():
{}['key_error']
return task.deferLater(reactor, 0, err)
def delay(target):
"""
Succeed after a delay
"""
def done():
return 'delay done'
return task.deferLater(reactor, 15, done)