Skip to content

Commit

Permalink
add support to float overheads (enable subsecond overheads)
Browse files Browse the repository at this point in the history
  • Loading branch information
cortze committed Aug 27, 2023
1 parent 97e5119 commit b23ce47
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
68 changes: 38 additions & 30 deletions dht/dht.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def has_closer_nodes(prev, new):
continue
return False

base_overhead = self.network.connection_overheads.get_overhead_for_node(self.ID)
closestnodes = self.rt.get_closest_nodes_to(key)
nodestotry = closestnodes.copy()
triednodes = deque()
Expand All @@ -85,20 +86,21 @@ def has_closer_nodes(prev, new):
lookupsummary['connectionAttempts'] += 1

try:
connection, conndelay = self.network.connect_to_node(self.ID, node)
connection, conndelay = self.network.connect_to_node(self.ID, node, base_overhead)
newnodes, val, _, closestdelay = connection.get_closest_nodes_to(key)
# we only want to aggregate the difference between the base + conn delay - the already aggregated one
# this allows to simulate the delay of a proper scheduler
operationdelay = (conndelay + closestdelay)
if len(alpha_results) < self.alpha:
alpha_results.append((operationdelay, newnodes, val))
alpha_results.append((operationdelay, newnodes, val, base_overhead))
alpha_results = deque(sorted(alpha_results, key=lambda pair: pair[0]))
else:
print("huge error here")

except ConnectionError as e:
errortype, errordelay = e.get_delay()
alpha_results.append((errordelay, {}, ""))
errortype = e.error_type()
errordelay = e.get_delay()
alpha_results.append((errordelay, {}, "", base_overhead))
alpha_results = deque(sorted(alpha_results, key=lambda pair: pair[0]))

# check if the concurrency array is full
Expand Down Expand Up @@ -168,33 +170,37 @@ def get_closest_nodes_to(self, key: Hash):
val, ok = self.ks.read(key)
return closernodes, val, ok

def provide_block_segment(self, segment) -> dict:
def provide_block_segment(self, segment):
""" looks for the closest nodes in the network, and sends them a """
providesummary = {
'succesNodeIDs': deque(),
'failedNodeIDs': deque(),
'startTime': time.time(),
'aggrDelay': 0,
}
segH = Hash(segment)
closestnodes, _, lookupsummary, lookupdelay = self.lookup_for_hash(segH, finishwithfirstvalue=False)
providesummary['aggrDelay'] += lookupdelay
provAggrDelay = []
for cn in closestnodes:
overhead = self.network.connection_overheads.get_overhead_for_node(cn)
try:
connection, conndelay = self.network.connect_to_node(self.ID, cn)
providesummary['aggrDelay'] += conndelay
connection, conndelay = self.network.connect_to_node(self.ID, cn, overhead)
storedelay = connection.store_segment(segment)
providesummary['aggrDelay'] += storedelay
provAggrDelay.append(conndelay+storedelay)
providesummary['succesNodeIDs'].append(cn)
except ConnectionError:
except ConnectionError as e:
providesummary['failedNodeIDs'].append(cn)
provAggrDelay.append(e.get_delay())

provideDelay = max(provAggrDelay)
providesummary.update({
'contactedPeers': lookupsummary['connectionAttempts'],
'closestNodes': closestnodes.keys(),
'finishTime': time.time(),
'contactedPeers': lookupsummary['connectionAttempts'],
'lookupDelay': lookupdelay,
'provideDelay': provideDelay,
'operationDelay': lookupdelay+provideDelay,
})
return providesummary, providesummary['aggrDelay']
return providesummary, providesummary['operationDelay']

def store_segment(self, segment):
segH = Hash(segment)
Expand Down Expand Up @@ -245,7 +251,7 @@ def len(self):

class ConnectionError(Exception):
""" custom connection error exection to notify an errored connection """
def __init__(self, err_id: int, f: int, to: int, error: str, delay, gamma: int):
def __init__(self, err_id: int, f: int, to: int, error: str, delay, gamma):
self.error_id = err_id
self.f = f
self.to = to
Expand All @@ -258,7 +264,10 @@ def description(self) -> str:
return f"unable to connect node {self.to} from {self.f}. {self.error}"

def get_delay(self):
return self.error, self.delay+self.overhead
return self.delay+self.overhead

def error_type(self):
return self.error

