Skip to content

Commit

Permalink
Relay the TX and Consensus
Browse files Browse the repository at this point in the history
MSG should be relayed since not all node are connected with each other.

Signed-off-by: Xiang Fu <[email protected]>
  • Loading branch information
Xiang Fu committed Sep 21, 2017
1 parent 1cfb6cd commit 1d220f2
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 20 deletions.
12 changes: 10 additions & 2 deletions net/message/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ type consensus struct {
}

func (cp *ConsensusPayload) Hash() common.Uint256 {
return common.Uint256{}
d := sig.GetHashData(cp)
temp := sha256.Sum256([]byte(d))
cp.hash = common.Uint256(sha256.Sum256(temp[:]))
return cp.hash
}

func (cp *ConsensusPayload) Verify() error {
Expand Down Expand Up @@ -106,7 +109,12 @@ func (cp *ConsensusPayload) GetMessage() []byte {

func (msg consensus) Handle(node Noder) error {
log.Debug()
node.LocalNode().GetEvent("consensus").Notify(events.EventNewInventory, &msg.cons)
cp := &msg.cons
if !node.LocalNode().ExistedID(cp.Hash()) {
node.LocalNode().GetEvent("consensus").Notify(events.EventNewInventory, cp)
node.LocalNode().Relay(cp)
log.Info("Relay consensus message")
}
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions net/message/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (msg trn) Handle(node Noder) error {
if errCode := node.LocalNode().AppendTxnPool(&(msg.txn)); errCode != ErrNoError {
return errors.New("[message] VerifyTransaction failed when AppendTxnPool.")
}
node.LocalNode().Relay(tx)
log.Info("Relay transaction")
node.LocalNode().IncRxTxnCnt()
log.Debug("RX Transaction message hash", msg.txn.Hash())
log.Debug("RX Transaction message type", msg.txn.TxType)
Expand Down
31 changes: 26 additions & 5 deletions net/node/idCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,45 @@ package node

import (
"DNA/common"
"DNA/net/protocol"
"sync"
)

type idCache struct {
sync.RWMutex
list map[common.Uint256]bool
lastid common.Uint256
index int
idarray []common.Uint256
idmaplsit map[common.Uint256]int
}

func (c *idCache) init() {
c.index = 0
c.idmaplsit = make(map[common.Uint256]int, protocol.MAXIDCACHED)
c.idarray = make([]common.Uint256, protocol.MAXIDCACHED)
}

func (c *idCache) add() {
}
func (c *idCache) add(id common.Uint256) {
oldid := c.idarray[c.index]
delete(c.idmaplsit, oldid)
c.idarray[c.index] = id
c.idmaplsit[id] = c.index
c.index++
c.lastid = id
c.index = c.index % protocol.MAXIDCACHED

func (c *idCache) del() {
}

func (c *idCache) ExistedID(id common.Uint256) bool {
// TODO
c.Lock()
defer c.Unlock()
if id == c.lastid {
return true
}
if _, ok := c.idmaplsit[id]; ok {
return true
} else {
c.add(id)
}
return false
}
80 changes: 67 additions & 13 deletions net/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (

type node struct {
//sync.RWMutex //The Lock not be used as expected to use function channel instead of lock
state uint32 // node state
id uint64 // The nodes's id
state uint32 // node state
id uint64 // The nodes's id
cap [32]byte // The node capability set
version uint32 // The network protocol the node used
services uint64 // The services the node supplied
relay bool // The relay capability of the node (merge into capbility flag)
height uint64 // The node latest block height
txnCnt uint64 // The transactions be transmit by this node
rxTxnCnt uint64 // The transaction received by this node
version uint32 // The network protocol the node used
services uint64 // The services the node supplied
relay bool // The relay capability of the node (merge into capbility flag)
height uint64 // The node latest block height
txnCnt uint64 // The transactions be transmit by this node
rxTxnCnt uint64 // The transaction received by this node
publicKey *crypto.PubKey
// TODO does this channel should be a buffer channel
chF chan func() error // Channel used to operate the node without lock
Expand Down Expand Up @@ -173,6 +173,7 @@ func InitNode(pubKey *crypto.PubKey) Noder {
n.publicKey = pubKey
n.TXNPool.init()
n.eventQueue.init()
n.idCache.init()
n.nodeDisconnectSubscriber = n.eventQueue.GetEvent("disconnect").Subscribe(events.EventNodeDisconnect, n.NodeDisconnect)
go n.initConnection()
go n.updateConnection()
Expand Down Expand Up @@ -216,24 +217,24 @@ func (node *node) GetPort() uint16 {
return node.port
}

func (node *node) GetHttpInfoPort() (int) {
func (node *node) GetHttpInfoPort() int {
return int(node.httpInfoPort)
}

func (node *node) SetHttpInfoPort(nodeInfoPort uint16) {
node.httpInfoPort = nodeInfoPort
}

func (node *node) GetHttpInfoState() bool{
func (node *node) GetHttpInfoState() bool {
if node.cap[HTTPINFOFLAG] == 0x01 {
return true
} else {
return false
}
}

func (node *node) SetHttpInfoState(nodeInfo bool){
if nodeInfo{
func (node *node) SetHttpInfoState(nodeInfo bool) {
if nodeInfo {
node.cap[HTTPINFOFLAG] = 0x01
} else {
node.cap[HTTPINFOFLAG] = 0x00
Expand Down Expand Up @@ -268,7 +269,7 @@ func (node *node) SetState(state uint32) {
atomic.StoreUint32(&(node.state), state)
}

func (node *node) GetPubKey() *crypto.PubKey{
func (node *node) GetPubKey() *crypto.PubKey {
return node.publicKey
}

Expand Down Expand Up @@ -490,3 +491,56 @@ func (node *node) RemoveFromRetryList(addr string) {
}

}

//copy xmit at first
func (node *node) Relay(message interface{}) error {
log.Debug()
var buffer []byte
var err error
switch message.(type) {
case *transaction.Transaction:
log.Debug("TX transaction message")
txn := message.(*transaction.Transaction)
buffer, err = NewTxn(txn)
if err != nil {
log.Error("Error New Tx message: ", err)
return err
}
node.txnCnt++
case *ledger.Block:
log.Debug("TX block message")
block := message.(*ledger.Block)
buffer, err = NewBlock(block)
if err != nil {
log.Error("Error New Block message: ", err)
return err
}
case *ConsensusPayload:
log.Debug("TX consensus message")
consensusPayload := message.(*ConsensusPayload)
buffer, err = NewConsensus(consensusPayload)
if err != nil {
log.Error("Error New consensus message: ", err)
return err
}
case Uint256:
log.Debug("TX block hash message")
hash := message.(Uint256)
buf := bytes.NewBuffer([]byte{})
hash.Serialize(buf)
// construct inv message
invPayload := NewInvPayload(BLOCK, 1, buf.Bytes())
buffer, err = NewInv(invPayload)
if err != nil {
log.Error("Error New inv message")
return err
}
default:
log.Warn("Unknown Xmit message type")
return errors.New("Unknown Xmit message type")
}

node.nbrNodes.Broadcast(buffer)

return nil
}
2 changes: 2 additions & 0 deletions net/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
CONNMONITOR = 6
CONNMAXBACK = 4000
MAXRETRYCOUNT = 3
MAXIDCACHED = 5000
)

// The node state
Expand Down Expand Up @@ -137,6 +138,7 @@ type Noder interface {
RemoveAddrInConnectingList(addr string)
AddInRetryList(addr string)
RemoveFromRetryList(addr string)
Relay(interface{}) error
}

func (msg *NodeAddr) Deserialization(p []byte) error {
Expand Down

0 comments on commit 1d220f2

Please sign in to comment.