From 62d459942894027423e372b6b18ca12137da00eb Mon Sep 17 00:00:00 2001
From: mipa123
Date: Mon, 6 Aug 2018 09:45:41 +0200
Subject: [PATCH 1/4] set createTime per node and purge old nodes if maxNodes
 is reached

---
 dht.go           | 6 +++++-
 krpc.go          | 2 ++
 peer_store.go    | 4 +++-
 routing_table.go | 6 ++++++
 4 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/dht.go b/dht.go
index 5828395..3f4d240 100644
--- a/dht.go
+++ b/dht.go
@@ -180,7 +180,7 @@ func New(config *Config) (node *DHT, err error) {
 	node = &DHT{
 		config:       cfg,
 		routingTable: newRoutingTable(),
-		peerStore:    newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers),
+		peerStore:    newPeerStore(cfg.MaxInfoHashes, cfg.MaxInfoHashPeers, cfg.MaxNodes),
 		PeersRequestResults:  make(chan map[InfoHash][]string, 1),
 		stop:                 make(chan bool),
 		exploredNeighborhood: false,
@@ -512,6 +512,10 @@ func (d *DHT) needMoreNodes() bool {
 	return n < minNodes || n*2 < d.config.MaxNodes
 }
 
+// GetNumNodes returns the number of nodes in the routing table.
+func (d *DHT) GetNumNodes() int {
+	return d.routingTable.numNodes()
+}
+
 func (d *DHT) needMorePeers(ih InfoHash) bool {
 	return d.peerStore.alive(ih) < d.config.NumTargetPeers
 }
diff --git a/krpc.go b/krpc.go
index ddaf365..8613459 100644
--- a/krpc.go
+++ b/krpc.go
@@ -32,6 +32,7 @@ type remoteNode struct {
 	pendingQueries map[string]*queryType // key: transaction ID
 	pastQueries    map[string]*queryType // key: transaction ID
 	reachable      bool
+	createTime       time.Time
 	lastResponseTime time.Time
 	lastSearchTime   time.Time
 	ActiveDownloads  []string // List of infohashes we know this peer is downloading.
@@ -44,6 +45,7 @@ func newRemoteNode(addr net.UDPAddr, id string) *remoteNode {
 		lastQueryID:    newTransactionId(),
 		id:             id,
 		reachable:      false,
+		createTime:     time.Now(),
 		pendingQueries: map[string]*queryType{},
 		pastQueries:    map[string]*queryType{},
 	}
diff --git a/peer_store.go b/peer_store.go
index e091424..64eb8d7 100644
--- a/peer_store.go
+++ b/peer_store.go
@@ -124,12 +124,13 @@ func (p *peerContactsSet) Alive() int {
 	return ret
 }
 
-func newPeerStore(maxInfoHashes, maxInfoHashPeers int) *peerStore {
+func newPeerStore(maxInfoHashes, maxInfoHashPeers, maxNodes int) *peerStore {
 	return &peerStore{
 		infoHashPeers:        lru.New(maxInfoHashes),
 		localActiveDownloads: make(map[InfoHash]bool),
 		maxInfoHashes:        maxInfoHashes,
 		maxInfoHashPeers:     maxInfoHashPeers,
+		maxNodes:             maxNodes,
 	}
 }
 
@@ -141,6 +142,7 @@ type peerStore struct {
 	localActiveDownloads map[InfoHash]bool
 	maxInfoHashes        int
 	maxInfoHashPeers     int
+	maxNodes             int
 }
 
 func (h *peerStore) get(ih InfoHash) *peerContactsSet {
diff --git a/routing_table.go b/routing_table.go
index 9ae40cf..150d8ca 100644
--- a/routing_table.go
+++ b/routing_table.go
@@ -206,6 +206,12 @@ func (r *routingTable) cleanup(cleanupPeriod time.Duration, p *peerStore) (needP
 			r.kill(n, p)
 			continue
 		}
+		// Kill old, currently unused nodes once the node count exceeds maxNodes.
+		if len(r.addresses) > p.maxNodes && time.Since(n.createTime) > cleanupPeriod && len(n.pendingQueries) == 0 {
+			log.V(4).Infof("DHT: Old node with 0 pendingQueries. Deleting")
+			r.kill(n, p)
+			continue
+		}
 		if n.reachable {
 			if len(n.pendingQueries) == 0 {
 				goto PING
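A minimal sketch of how the purge added here behaves from a caller's point of view (the monitoring loop below is hypothetical and not part of the patch; it only touches Config fields and the new GetNumNodes accessor, and assumes the package imports as github.com/nictuku/dht):

    package main

    import (
        "fmt"
        "time"

        "github.com/nictuku/dht"
    )

    func main() {
        cfg := dht.NewConfig()
        cfg.MaxNodes = 500                   // purge threshold checked by cleanup()
        cfg.CleanupPeriod = 15 * time.Minute // nodes older than this become purge candidates

        d, err := dht.New(cfg)
        if err != nil {
            panic(err)
        }
        go d.Run()

        // With this patch, each cleanup pass also deletes nodes that are
        // older than CleanupPeriod and have no pending queries, but only
        // while the routing table holds more than MaxNodes entries.
        for range time.Tick(time.Minute) {
            fmt.Println("routing table size:", d.GetNumNodes())
        }
    }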
Deleting") + r.kill(n, p) + continue + } if n.reachable { if len(n.pendingQueries) == 0 { goto PING From e1c56f8b154858cbae7ddaa168c552e6f72afa71 Mon Sep 17 00:00:00 2001 From: mipa123 Date: Mon, 6 Aug 2018 10:30:39 +0200 Subject: [PATCH 2/4] implement PassivMode to not respond to any queries --- dht.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dht.go b/dht.go index 3f4d240..83f0edc 100644 --- a/dht.go +++ b/dht.go @@ -74,6 +74,8 @@ type Config struct { // If true, the node will read the routing table from disk on startup and save routing // table snapshots on disk every few minutes. Default value: true. SaveRoutingTable bool + // do not reply to any incoming queries + PassivMode bool // How often to save the routing table to disk. Default value: 5 minutes. SavePeriod time.Duration // Maximum packets per second to be processed. Disabled if negative. Default value: 100. @@ -105,6 +107,7 @@ func NewConfig() *Config { MaxNodes: 500, CleanupPeriod: 15 * time.Minute, SaveRoutingTable: true, + PassivMode: false, SavePeriod: 5 * time.Minute, RateLimit: 100, MaxInfoHashes: 2048, @@ -655,6 +658,10 @@ func (d *DHT) processPacket(p packetType) { d.ping(addr) } } + // don't reply to any queries if in passivMode + if d.config.PassivMode { + return + } log.V(5).Infof("DHT processing %v request", r.Q) switch r.Q { case "ping": From fe74c1f13433685c4b1992fe8641e5e3cd8cf1e9 Mon Sep 17 00:00:00 2001 From: mipa123 Date: Wed, 8 Aug 2018 06:00:54 +0200 Subject: [PATCH 3/4] connPoolMod --- dht.go | 86 +++++++++++++++++++++++++++++++++----------- dht_test.go | 6 ++-- krpc.go | 34 +++++++++++++++--- neighborhood_test.go | 6 ++-- peer_store_test.go | 2 +- routing.go | 4 +++ 6 files changed, 107 insertions(+), 31 deletions(-) diff --git a/dht.go b/dht.go index 83f0edc..dd42860 100644 --- a/dht.go +++ b/dht.go @@ -95,6 +95,7 @@ type Config struct { ThrottlerTrackedClients int64 //Protocol for UDP connections, udp4= IPv4, udp6 = IPv6 UDPProto string + ConnPoolSize int } // Creates a *Config populated with default values. 
From fe74c1f13433685c4b1992fe8641e5e3cd8cf1e9 Mon Sep 17 00:00:00 2001
From: mipa123
Date: Wed, 8 Aug 2018 06:00:54 +0200
Subject: [PATCH 3/4] add a UDP connection pool (ConnPoolSize) backed by
 SO_REUSEPORT sockets, with per-socket reader goroutines and locking for
 shared state

---
 dht.go               | 86 +++++++++++++++++++++++++++++++++-----------
 dht_test.go          |  6 ++--
 krpc.go              | 34 +++++++++++++++---
 neighborhood_test.go |  6 ++--
 peer_store_test.go   |  2 +-
 routing.go           |  4 +++
 6 files changed, 107 insertions(+), 31 deletions(-)

diff --git a/dht.go b/dht.go
index 83f0edc..dd42860 100644
--- a/dht.go
+++ b/dht.go
@@ -95,6 +95,7 @@ type Config struct {
 	ThrottlerTrackedClients int64
 	//Protocol for UDP connections, udp4= IPv4, udp6 = IPv6
 	UDPProto string
+	// Number of UDP sockets to listen on; all bind the same port. Default value: 1.
+	ConnPoolSize int
 }
 
 // Creates a *Config populated with default values.
@@ -115,6 +116,7 @@ func NewConfig() *Config {
 		ClientPerMinuteLimit:    50,
 		ThrottlerTrackedClients: 1000,
 		UDPProto:                "udp4",
+		ConnPoolSize:            1,
 	}
 }
 
@@ -152,7 +154,7 @@ type DHT struct {
 	config       Config
 	routingTable *routingTable
 	peerStore    *peerStore
-	conn         *net.UDPConn
+	conn         []*net.UDPConn
 	Logger       Logger
 	exploredNeighborhood   bool
 	remoteNodeAcquaintance chan string
@@ -342,14 +344,18 @@ func (d *DHT) Run() error {
 
 // initSocket initializes the udp socket
 // listening to incoming dht requests
 func (d *DHT) initSocket() (err error) {
-	d.conn, err = listen(d.config.Address, d.config.Port, d.config.UDPProto)
-	if err != nil {
-		return err
-	}
+	for i := 0; i < d.config.ConnPoolSize; i++ {
+		conn, err := listen(d.config.Address, d.config.Port, d.config.UDPProto)
+		if err != nil {
+			return err
+		}
+		d.conn = append(d.conn, conn)
 
-	// Update the stored port number in case it was set 0, meaning it was
-	// set automatically by the system
-	d.config.Port = d.conn.LocalAddr().(*net.UDPAddr).Port
+		// Update the stored port number in case it was set 0, meaning it was
+		// set automatically by the system
+		d.config.Port = conn.LocalAddr().(*net.UDPAddr).Port
+	}
 	return nil
 }
 
@@ -368,26 +374,50 @@ func (d *DHT) bootstrap() {
 	d.getMorePeers(nil)
 }
 
+// chanProcessPacket consumes packets read from one pooled socket and returns
+// their buffers to the arena once processed.
+func (d *DHT) chanProcessPacket(socketChan chan packetType, bytesArena arena, i int) {
+	log.V(4).Infof("chanProcessPacket %d", i)
+	for p := range socketChan {
+		totalRecv.Add(1)
+		d.processPacket(p)
+		bytesArena.Push(p.b)
+	}
+}
+
 // loop is the main working section of dht.
 // It bootstraps a routing table, if necessary,
 // and listens for incoming DHT requests until d.Stop()
 // is called from another go routine.
 func (d *DHT) loop() {
 	// Close socket
-	defer d.conn.Close()
+	for _, conn := range d.conn {
+		defer conn.Close()
+	}
 
 	// There is goroutine pushing and one popping items out of the arena.
 	// One passes work to the other. So there is little contention in the
 	// arena, so it doesn't need many items (it used to have 500!). If
 	// readFromSocket or the packet processing ever need to be
 	// parallelized, this would have to be bumped.
-	bytesArena := newArena(maxUDPPacketSize, 3)
-	socketChan := make(chan packetType)
-	d.wg.Add(1)
-	go func() {
-		defer d.wg.Done()
-		readFromSocket(d.conn, socketChan, bytesArena, d.stop)
-	}()
+	bytesArena := newArena(maxUDPPacketSize, 3*d.config.ConnPoolSize)
+	socketChan := make([]chan packetType, len(d.conn))
+	// One reader and one processor goroutine per socket, hence 2x.
+	d.wg.Add(2 * len(d.conn))
+	for i, conn := range d.conn {
+		socketChan[i] = make(chan packetType)
+		go func(i int, conn *net.UDPConn) {
+			defer d.wg.Done()
+			readFromSocket(conn, socketChan[i], bytesArena, d.stop, i)
+		}(i, conn)
+		go func(i int) {
+			defer d.wg.Done()
+			d.chanProcessPacket(socketChan[i], bytesArena, i)
+		}(i)
+	}
 
 	d.bootstrap()
@@ -406,7 +436,7 @@ func (d *DHT) loop() {
 		log.Warning("rate limiting disabled")
 	} else {
 		// Token bucket for limiting the number of packets per second.
-		fillTokenBucket = time.Tick(time.Second / 10)
+		//fillTokenBucket = time.Tick(time.Second / 10)
 		if d.config.RateLimit > 0 && d.config.RateLimit < 10 {
 			// Less than 10 leads to rounding problems.
 			d.config.RateLimit = 10
@@ -461,10 +491,12 @@ func (d *DHT) loop() {
 					break L
 				}
 			}
+			d.routingTable.Lock()
 			for ih, _ := range m {
 				d.findNode(string(ih))
 			}
+			d.routingTable.Unlock()
-		case p := <-socketChan:
-			totalRecv.Add(1)
-			if d.config.RateLimit > 0 {
-				if tokenBucket > 0 {
-					d.processPacket(p)
-					tokenBucket -= 1
-				} else {
-					totalDroppedPackets.Add(1)
-				}
-			} else {
-				d.processPacket(p)
-			}
-			bytesArena.Push(p.b)
-
 		case <-fillTokenBucket:
 			if tokenBucket < d.config.RateLimit {
 				tokenBucket += d.config.RateLimit / 10
 			}
 		case <-cleanupTicker:
+			d.routingTable.Lock()
 			needPing := d.routingTable.cleanup(d.config.CleanupPeriod, d.peerStore)
 			d.wg.Add(1)
 			go func() {
 				defer d.wg.Done()
 				pingSlowly(d.pingRequest, needPing, d.config.CleanupPeriod, d.stop)
 			}()
 			if d.needMoreNodes() {
 				d.bootstrap()
 			}
+			d.routingTable.Unlock()
 		case node := <-d.pingRequest:
 			d.pingNode(node)
 		case <-secretRotateTicker:
 		case d.portRequest <- d.config.Port:
 			continue
 		case <-saveTicker:
+			d.routingTable.Lock()
 			tbl := d.routingTable.reachableNodes()
 			if len(tbl) > 5 {
 				d.store.Remotes = tbl
 				saveStore(*d.store)
 			}
+			d.routingTable.Unlock()
 		}
 	}
 }
@@ -573,6 +609,8 @@ func (d *DHT) processPacket(p packetType) {
 		log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
 		return
 	}
+	d.routingTable.Lock()
+	defer d.routingTable.Unlock()
 	switch {
 	// Response.
 	case r.Y == "r":
@@ -691,10 +729,12 @@ func (d *DHT) ping(address string) {
 
 func (d *DHT) pingNode(r *remoteNode) {
 	log.V(3).Infof("DHT: ping => %+v", r.address)
+	r.Lock()
 	t := r.newQuery("ping")
 
 	queryArguments := map[string]interface{}{"id": d.nodeId}
 	query := queryMessage{t, "q", "ping", queryArguments}
+	r.Unlock()
 	sendMsg(d.conn, r.address, query)
 	totalSentPing.Add(1)
 }
@@ -703,6 +743,7 @@ func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
 	if r == nil {
 		return
 	}
+	r.Lock()
 	totalSentGetPeers.Add(1)
 	ty := "get_peers"
 	transId := r.newQuery(ty)
 	if log.V(3) {
 		x := hashDistance(InfoHash(r.id), ih)
 		log.V(3).Infof("DHT sending get_peers. nodeID: %x@%v, InfoHash: %x , distance: %x", r.id, r.address, ih, x)
 	}
 	r.lastSearchTime = time.Now()
+	r.Unlock()
 	sendMsg(d.conn, r.address, query)
 }
@@ -728,6 +770,7 @@ func (d *DHT) findNodeFrom(r *remoteNode, id string) {
 	if r == nil {
 		return
 	}
+	r.Lock()
 	totalSentFindNode.Add(1)
 	ty := "find_node"
 	transId := r.newQuery(ty)
 	if log.V(3) {
 		x := hashDistance(InfoHash(r.id), InfoHash(id))
 		log.V(3).Infof("DHT sending find_node. nodeID: %x@%v, target ID: %x , distance: %x", r.id, r.address, id, x)
 	}
 	r.lastSearchTime = time.Now()
+	r.Unlock()
 	sendMsg(d.conn, r.address, query)
 }
@@ -762,7 +806,9 @@ func (d *DHT) announcePeer(address net.UDPAddr, ih InfoHash, token string) {
 	}
 	ty := "announce_peer"
 	log.V(3).Infof("DHT: announce_peer => address: %v, ih: %x, token: %x", address, ih, token)
+	r.Lock()
 	transId := r.newQuery(ty)
+	r.Unlock()
 	queryArguments := map[string]interface{}{
 		"id":        d.nodeId,
 		"info_hash": ih,
diff --git a/dht_test.go b/dht_test.go
index 3993ad4..50f562b 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -31,7 +31,7 @@ func ExampleDHT() {
 		return
 	}
 
-	infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
+	infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
 	if err != nil {
 		fmt.Printf("DecodeInfoHash faiure: %v", err)
 		return
 	}
@@ -63,7 +63,7 @@ M:
 	//	fmt.Println(DecodePeerAddress(peer))
 	//}
 
-	if fmt.Sprintf("%x", ih) == "d1c5676ae7ac98e8b19f63565905105e3c4c37a2" {
+	if fmt.Sprintf("%x", ih) == "c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca" {
 		fmt.Println("Peer found for the requested infohash or the test was skipped")
 		return
 	}
@@ -122,7 +122,7 @@ func TestDHTLocal(t *testing.T) {
 		return
 	}
 	searchRetryPeriod = time.Second
-	infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
+	infoHash, err := DecodeInfoHash("c3c5fe05c329ae51c6eca464f6b30ba0a457b2ca")
 	if err != nil {
 		t.Fatalf(err.Error())
 	}
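The loop() rewrite above is the core of this patch: one reader goroutine and one processor goroutine per pooled socket, all sharing one buffer arena. Distilled to its shape (illustrative only, with the surrounding DHT state elided):

    socketChan := make([]chan packetType, len(d.conn))
    d.wg.Add(2 * len(d.conn)) // a reader plus a processor per socket
    for i, conn := range d.conn {
        socketChan[i] = make(chan packetType)
        // conn is passed as a parameter so each goroutine gets its own copy
        // instead of all of them capturing the shared loop variable.
        go func(i int, conn *net.UDPConn) {
            defer d.wg.Done()
            readFromSocket(conn, socketChan[i], bytesArena, d.stop, i)
        }(i, conn)
        go func(i int) {
            defer d.wg.Done()
            d.chanProcessPacket(socketChan[i], bytesArena, i)
        }(i)
    }

Since packets are now processed concurrently instead of inside the single select loop, the krpc.go and routing.go changes below add the locks that make the shared per-node state safe to touch from several goroutines.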
diff --git a/krpc.go b/krpc.go
index 8613459..8db0bde 100644
--- a/krpc.go
+++ b/krpc.go
@@ -9,10 +9,14 @@ import (
+	mrand "math/rand"
 	"net"
 	"strconv"
+	"sync"
 	"time"
 
 	log "github.com/golang/glog"
 	bencode "github.com/jackpal/bencode-go"
+	"github.com/kavu/go_reuseport"
 	"github.com/nictuku/nettools"
 )
 
 // Search a node again after some time.
 var searchRetryPeriod = 15 * time.Second
 
 // Owned by the DHT engine.
 type remoteNode struct {
+	sync.RWMutex
 	address net.UDPAddr
 	// addressDotFormatted contains a binary representation of the node's host:port address.
 	addressBinaryFormat string
 
 type queryType struct {
+	sync.RWMutex
 	Type    string
 	ih      InfoHash
 	srcNode string
 }
 
@@ -166,16 +172,27 @@ type responseType struct {
 }
 
 // sendMsg bencodes the data in 'query' and sends it to the remote node.
-func sendMsg(conn *net.UDPConn, raddr net.UDPAddr, query interface{}) {
+func sendMsg(conn []*net.UDPConn, raddr net.UDPAddr, query interface{}) {
 	totalSent.Add(1)
 	var b bytes.Buffer
 	if err := bencode.Marshal(&b, query); err != nil {
 		return
 	}
-	if n, err := conn.WriteToUDP(b.Bytes(), &raddr); err != nil {
+	// Spread outgoing traffic across the pool: pick one socket at random.
+	if n, err := conn[mrand.Intn(len(conn))].WriteToUDP(b.Bytes(), &raddr); err != nil {
 		log.V(3).Infof("DHT: node write failed to %+v, error=%s", raddr, err)
 	} else {
 		totalWrittenBytes.Add(int64(n))
 	}
 	return
 }
@@ -219,9 +236,11 @@ type packetType struct {
 
 func listen(addr string, listenPort int, proto string) (socket *net.UDPConn, err error) {
 	log.V(3).Infof("DHT: Listening for peers on IP: %s port: %d Protocol=%s\n", addr, listenPort, proto)
-	listener, err := net.ListenPacket(proto, addr+":"+strconv.Itoa(listenPort))
+	// SO_REUSEPORT lets every socket in the pool bind the same address and port.
+	listener, err := reuseport.ListenPacket(proto, addr+":"+strconv.Itoa(listenPort))
 	if err != nil {
-		log.V(3).Infof("DHT: Listen failed:", err)
+		log.V(3).Infof("DHT: Listen failed: %v", err)
 	}
 	if listener != nil {
 		socket = listener.(*net.UDPConn)
 	}
 }
 
 // Read from UDP socket, writes slice of byte into channel.
-func readFromSocket(socket *net.UDPConn, conChan chan packetType, bytesArena arena, stop chan bool) {
+func readFromSocket(socket *net.UDPConn, conChan chan packetType, bytesArena arena, stop chan bool, i int) {
 	for {
+		log.V(4).Infof("DHT: readFromSocket %d", i)
 		b := bytesArena.Pop()
 		n, addr, err := socket.ReadFromUDP(b)
 		if err != nil {
 		if n == maxUDPPacketSize {
 			log.V(3).Infof("DHT: Warning. Received packet with len >= %d, some data may have been discarded.\n", maxUDPPacketSize)
 		}
 		totalReadBytes.Add(int64(n))
 		if n > 0 && err == nil {
 			p := packetType{b, *addr}
 			select {
 			case conChan <- p:
diff --git a/neighborhood_test.go b/neighborhood_test.go
index a1c2c31..5f2c65f 100644
--- a/neighborhood_test.go
+++ b/neighborhood_test.go
@@ -51,7 +51,7 @@ func TestUpkeep(t *testing.T) {
 		// there should be no sign of them later on.
 		n := randNodeId()
 		n[0] = byte(0x3d) // Ensure long distance.
-		r.neighborhoodUpkeep(genremoteNode(string(n)), "udp", newPeerStore(0, 0))
+		r.neighborhoodUpkeep(genremoteNode(string(n)), "udp", newPeerStore(0, 0, 0))
 	}
 
 	// Current state: 8 neighbors with low proximity.
 
 	// Adds 7 neighbors from the static table. They should replace the
 	// random ones, except for one.
 	for _, v := range table[1:8] {
-		r.neighborhoodUpkeep(genremoteNode(v.rid), "udp", newPeerStore(0, 0))
+		r.neighborhoodUpkeep(genremoteNode(v.rid), "udp", newPeerStore(0, 0, 0))
 	}
 
 	// Current state: 7 close neighbors, one distant dude.
@@ -80,7 +80,7 @@ func TestUpkeep(t *testing.T) {
 	if r.boundaryNode == nil {
 		t.Fatalf("tried to kill nil boundary node")
 	}
-	r.kill(r.boundaryNode, newPeerStore(0, 0))
+	r.kill(r.boundaryNode, newPeerStore(0, 0, 0))
 
 	// The resulting boundary neighbor should now be one from the static
 	// table, with high proximity.
diff --git a/peer_store_test.go b/peer_store_test.go
index 59254c5..33c458c 100644
--- a/peer_store_test.go
+++ b/peer_store_test.go
@@ -10,7 +10,7 @@ func TestPeerStorage(t *testing.T) {
 		t.Fatalf("DecodeInfoHash: %v", err)
 	}
 	// Allow 1 IH and 2 peers.
-	p := newPeerStore(1, 2)
+	p := newPeerStore(1, 2, 5000)
 
 	if ok := p.addContact(ih, "abcedf"); !ok {
 		t.Fatalf("addContact(1/2) expected true, got false")
diff --git a/routing.go b/routing.go
index 30ca47c..0a0dfcd 100644
--- a/routing.go
+++ b/routing.go
@@ -2,6 +2,7 @@ package dht
 
 import (
+	"sync"
+
 	log "github.com/golang/glog"
 )
 
 // DHT routing using a binary tree and no buckets.
@@ -41,6 +42,7 @@
 // even show on the CPU profiling anymore.
 
 type nTree struct {
+	sync.RWMutex
 	zero, one *nTree
 	value     *remoteNode
 }
@@ -211,6 +213,8 @@ func (n *nTree) isOK(ih InfoHash) bool {
 		return false
 	}
 	r := n.value
+	r.Lock()
+	defer r.Unlock()
 
 	if len(r.pendingQueries) > maxNodePendingQueries {
 		log.V(3).Infof("DHT: Skipping because there are too many queries pending for this dude.")
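A configuration sketch for the pool (hypothetical helper function; ConnPoolSize, the reuseport listener and the random socket pick in sendMsg are what this patch adds):

    // newPooledNode listens on four UDP sockets bound to the same port via
    // SO_REUSEPORT. The kernel spreads incoming packets across the sockets,
    // each socket gets its own reader/processor goroutine pair, and sendMsg
    // picks a random socket for every outgoing message.
    func newPooledNode() (*dht.DHT, error) {
        cfg := dht.NewConfig()
        cfg.ConnPoolSize = 4
        return dht.New(cfg)
    }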
From 450debb44eb7796ce24cf944acb8a4607f001a86 Mon Sep 17 00:00:00 2001
From: mipa123
Date: Thu, 9 Aug 2018 07:19:07 +0200
Subject: [PATCH 4/4] implement MaxSearchQueries/searchCount to prevent an
 infinite get_peers loop

---
 dht.go        | 9 ++++++++-
 peer_store.go | 8 ++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/dht.go b/dht.go
index 83f0edc..3480717 100644
--- a/dht.go
+++ b/dht.go
@@ -95,6 +95,8 @@ type Config struct {
 	ThrottlerTrackedClients int64
 	//Protocol for UDP connections, udp4= IPv4, udp6 = IPv6
 	UDPProto string
+	// Maximum number of get_peers requests sent per infohash, to prevent an
+	// infinite search loop. Disabled if <= 0. Default value: 1000.
+	MaxSearchQueries int
 }
 
@@ -115,6 +117,7 @@ func NewConfig() *Config {
 		ClientPerMinuteLimit:    50,
 		ThrottlerTrackedClients: 1000,
 		UDPProto:                "udp4",
+		MaxSearchQueries:        1000,
 	}
 }
 
@@ -570,7 +573,7 @@ func (d *DHT) processPacket(p packetType) {
 	}
 	r, err := readResponse(p)
 	if err != nil {
-		log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
+		//log.Warningf("DHT: readResponse Error: %v, %q", err, string(p.b))
 		return
 	}
 	switch {
@@ -704,6 +707,10 @@ func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
 	if r == nil {
 		return
 	}
+	// Stop searching this infohash once its query budget is exhausted.
+	cnt := d.peerStore.addSearchCount(ih)
+	if d.config.MaxSearchQueries > 0 && cnt > d.config.MaxSearchQueries {
+		return
+	}
 	totalSentGetPeers.Add(1)
 	ty := "get_peers"
 	transId := r.newQuery(ty)
 	if _, ok := r.pendingQueries[transId]; ok {
diff --git a/peer_store.go b/peer_store.go
index 64eb8d7..65af16c 100644
--- a/peer_store.go
+++ b/peer_store.go
@@ -128,6 +128,7 @@ func newPeerStore(maxInfoHashes, maxInfoHashPeers, maxNodes int) *peerStore {
 	return &peerStore{
 		infoHashPeers:        lru.New(maxInfoHashes),
 		localActiveDownloads: make(map[InfoHash]bool),
+		searchCount:          make(map[InfoHash]int),
 		maxInfoHashes:        maxInfoHashes,
 		maxInfoHashPeers:     maxInfoHashPeers,
 		maxNodes:             maxNodes,
 	}
 
@@ -140,6 +141,7 @@ type peerStore struct {
 	infoHashPeers        *lru.Cache // infoHashes for which we are peers.
 	localActiveDownloads map[InfoHash]bool
+	searchCount          map[InfoHash]int
 	maxInfoHashes        int
 	maxInfoHashPeers     int
 	maxNodes             int
 }
 
@@ -227,3 +229,9 @@ func (h *peerStore) hasLocalDownload(ih InfoHash) bool {
 	log.V(3).Infof("hasLocalDownload for %x: %v", ih, ok)
 	return ok
 }
+
+// addSearchCount increments and returns the number of get_peers queries sent
+// for ih. The count is never reset, so the cap spans the process lifetime.
+func (h *peerStore) addSearchCount(ih InfoHash) int {
+	h.searchCount[ih]++
+	log.V(3).Infof("searchCount %x: %d", ih, h.searchCount[ih])
+	return h.searchCount[ih]
+}
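Finally, a sketch of the new cap in use (hypothetical value; MaxSearchQueries is the only new API surface):

    cfg := dht.NewConfig()
    // Every get_peers sent for an infohash increments peerStore.searchCount;
    // once the count exceeds the cap, getPeersFrom returns early and the
    // search simply stops instead of looping forever. A value <= 0 disables
    // the cap. Since the counter is never reset, the budget covers the whole
    // lifetime of the process, not a single search.
    cfg.MaxSearchQueries = 1000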