-
Notifications
You must be signed in to change notification settings - Fork 0
/
discoverer.py
96 lines (71 loc) · 3.24 KB
/
discoverer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import socket
import threading
import time
import logging
from typing import Literal
class Discoverer:
    """Discover peer nodes via UDP broadcast and register them over TCP.

    Protocol implemented by this class:

    1. ``send_identify_broadcast`` broadcasts the text datagram
       ``"IDENTIFY <host> <reciving_port>"`` to ``udp_port``.
    2. ``udp_listener`` receives such datagrams, skips those whose source
       address equals this node's own host, and answers by opening a TCP
       connection to the advertised ``host``/``port`` and sending
       ``"<id> <type> <port>"``.
    3. ``register_listener`` accepts those TCP connections and stores
       ``(peer address, port)`` under the peer id in ``ftp_table`` or
       ``coordinator_table`` depending on the announced type.

    Malformed messages and unreachable peers are logged and skipped so a
    single bad packet cannot terminate a listener thread.
    """

    def __init__(self,
                 id: str,
                 type_: Literal['ftp', 'coordinator'],
                 addr: tuple[str, int],
                 udp_broadcast_port: int = 16000,
                 reciving_port: int = 16001) -> None:
        """Store the node identity and the ports used for discovery.

        Args:
            id: Identifier announced to other nodes.
            type_: Kind of node announced to other nodes.
            addr: ``(host, port)`` of this node's own service.
            udp_broadcast_port: UDP port used for broadcast datagrams.
            reciving_port: TCP port on which registration replies are accepted.
        """
        # node properties
        self.id = id
        self.host, self.port = addr[0], addr[1]
        self.type = type_

        # discoverer ports
        self.udp_port = udp_broadcast_port
        self.reciving_port = reciving_port

        # discovered nodes info: node id -> (host, port)
        self.ftp_table: dict[str, tuple[str, int]] = {}
        self.coordinator_table: dict[str, tuple[str, int]] = {}

    @staticmethod
    def _parse_identify(message: bytes) -> "tuple[str, int] | None":
        """Return ``(host, port)`` from a broadcast datagram, or None if invalid.

        The datagram must decode to exactly three whitespace-separated fields
        ``<command> <host> <port>``. The command field is not checked.
        """
        try:
            # UnicodeDecodeError is a ValueError subclass, so one handler
            # covers bad encoding, wrong field count and a non-numeric port.
            _command, host, port_text = message.decode().split()
            port = int(port_text)
        except ValueError:
            return None
        # A port outside this range cannot be connected to.
        if not 0 < port <= 65535:
            return None
        return host, port

    def _send_identity(self, host: str, port: int) -> bool:
        """Send this node's ``"<id> <type> <port>"`` line to ``host:port`` over TCP.

        Returns True on success, False if the peer could not be reached.
        """
        payload = f'{self.id} {self.type} {self.port}'.encode()
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect((host, port))
                # sendall retries until the whole payload has been written.
                sock.sendall(payload)
        except OSError as err:
            logging.warning("Could not answer discover request from %s:%s (%s)",
                            host, port, err)
            return False
        return True

    def _register_node(self, payload: bytes, host: str) -> bool:
        """Record a node from a registration payload ``b"<id> <type> <port>"``.

        Args:
            payload: Raw bytes received from the peer.
            host: Address of the peer that sent the payload.

        Returns:
            True if the node was stored, False if the payload was malformed
            or announced an unknown type.
        """
        try:
            node_id, type_, port_text = payload.decode().split()
            port = int(port_text)
        except ValueError:
            logging.warning("Malformed registration received from %s", host)
            return False

        tables = {'ftp': self.ftp_table, 'coordinator': self.coordinator_table}
        table_to_use = tables.get(type_)
        if table_to_use is None:
            logging.warning(f"Wrong type recived by discover ({type_})")
            return False

        table_to_use[node_id] = (host, port)
        return True

    def udp_listener(self):
        """Answer broadcast discover requests forever (blocking loop)."""
        logging.info("Starting Discoverer Broadcast Listener")
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
            udp_socket.bind(('', self.udp_port))
            while True:
                message, addr = udp_socket.recvfrom(1024)
                # Skip datagrams whose source address is this node's host.
                if addr[0] == self.host:
                    continue
                target = self._parse_identify(message)
                if target is None:
                    logging.warning("Malformed broadcast received from %s", addr[0])
                    continue
                self._send_identity(*target)

    def send_identify_broadcast(self):
        """Broadcast one ``IDENTIFY <host> <reciving_port>`` datagram."""
        logging.debug("Broadcasting discover message!")
        message = f"IDENTIFY {self.host} {self.reciving_port}"
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
            udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            udp_socket.settimeout(1.0)
            udp_socket.sendto(message.encode(), ('<broadcast>', self.udp_port))

    def register_listener(self):
        """Accept TCP registration replies forever and fill the node tables."""
        logging.info("Starting Discoverer Listener")
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind((self.host, self.reciving_port))
            sock.listen()
            while True:
                conn, addr = sock.accept()
                # Always close the accepted connection, including on errors.
                with conn:
                    try:
                        payload = conn.recv(1024)
                    except OSError as err:
                        logging.warning("Failed reading registration from %s (%s)",
                                        addr[0], err)
                        continue
                self._register_node(payload, addr[0])

    def start_discovering(self):
        """Start the broadcast listener and the registration listener threads."""
        for listener in (self.udp_listener, self.register_listener):
            threading.Thread(target=listener).start()
        logging.info("Starting Discoverer!")
# print("Listening ports")
# id, host, port = input('id:'), input('host:'), int(input('port:'))
# threading.Thread(target=udp_listener, args=(id, host, port)).start()
# threading.Thread(target=register_listener, args=(host, port)).start()
# broadcast_sender(host, port)