-
Notifications
You must be signed in to change notification settings - Fork 5
/
httpnn.py
81 lines (68 loc) · 2.38 KB
/
httpnn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import asyncio
import aiohttp
import logging
from util import aretry
from threading import Thread, Event
import random
class HTTPNN:
def __init__(self, url, keyprefix):
self.url = url
self.keyprefix = keyprefix
self.locks = {}
def get_lock(self, key):
if key not in self.locks:
self.locks[key] = asyncio.Lock()
return self.locks[key]
async def queued_for_key(self, key):
if self.get_lock(key)._waiters is None:
return 0
return len(self.get_lock(key)._waiters)
@aretry(5)
async def put_(self, key, message):
async with self.client.post(self.url + "put", json={'key': self.keyprefix + ':' + key, 'text': message}) as response:
assert response.status == 200
rj = await response.json()
async def put(self, key, message):
async with self.get_lock(key):
return await self.put_(key, message)
@aretry(5)
async def get_(self, key, bad_words):
async with self.client.post(self.url + 'get', json={'key': self.keyprefix + ':' + key, 'bad_words': bad_words}) as response:
assert response.status == 200
rj = await response.json()
return rj['text']
async def get(self, key, bad_words = []):
async with self.get_lock(key):
return await self.get_(key, bad_words)
def initialize2(self):
self.client = aiohttp.ClientSession(timeout = aiohttp.ClientTimeout(1800))
async def initialize(self):
self.client = aiohttp.ClientSession(loop = self.loop, timeout = aiohttp.ClientTimeout(1800))
async def consume_queue(self):
while True:
try:
coro = await self.queue.get()
await coro
except Exception as e:
logging.getLogger(__file__).exception('Exception in coro')
self.queue.task_done()
def run_from_thread(self, func, *args):
assert self.queue
def f():
self.queue.put_nowait(asyncio.ensure_future(func(*args)))
#print(self.queue.__dict__)
self.loop.call_soon_threadsafe(f)
def run_thread(self):
evt = Event()
def tgt():
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.initialize())
self.queue = asyncio.Queue()
evt.set()
self.loop.run_until_complete(self.consume_queue())
logging.getLogger(__file__).critical('Returned from consume_queue()?')
thread = Thread(target=tgt, args=())
thread.daemon = True
thread.start()
evt.wait()