def summary(self):
return {
Expand Down Expand Up @@ -298,7 +307,7 @@ def summary(self):
'id': self.conn_id,
'time': self.time,
'from': self.f,
'to': self.to,
'to': self.to.ID,
'error': "None",
'delay': self.delay,
'overhead': self.overhead,
Expand All @@ -308,7 +317,7 @@ class OverheadTracker:
"""keeps tracks of the overhead for each node in the network, which will be increased
after a connection is established. This overhead will be added to each of the operations
until the overhead is reset (finishing the end of the concurren operations)"""
def __init__(self, gamma_overhead: int):
def __init__(self, gamma_overhead: float):
self.gamma_overhead = gamma_overhead
self.nodes = defaultdict(int)

Expand All @@ -332,7 +341,7 @@ class DHTNetwork:
""" serves a the shared point between all the nodes participating in the simulation,
allows node to communicat with eachother without needing to implement an API or similar"""

def __init__(self, networkid: int, fasterrorrate: int=0, slowerrorrate: int=0, conndelayrange = None, fastdelayrange = None, slowdelayrange = None, gammaoverhead: int = 0):
def __init__(self, networkid: int, fasterrorrate: int=0, slowerrorrate: int=0, conndelayrange = None, fastdelayrange = None, slowdelayrange = None, gammaoverhead: float = 0.0):
""" class initializer, it allows to define the networkID and the delays between nodes """
self.networkid = networkid
self.fasterrorrate = fasterrorrate # %
Expand Down Expand Up @@ -419,7 +428,7 @@ def add_new_node(self, newnode: DHTClient):
""" add a new node to the DHT network """
self.nodestore.add_node(newnode)

def connect_to_node(self, ognode: int, targetnode: int):
def connect_to_node(self, ognode: int, targetnode: int, overhead: float = 0.0):
""" get connection to the DHTclient target from the PeerStore
and an associated delay or raise an error """
self.connectioncnt += 1
Expand All @@ -432,7 +441,6 @@ def get_delay_from_range(range):
conn_delay = get_delay_from_range(self.conn_delay_range)
fast_delay = get_delay_from_range(self.fast_delay_range)
slow_delay = get_delay_from_range(self.slow_delay_range)
overhead = self.connection_overheads.get_overhead_for_node(ognode)
try:
# check the error rate (avoid establishing the connection if there is an error)
if random.randint(0, 99) < self.fasterrorrate:
Expand Down Expand Up @@ -479,9 +487,9 @@ def summary(self):
'successful': len(self.connection_tracker),
'failures': len(self.error_tracker)}

def concurrent_network_details(self):
def connection_metrics(self):
"""aggregate all the connection and errors into a single dict -> easily translatable to panda.df"""
network_details = {
network_metrics = {
'conn_id': [],
'time': [],
'from': [],
Expand All @@ -491,14 +499,14 @@ def concurrent_network_details(self):
'overhead': []
}
for conn in (self.connection_tracker + self.error_tracker):
network_details['conn_id'].append(conn['id'])
network_details['time'].append(conn['time'])
network_details['from'].append(conn['from'])
network_details['to'].append(conn['to'])
network_details['error'].append(conn['error'])
network_details['delay'].append(conn['delay'])
network_details['overhead'].append(conn['overhead'])
return network_details
network_metrics['conn_id'].append(conn['id'])
network_metrics['time'].append(conn['time'])
network_metrics['from'].append(conn['from'])
network_metrics['to'].append(conn['to'])
network_metrics['error'].append(conn['error'])
network_metrics['delay'].append(conn['delay'])
network_metrics['overhead'].append(conn['overhead'])
return network_metrics


def len(self) -> int:
Expand Down
7 changes: 3 additions & 4 deletions tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def test_gamma_overhead(self):
a = 1
b = k
stepstop = 3
overhead = 1
overhead = 0.250
network = DHTNetwork(
networkid=netid,
gammaoverhead=overhead,
Expand All @@ -500,9 +500,8 @@ def test_gamma_overhead(self):
closestnodes, val, summary, aggrdelay = inode.lookup_for_hash(key=segH)
self.assertEqual(randomSegment, val)

supossed_overhead = 0
for i in range(summary['successfulCons']+1):
supossed_overhead += i*overhead
supossed_overhead = summary['successfulCons']*overhead
print(supossed_overhead, aggrdelay)
self.assertEqual(aggrdelay, supossed_overhead)


Expand Down

0 comments on commit b23ce47

Please sign in to comment.