-
Notifications
You must be signed in to change notification settings - Fork 0
/
parallel_runner.py
59 lines (51 loc) · 1.74 KB
/
parallel_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
"""
This file details a method that can be used to fire multiple client requests parallely to the same server.
"""
import numpy as np
from concurrent import futures
from client_dynamo import client_put, client_get_memory
import time
import random
import concurrent
def parallel_runner_old(num_tasks=4):
s = time.time()
executor = futures.ThreadPoolExecutor(max_workers=1)
fut = set([])
ports = [2333,2334,2335,2336]
for i in range(1000):
key_val = random.randint(0,7) # assuming key space is of size 8
port = ports[random.randint(0,3)]
fut.add(executor.submit(client_get_memory, port))
done, not_done = futures.wait(fut)
e = time.time()
print(f"Time taken : {e - s} secs")
print(f"Pending requests {not_done}")
def timed(func):
def _w(*a, **k):
then = time.time()
res = func(*a, **k)
elapsed = time.time() - then
return elapsed, res
return _w
def run_parallel(requests, requests_params, key=1, val="1", start_port=2333, as_np=True):
# start = time.time()
executor = futures.ThreadPoolExecutor(max_workers=1)
fut = set([])
for request, request_params in zip(requests, requests_params):
fut.add(executor.submit(timed(request), **request_params))
durations = []
responses = []
try:
for it in futures.as_completed(fut):
if not it.exception():
duration, response = it.result()
durations.append(duration)
responses.append(response)
else:
print('exception in future:', it.exception())
except concurrent.futures.TimeoutError:
print('timeout')
pass
if as_np:
durations = np.array(durations)
return durations, responses