diff --git a/dht/dht.py b/dht/dht.py index d01ff5b..146259a 100644 --- a/dht/dht.py +++ b/dht/dht.py @@ -62,6 +62,7 @@ def has_closer_nodes(prev, new): continue return False + base_overhead = self.network.connection_overheads.get_overhead_for_node(self.ID) closestnodes = self.rt.get_closest_nodes_to(key) nodestotry = closestnodes.copy() triednodes = deque() @@ -85,20 +86,21 @@ def has_closer_nodes(prev, new): lookupsummary['connectionAttempts'] += 1 try: - connection, conndelay = self.network.connect_to_node(self.ID, node) + connection, conndelay = self.network.connect_to_node(self.ID, node, base_overhead) newnodes, val, _, closestdelay = connection.get_closest_nodes_to(key) # we only want to aggregate the difference between the base + conn delay - the already aggregated one # this allows to simulate de delay of a proper scheduler operationdelay = (conndelay + closestdelay) if len(alpha_results) < self.alpha: - alpha_results.append((operationdelay, newnodes, val)) + alpha_results.append((operationdelay, newnodes, val, base_overhead)) alpha_results = deque(sorted(alpha_results, key=lambda pair: pair[0])) else: print("huge error here") except ConnectionError as e: - errortype, errordelay = e.get_delay() - alpha_results.append((errordelay, {}, "")) + errortype = e.error_type() + errordelay = e.get_delay() + alpha_results.append((errordelay, {}, "", base_overhead)) alpha_results = deque(sorted(alpha_results, key=lambda pair: pair[0])) # check if the concurrency array is full @@ -168,33 +170,37 @@ def get_closest_nodes_to(self, key: Hash): val, ok = self.ks.read(key) return closernodes, val, ok - def provide_block_segment(self, segment) -> dict: + def provide_block_segment(self, segment): """ looks for the closest nodes in the network, and sends them a """ providesummary = { 'succesNodeIDs': deque(), 'failedNodeIDs': deque(), 'startTime': time.time(), - 'aggrDelay': 0, } segH = Hash(segment) closestnodes, _, lookupsummary, lookupdelay = self.lookup_for_hash(segH, finishwithfirstvalue=False) - providesummary['aggrDelay'] += lookupdelay + provAggrDelay = [] for cn in closestnodes: + overhead = self.network.connection_overheads.get_overhead_for_node(cn) try: - connection, conndelay = self.network.connect_to_node(self.ID, cn) - providesummary['aggrDelay'] += conndelay + connection, conndelay = self.network.connect_to_node(self.ID, cn, overhead) storedelay = connection.store_segment(segment) - providesummary['aggrDelay'] += storedelay + provAggrDelay.append(conndelay+storedelay) providesummary['succesNodeIDs'].append(cn) - except ConnectionError: + except ConnectionError as e: providesummary['failedNodeIDs'].append(cn) + provAggrDelay.append(e.get_delay()) + provideDelay = max(provAggrDelay) providesummary.update({ + 'contactedPeers': lookupsummary['connectionAttempts'], 'closestNodes': closestnodes.keys(), 'finishTime': time.time(), - 'contactedPeers': lookupsummary['connectionAttempts'], + 'lookupDelay': lookupdelay, + 'provideDelay': provideDelay, + 'operationDelay': lookupdelay+provideDelay, }) - return providesummary, providesummary['aggrDelay'] + return providesummary, providesummary['operationDelay'] def store_segment(self, segment): segH = Hash(segment) @@ -245,7 +251,7 @@ def len(self): class ConnectionError(Exception): """ custom connection error exection to notify an errored connection """ - def __init__(self, err_id: int, f: int, to: int, error: str, delay, gamma: int): + def __init__(self, err_id: int, f: int, to: int, error: str, delay, gamma): self.error_id = err_id self.f = f self.to = to @@ -258,7 +264,10 @@ def description(self) -> str: return f"unable to connect node {self.to} from {self.f}. {self.error}" def get_delay(self): - return self.error, self.delay+self.overhead + return self.delay+self.overhead + + def error_type(self): + return self.error def summary(self): return { @@ -298,7 +307,7 @@ def summary(self): 'id': self.conn_id, 'time': self.time, 'from': self.f, - 'to': self.to, + 'to': self.to.ID, 'error': "None", 'delay': self.delay, 'overhead': self.overhead, @@ -308,7 +317,7 @@ class OverheadTracker: """keeps tracks of the overhead for each node in the network, which will be increased after a connection is established. This overhead will be added to each of the operations until the overhead is reset (finishing the end of the concurren operations)""" - def __init__(self, gamma_overhead: int): + def __init__(self, gamma_overhead: float): self.gamma_overhead = gamma_overhead self.nodes = defaultdict(int) @@ -332,7 +341,7 @@ class DHTNetwork: """ serves a the shared point between all the nodes participating in the simulation, allows node to communicat with eachother without needing to implement an API or similar""" - def __init__(self, networkid: int, fasterrorrate: int=0, slowerrorrate: int=0, conndelayrange = None, fastdelayrange = None, slowdelayrange = None, gammaoverhead: int = 0): + def __init__(self, networkid: int, fasterrorrate: int=0, slowerrorrate: int=0, conndelayrange = None, fastdelayrange = None, slowdelayrange = None, gammaoverhead: float = 0.0): """ class initializer, it allows to define the networkID and the delays between nodes """ self.networkid = networkid self.fasterrorrate = fasterrorrate # % @@ -419,7 +428,7 @@ def add_new_node(self, newnode: DHTClient): """ add a new node to the DHT network """ self.nodestore.add_node(newnode) - def connect_to_node(self, ognode: int, targetnode: int): + def connect_to_node(self, ognode: int, targetnode: int, overhead: float = 0.0): """ get connection to the DHTclient target from the PeerStore and an associated delay or raise an error """ self.connectioncnt += 1 @@ -432,7 +441,6 @@ def get_delay_from_range(range): conn_delay = get_delay_from_range(self.conn_delay_range) fast_delay = get_delay_from_range(self.fast_delay_range) slow_delay = get_delay_from_range(self.slow_delay_range) - overhead = self.connection_overheads.get_overhead_for_node(ognode) try: # check the error rate (avoid stablishing the connection if there is an error) if random.randint(0, 99) < self.fasterrorrate: @@ -479,9 +487,9 @@ def summary(self): 'successful': len(self.connection_tracker), 'failures': len(self.error_tracker)} - def concurrent_network_details(self): + def connection_metrics(self): """aggregate all the connection and errors into a single dict -> easily translatable to panda.df""" - network_details = { + network_metrics = { 'conn_id': [], 'time': [], 'from': [], @@ -491,14 +499,14 @@ def concurrent_network_details(self): 'overhead': [] } for conn in (self.connection_tracker + self.error_tracker): - network_details['conn_id'].append(conn['id']) - network_details['time'].append(conn['time']) - network_details['from'].append(conn['from']) - network_details['to'].append(conn['to']) - network_details['error'].append(conn['error']) - network_details['delay'].append(conn['delay']) - network_details['overhead'].append(conn['overhead']) - return network_details + network_metrics['conn_id'].append(conn['id']) + network_metrics['time'].append(conn['time']) + network_metrics['from'].append(conn['from']) + network_metrics['to'].append(conn['to']) + network_metrics['error'].append(conn['error']) + network_metrics['delay'].append(conn['delay']) + network_metrics['overhead'].append(conn['overhead']) + return network_metrics def len(self) -> int: diff --git a/tests/test_network.py b/tests/test_network.py index 4b7a44c..4992cf3 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -477,7 +477,7 @@ def test_gamma_overhead(self): a = 1 b = k stepstop = 3 - overhead = 1 + overhead = 0.250 network = DHTNetwork( networkid=netid, gammaoverhead=overhead, @@ -500,9 +500,8 @@ def test_gamma_overhead(self): closestnodes, val, summary, aggrdelay = inode.lookup_for_hash(key=segH) self.assertEqual(randomSegment, val) - supossed_overhead = 0 - for i in range(summary['successfulCons']+1): - supossed_overhead += i*overhead + supossed_overhead = summary['successfulCons']*overhead + print(supossed_overhead, aggrdelay) self.assertEqual(aggrdelay, supossed_overhead)