Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set createTime per node and purge old nodes if maxNodes is reached #57

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 87 additions & 22 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Config struct {
// If true, the node will read the routing table from disk on startup and save routing
// table snapshots on disk every few minutes. Default value: true.
SaveRoutingTable bool
// do not reply to any incoming queries
PassivMode bool
// How often to save the routing table to disk. Default value: 5 minutes.
SavePeriod time.Duration
// Maximum packets per second to be processed. Disabled if negative. Default value: 100.
Expand All @@ -93,6 +95,10 @@ type Config struct {
ThrottlerTrackedClients int64
//Protocol for UDP connections, udp4= IPv4, udp6 = IPv6
UDPProto string
// max get_peers requests per hash, to prevent an infinite search loop
MaxSearchQueries int
// number of concurrent listeners on same port
ConnPoolSize int
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this will work? The code is not safe for concurrent use, right? If you want to use multiple goroutines you need different DHT instances

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned by email, I broke my fork by committing testing code and have no clue how to revert

}

// Creates a *Config populated with default values.
Expand All @@ -105,13 +111,16 @@ func NewConfig() *Config {
MaxNodes: 500,
CleanupPeriod: 15 * time.Minute,
SaveRoutingTable: true,
PassivMode: false,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant with the RateLimit, right?

(In English we would spell it Passive, I think.)

SavePeriod: 5 * time.Minute,
RateLimit: 100,
MaxInfoHashes: 2048,
MaxInfoHashPeers: 256,
ClientPerMinuteLimit: 50,
ThrottlerTrackedClients: 1000,
UDPProto: "udp4",
MaxSearchQueries: 1000,
ConnPoolSize: 1,
}
}

