
Commit

update Network, Connections and Errors to include aggregated overhead
cortze committed Aug 25, 2023
1 parent 54bca40 commit 77335ef
Showing 1 changed file with 124 additions and 50 deletions.
dht/dht.py: 174 changes (124 additions & 50 deletions)
@@ -213,7 +213,7 @@ def __init__(self, nodeID: int, errorTime):
         self.time = errorTime
 
     def description(self) -> str:
-        return "node not in node-store"
+        return "node_not_found"
 
 class NodeStore():
     """ Storage unit of all the available nodes in the network, or that a client saw """
@@ -245,61 +245,105 @@ def len(self):
 
 class ConnectionError(Exception):
     """ custom connection error exception to notify an errored connection """
-    def __init__(self, nodeid: int, error, time, d):
-        self.erroredNode = nodeid
+    def __init__(self, err_id: int, f: int, to: int, error: str, delay, gamma: int):
+        self.error_id = err_id
+        self.f = f
+        self.to = to
         self.error = error
-        self.errorTime = time
-        self.delay = 0 # ms
-        if (error == "fast error") and (d is not None):
-            self.delay = random.sample(d, 1)[0]
-        elif (error == "slow error") and (d is not None):
-            self.delay = d
+        self.time = time.time()
+        self.delay = delay
+        self.overhead = gamma
 
     def description(self) -> str:
-        return f"unable to connect node {self.erroredNode}. {self.error}"
+        return f"unable to connect node {self.to} from {self.f}. {self.error}"
 
     def get_delay(self):
-        return self.error, self.delay
+        return self.error, self.delay+self.overhead
 
+    def summary(self):
+        return {
+            'id': self.error_id,
+            'time': self.time,
+            'from': self.f,
+            'to': self.to,
+            'error': self.error,
+            'delay': self.delay,
+            'overhead': self.overhead}
+
 class Connection():
-    """ connection simbolizes the interaction that 2 DHTClients could have with eachother """
-    def __init__(self, connid: int, f: int, to: DHTClient, delayrange):
-        self.connid = connid
+    """ connection symbolizes the interaction that 2 DHTClients could have with each other """
+    def __init__(self, conn_id: int, f: int, to: DHTClient, delay, overhead):
+        self.conn_id = conn_id
+        self.time = time.time()
         self.f = f
         self.to = to
-        if delayrange is not None:
-            self.delay = random.sample(delayrange, 1)[0]
-        else:
-            self.delay = 0 # ms
+        self.delay = delay
+        self.overhead = overhead
+        self.final_delay = delay + overhead
 
     def get_closest_nodes_to(self, key: Hash):
-        closernodes, val, ok = self.to.get_closest_nodes_to(key)
-        return closernodes, val, ok, self.delay
+        closer_nodes, val, ok = self.to.get_closest_nodes_to(key)
+        return closer_nodes, val, ok, self.final_delay
 
     def store_segment(self, segment):
         self.to.store_segment(segment)
-        return self.delay
+        return self.final_delay
 
     def retrieve_segment(self, key: Hash):
         seg, ok = self.to.retrieve_segment(key)
-        return seg, ok, self.delay
+        return seg, ok, self.final_delay
 
+    def summary(self):
+        return {
+            'id': self.conn_id,
+            'time': self.time,
+            'from': self.f,
+            'to': self.to,
+            'error': "None",
+            'delay': self.delay,
+            'overhead': self.overhead,
+        }
+
+class OverheadTracker:
+    """keeps track of the overhead for each node in the network, which is increased
+    after a connection is established. This overhead is added to each of the operations
+    until the overhead is reset (at the end of the concurrent operations)"""
+    def __init__(self, gamma_overhead: int):
+        self.gamma_overhead = gamma_overhead
+        self.nodes = defaultdict(int)
+
+    def get_overhead_for_node(self, node_id: int):
+        """returns and increases the overhead for the given peer"""
+        try:
+            overhead = self.nodes[node_id] + self.gamma_overhead
+        except KeyError:
+            overhead = 0
+        self.nodes[node_id] = overhead
+        return overhead
+
+    def reset_overhead_for_node(self, node_id: int):
+        del self.nodes[node_id]
+
+    def reset_overheads(self):
+        self.nodes = defaultdict(int)
+
 
 class DHTNetwork:
     """ serves as the shared point between all the nodes participating in the simulation,
     allows nodes to communicate with each other without needing to implement an API or similar"""
 
-    def __init__(self, networkid: int, fasterrorrate: int=0, slowerrorrate: int=0, conndelayrange = None, fastdelayrange = None, slowdelay = None):
+    def __init__(self, networkid: int, fasterrorrate: int=0, slowerrorrate: int=0, conndelayrange = None, fastdelayrange = None, slowdelayrange = None, gammaoverhead: int = 0):
         """ class initializer; it allows defining the networkID and the delays between nodes """
         self.networkid = networkid
         self.fasterrorrate = fasterrorrate # %
         self.slowerrorrate = slowerrorrate # %
-        self.conndelatrange = conndelayrange
-        self.fastdelayrange = fastdelayrange # list() in ms -> i.e., (5, 100) ms | None
-        self.slowdelay = slowdelay # timeot delay
+        self.conn_delay_range = conndelayrange
+        self.fast_delay_range = fastdelayrange # list() in ms -> i.e., (5, 100) ms | None
+        self.slow_delay_range = slowdelayrange # list() in ms -> i.e., (5, 100) ms | None
         self.nodestore = NodeStore()
-        self.errortracker = deque() # every time that an error is tracked, add it to the queue
-        self.connectiontracker = deque() # every time that a connection was stablished
+        self.error_tracker = deque() # every time that an error is tracked, add it to the queue
+        self.connection_tracker = deque() # every time that a connection was established
+        self.connection_overheads = OverheadTracker(gammaoverhead)
         self.connectioncnt = 0
 
     def get_closest_nodes_to_hash(self, target: Hash, beta):
Expand Down Expand Up @@ -379,32 +423,34 @@ def connect_to_node(self, ognode: int, targetnode: int):
""" get connection to the DHTclient target from the PeerStore
and an associated delay or raise an error """
self.connectioncnt += 1
def get_delay_from_range(range):
if range == None:
delay = 0
else:
delay = random.sample(range, 1)[0]
return delay
conn_delay = get_delay_from_range(self.conn_delay_range)
fast_delay = get_delay_from_range(self.fast_delay_range)
slow_delay = get_delay_from_range(self.slow_delay_range)
overhead = self.connection_overheads.get_overhead_for_node(ognode)
try:
# check the error rate (avoid stablishing the connection if there is an error)
if random.randint(0, 99) < self.fasterrorrate:
connerror = ConnectionError(targetnode, "fast error", time.time(), self.fastdelayrange)
self.errortracker.append(connerror)
raise connerror
conn_error = ConnectionError(self.connectioncnt, ognode, targetnode, "fast", fast_delay, overhead)
self.error_tracker.append(conn_error.summary())
raise conn_error
if random.randint(0, 99) < self.slowerrorrate:
connerror = ConnectionError(targetnode, "slow error", time.time(), self.slowdelay)
self.errortracker.append(connerror)
raise connerror
connection = Connection(self.connectioncnt, ognode, self.nodestore.get_node(targetnode), self.conndelatrange)
self.connectiontracker.append({
'time': time.time(),
'from': ognode,
'to': targetnode,
'delay': connection.delay})
conn_error = ConnectionError(self.connectioncnt, ognode, targetnode, "slow", slow_delay, overhead)
self.error_tracker.append(conn_error.summary())
raise conn_error
connection = Connection(self.connectioncnt, ognode, self.nodestore.get_node(targetnode), conn_delay, overhead)
self.connection_tracker.append(connection.summary())
return connection, connection.delay

except NodeNotInStoreError as e:
connerror = ConnectionError(e.missingNode, e.description, e.time, 0)
self.errortracker.append({
'time': connerror.errorTime,
'from': ognode,
'error': connerror.description,
'delay': 0})
raise connerror
except NodeNotInStoreError:
conn_error = ConnectionError(self.connectioncnt, ognode, targetnode, "node_not_found", slow_delay, overhead)
self.error_tracker.append(conn_error.summary())
raise conn_error

def bootstrap_node(self, nodeid: int, bucketsize: int): # ( accuracy: int = 100 )
""" checks among all the existing nodes in the network, which are the correct ones to
@@ -419,13 +465,41 @@ def bootstrap_node(self, nodeid: int, bucketsize: int): # ( accuracy: int = 100
             rt.new_discovered_peer(node)
         return rt.get_routing_nodes()
 
+    def reset_network_metrics(self):
+        """resets the connection tracker and the overheads, emulating the end of the concurrent operations"""
+        self.error_tracker = deque()
+        self.connection_tracker = deque()
+        self.connection_overheads.reset_overheads()
+
     def summary(self):
         """ return the summary of what happened in the network """
         return {
             'total_nodes': self.nodestore.len(),
             'attempts': self.connectioncnt,
-            'successful': len(self.connectiontracker),
-            'failures': len(self.errortracker)}
+            'successful': len(self.connection_tracker),
+            'failures': len(self.error_tracker)}
 
+    def concurrent_network_details(self):
+        """aggregates all the connections and errors into a single dict -> easily translatable to a pandas.DataFrame"""
+        network_details = {
+            'conn_id': [],
+            'time': [],
+            'from': [],
+            'to': [],
+            'error': [],
+            'delay': [],
+            'overhead': []
+        }
+        for conn in (self.connection_tracker + self.error_tracker):
+            network_details['conn_id'].append(conn['id'])
+            network_details['time'].append(conn['time'])
+            network_details['from'].append(conn['from'])
+            network_details['to'].append(conn['to'])
+            network_details['error'].append(conn['error'])
+            network_details['delay'].append(conn['delay'])
+            network_details['overhead'].append(conn['overhead'])
+        return network_details
+
+
     def len(self) -> int:
         return self.nodestore.len()
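
A minimal sketch of the aggregated-overhead behaviour introduced by this commit, assuming OverheadTracker can be imported from dht/dht.py (the import path and the gamma of 5 ms are assumptions):

    from dht.dht import OverheadTracker  # assumed import path

    tracker = OverheadTracker(gamma_overhead=5)  # hypothetical gamma of 5 ms
    print(tracker.get_overhead_for_node(1))      # 5  -> first connection opened by node 1
    print(tracker.get_overhead_for_node(1))      # 10 -> the overhead aggregates per extra connection
    print(tracker.get_overhead_for_node(2))      # 5  -> each node is tracked independently
    tracker.reset_overheads()                    # emulates the end of the concurrent operations
    print(tracker.get_overhead_for_node(1))      # 5  -> aggregation starts over

Because connect_to_node passes this value into every Connection and ConnectionError it creates, repeated connections from the same origin node carry a growing overhead on top of their base delay (Connection.final_delay = delay + overhead, and ConnectionError.get_delay() adds it as well) until reset_network_metrics() clears it.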

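Both Connection and ConnectionError now expose a summary() with the same keys, which is what lets the two trackers be merged later. A short sketch with hypothetical values:

    from dht.dht import ConnectionError  # assumed import path

    err = ConnectionError(err_id=1, f=10, to=20, error="fast", delay=50, gamma=5)
    print(err.get_delay())  # ('fast', 55) -> base delay plus the aggregated overhead
    print(err.summary())    # {'id': 1, 'time': <timestamp>, 'from': 10, 'to': 20,
                            #  'error': 'fast', 'delay': 50, 'overhead': 5}

Note that the class name shadows Python's built-in ConnectionError; the sketch assumes the module's own class is the one imported.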

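The dict of lists returned by concurrent_network_details() maps directly onto a DataFrame, as its docstring suggests. A usage sketch, assuming pandas is available and that the network has already tracked some lookups (the parameter values are hypothetical):

    import pandas as pd  # assumed dependency
    from dht.dht import DHTNetwork  # assumed import path

    network = DHTNetwork(networkid=0, gammaoverhead=5)  # hypothetical parameters
    # ... bootstrap nodes and run lookups so that connections and errors get tracked ...
    df = pd.DataFrame(network.concurrent_network_details())
    print(df[['conn_id', 'from', 'to', 'error', 'delay', 'overhead']].head())
    network.reset_network_metrics()  # clear trackers and per-node overheads before the next batch

The rows combine successful connections and errors, since both trackers store summary dicts with the same fields.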