Expand Down Expand Up @@ -149,7 +158,7 @@ type DHT struct {
config Config
routingTable *routingTable
peerStore *peerStore
conn *net.UDPConn
conn []*net.UDPConn
Logger Logger
exploredNeighborhood bool
remoteNodeAcquaintance chan string
Expand Down Expand Up @@ -180,7 +189,7 @@ func New(config *Config) (node *DHT, err error) {
node = &DHT{
config: cfg,
routingTable: newRoutingTable(),
peerStore: newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers),
peerStore: newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers, cfg.MaxNodes),
PeersRequestResults: make(chan map[InfoHash][]string, 1),
stop: make(chan bool),
exploredNeighborhood: false,
Expand Down Expand Up @@ -339,14 +348,18 @@ func (d *DHT) Run() error {
// initSocket initializes the udp socket(s) listening to incoming dht
// requests. It opens ConnPoolSize listeners on the same address/port
// and appends them to d.conn.
func (d *DHT) initSocket() (err error) {
	for i := 0; i < d.config.ConnPoolSize; i++ {
		conn, err := listen(d.config.Address, d.config.Port, d.config.UDPProto)
		if err != nil {
			return err
		}
		d.conn = append(d.conn, conn)
		// Update the stored port number in case it was set 0, meaning it was
		// set automatically by the system. Doing this inside the loop makes
		// every subsequent listener bind the same auto-assigned port.
		d.config.Port = conn.LocalAddr().(*net.UDPAddr).Port
	}
	return nil
}

Expand All @@ -365,26 +378,50 @@ func (d *DHT) bootstrap() {
d.getMorePeers(nil)
}

// chanProcessPacket drains socketChan, processing each incoming DHT packet
// and returning its buffer to the arena for reuse. It exits when socketChan
// is closed. i identifies which pooled connection this worker serves; it is
// kept for symmetry with readFromSocket even though it is currently unused.
func (d *DHT) chanProcessPacket(socketChan chan packetType, bytesArena arena, i int) {
	for p := range socketChan {
		totalRecv.Add(1)
		d.processPacket(p)
		bytesArena.Push(p.b)
	}
}

// loop is the main working section of dht.
// It bootstraps a routing table, if necessary,
// and listens for incoming DHT requests until d.Stop()
// is called from another go routine.
func (d *DHT) loop() {
// Close socket
defer d.conn.Close()
for _, conn:= range d.conn {
defer conn.Close()
}

// There is goroutine pushing and one popping items out of the arena.
// One passes work to the other. So there is little contention in the
// arena, so it doesn't need many items (it used to have 500!). If
// readFromSocket or the packet processing ever need to be
// parallelized, this would have to be bumped.
bytesArena := newArena(maxUDPPacketSize, 3)
socketChan := make(chan packetType)
d.wg.Add(1)
go func() {
defer d.wg.Done()
readFromSocket(d.conn, socketChan, bytesArena, d.stop)
}()
bytesArena := newArena(maxUDPPacketSize, 3*d.config.ConnPoolSize)
//socketChan := make(chan packetType,10000)
var socketChan [100]chan packetType
d.wg.Add(d.config.ConnPoolSize)
for i, conn := range d.conn {
socketChan[i] = make(chan packetType)
go func(i int) {
defer d.wg.Done()
readFromSocket(conn, socketChan[i], bytesArena, d.stop, i)
}(i)
go func(i int) {
defer d.wg.Done()
d.chanProcessPacket(socketChan[i], bytesArena, i)
}(i)
}

d.bootstrap()

Expand All @@ -403,7 +440,7 @@ func (d *DHT) loop() {
log.Warning("rate limiting disabled")
} else {
// Token bucket for limiting the number of packets per second.
fillTokenBucket = time.Tick(time.Second / 10)
//fillTokenBucket = time.Tick(time.Second / 10)
if d.config.RateLimit > 0 && d.config.RateLimit < 10 {
// Less than 10 leads to rounding problems.
d.config.RateLimit = 10
Expand Down Expand Up @@ -458,10 +495,12 @@ func (d *DHT) loop() {
break L
}
}
d.routingTable.Lock()
for ih, _ := range m {
d.findNode(string(ih))
}

d.routingTable.Unlock()
/*
case p := <-socketChan:
totalRecv.Add(1)
if d.config.RateLimit > 0 {
Expand All @@ -476,12 +515,13 @@ func (d *DHT) loop() {
d.processPacket(p)
}
bytesArena.Push(p.b)

*/
case <-fillTokenBucket:
if tokenBucket < d.config.RateLimit {
tokenBucket += d.config.RateLimit / 10
}
case <-cleanupTicker:
d.routingTable.Lock()
needPing := d.routingTable.cleanup(d.config.CleanupPeriod, d.peerStore)
d.wg.Add(1)
go func() {
Expand All @@ -491,18 +531,21 @@ func (d *DHT) loop() {
if d.needMoreNodes() {
d.bootstrap()
}
d.routingTable.Unlock()
case node := <-d.pingRequest:
d.pingNode(node)
case <-secretRotateTicker:
d.tokenSecrets = []string{newTokenSecret(), d.tokenSecrets[0]}
case d.portRequest <- d.config.Port:
continue
case <-saveTicker:
d.routingTable.Lock()
tbl := d.routingTable.reachableNodes()
if len(tbl) > 5 {
d.store.Remotes = tbl
saveStore(*d.store)
}
d.routingTable.Unlock()
}
}
}
Expand All @@ -512,6 +555,10 @@ func (d *DHT) needMoreNodes() bool {
return n < minNodes || n*2 < d.config.MaxNodes
}

func (d *DHT) GetNumNodes() int {
return d.routingTable.numNodes()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this? I don't think we do and I think it's better to only expose methods when we really need to. Besides, I think this isn't safe to be used concurrently while the DHT is running?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. It was just useful for my debugging.

}

// needMorePeers reports whether the peer store knows fewer live peers for
// the given infohash than the configured target, meaning we should keep
// searching.
func (d *DHT) needMorePeers(ih InfoHash) bool {
	known := d.peerStore.alive(ih)
	return known < d.config.NumTargetPeers
}
Expand Down Expand Up @@ -563,9 +610,11 @@ func (d *DHT) processPacket(p packetType) {
}
r, err := readResponse(p)
if err != nil {
log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
//log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
return
}
d.routingTable.Lock()
defer d.routingTable.Unlock()
switch {
// Response.
case r.Y == "r":
Expand Down Expand Up @@ -651,6 +700,10 @@ func (d *DHT) processPacket(p packetType) {
d.ping(addr)
}
}
// don't reply to any queries if in passivMode
if d.config.PassivMode {
return
}
log.V(5).Infof("DHT processing %v request", r.Q)
switch r.Q {
case "ping":
Expand Down Expand Up @@ -680,10 +733,12 @@ func (d *DHT) ping(address string) {

// pingNode sends a "ping" query to the remote node. The node's lock guards
// newQuery's mutation of its pending-query state.
func (d *DHT) pingNode(r *remoteNode) {
	log.V(3).Infof("DHT: ping => %+v", r.address)
	r.Lock()
	// BUG FIX: the transaction was registered as "pine" while the message
	// sent below says "ping"; the mismatched pending-query type would make
	// the eventual response unmatchable.
	t := r.newQuery("ping")
	queryArguments := map[string]interface{}{"id": d.nodeId}
	query := queryMessage{t, "q", "ping", queryArguments}
	r.Unlock()
	sendMsg(d.conn, r.address, query)
	totalSentPing.Add(1)
}
Expand All @@ -692,7 +747,12 @@ func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
if r == nil {
return
}
r.Lock()
totalSentGetPeers.Add(1)
cnt := d.peerStore.addSearchCount(ih)
if d.config.MaxSearchQueries > 0 && cnt > d.config.MaxSearchQueries {
return
}
ty := "get_peers"
transId := r.newQuery(ty)
if _, ok := r.pendingQueries[transId]; ok {
Expand All @@ -710,13 +770,15 @@ func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
log.V(3).Infof("DHT sending get_peers. nodeID: %x@%v, InfoHash: %x , distance: %x", r.id, r.address, ih, x)
}
r.lastSearchTime = time.Now()
r.Unlock()
sendMsg(d.conn, r.address, query)
}

func (d *DHT) findNodeFrom(r *remoteNode, id string) {
if r == nil {
return
}
r.Lock()
totalSentFindNode.Add(1)
ty := "find_node"
transId := r.newQuery(ty)
Expand All @@ -737,6 +799,7 @@ func (d *DHT) findNodeFrom(r *remoteNode, id string) {
log.V(3).Infof("DHT sending find_node. nodeID: %x@%v, target ID: %x , distance: %x", r.id, r.address, id, x)
}
r.lastSearchTime = time.Now()
r.Unlock()
sendMsg(d.conn, r.address, query)
}

Expand All @@ -751,7 +814,9 @@ func (d *DHT) announcePeer(address net.UDPAddr, ih InfoHash, token string) {
}
ty := "announce_peer"
log.V(3).Infof("DHT: announce_peer => address: %v, ih: %x, token: %x", address, ih, token)
r.Lock()
transId := r.newQuery(ty)
r.Unlock()
queryArguments := map[string]interface{}{
"id": d.nodeId,
"info_hash": ih,
Expand Down
6 changes: 3 additions & 3 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ExampleDHT() {
return
}

infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
if err != nil {
fmt.Printf("DecodeInfoHash faiure: %v", err)
return
Expand Down Expand Up @@ -63,7 +63,7 @@ M:
// fmt.Println(DecodePeerAddress(peer))
//}

if fmt.Sprintf("%x", ih) == "d1c5676ae7ac98e8b19f63565905105e3c4c37a2" {
if fmt.Sprintf("%x", ih) == "c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca" {
fmt.Println("Peer found for the requested infohash or the test was skipped")
return
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestDHTLocal(t *testing.T) {
return
}
searchRetryPeriod = time.Second
infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
if err != nil {
t.Fatalf(err.Error())
}
Expand Down
Loading