Skip to content
This repository has been archived by the owner on Oct 4, 2019. It is now read-only.

Commit

Permalink
Merge pull request #417 from ethereumproject/fix/head-sync-tracking
Browse files Browse the repository at this point in the history
core,eth: improve head sync tracking
  • Loading branch information
whilei authored Dec 3, 2017
2 parents 0a3f1fd + 8a39ed1 commit 2b0a4d5
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: go
go_import_path: github.com/ethereumproject/go-ethereum
go: 1.8
go: 1.8.x
os:
- linux
- osx
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ $ go install -ldflags "-X main.Version="`git describe --tags` ./cmd/...
```

#### Using release source code tarball
Because of strict Go directory structure, tarball needs to be extracted into proper subdirectory under `$GOPATH`.
Following commands are example of building the v4.1.1 release:
Because of strict Go directory structure, the tarball needs to be extracted into proper subdirectory under `$GOPATH`.
The following commands are an example of building the v4.1.1 release:
```
$ mkdir -p $GOPATH/src/github.com/ethereumproject
$ cd $GOPATH/src/github.com/ethereumproject
Expand Down
2 changes: 1 addition & 1 deletion cmd/geth/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ func runStatusSyncLogs(e *eth.Ethereum, interval string, maxPeers int) {
currentBlockHex = blockchain.CurrentFastBlock().Hash().Hex()
}
}
if current == height && !(current == 0 && height == 0) {
if current >= height && !(current == 0 && height == 0) {
fMode = "Import " // with spaces to make same length as Discover, FastSync, FullSync
fOfHeight = strings.Repeat(" ", 12)
fHeightRatio = strings.Repeat(" ", 7)
Expand Down
59 changes: 41 additions & 18 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func NewBlockChain(chainDb ethdb.Database, config *ChainConfig, pow pow.PoW, mux
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for i := range config.BadHashes {
if header := bc.GetHeader(config.BadHashes[i].Hash); header != nil && header.Number.Cmp(config.BadHashes[i].Block) == 0 {
glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%x…]", header.Number, header.ParentHash[:4])
glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%s]", header.Number, header.ParentHash.Hex())
bc.SetHead(header.Number.Uint64() - 1)
glog.V(logger.Error).Infoln("Chain rewind was successful, resuming normal operation")
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func NewBlockChainDryrun(chainDb ethdb.Database, config *ChainConfig, pow pow.Po
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for i := range config.BadHashes {
if header := bc.GetHeader(config.BadHashes[i].Hash); header != nil && header.Number.Cmp(config.BadHashes[i].Block) == 0 {
glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%x…]", header.Number, header.ParentHash[:4])
glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%s]", header.Number, header.ParentHash.Hex())
bc.SetHead(header.Number.Uint64() - 1)
glog.V(logger.Error).Infoln("Chain rewind was successful, resuming normal operation")
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func (self *BlockChain) LoadLastState(dryrun bool) error {
glog.V(logger.Info).Infof("Validating currentFastBlock: %v", self.currentFastBlock.Number())
if e := self.blockIsInvalid(self.currentFastBlock); e != nil {
if !dryrun {
glog.V(logger.Warn).Infof("WARNING: Found unhealthy head fast block #%d (%x): %v \nAttempting chain reset with recovery.", self.currentFastBlock.Number(), self.currentFastBlock.Hash(), e)
glog.V(logger.Warn).Infof("WARNING: Found unhealthy head fast block #%d [%x]: %v \nAttempting chain reset with recovery.", self.currentFastBlock.Number(), self.currentFastBlock.Hash(), e)
return recoverOrReset()
}
return fmt.Errorf("invalid currentFastBlock: %v", e)
Expand Down Expand Up @@ -1082,7 +1082,6 @@ type WriteStatus byte
const (
NonStatTy WriteStatus = iota
CanonStatTy
SplitStatTy
SideStatTy
)

Expand Down Expand Up @@ -1272,6 +1271,11 @@ func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err
case SideStatTy:
mlogWriteStatus = "SIDE"
}
parent := self.GetBlock(block.ParentHash())
parentTimeDiff := new(big.Int)
if parent != nil {
parentTimeDiff = new(big.Int).Sub(block.Time(), parent.Time())
}
mlogBlockchain.Send(mlogBlockchainWriteBlock.SetDetailValues(
mlogWriteStatus,
err,
Expand All @@ -1282,6 +1286,10 @@ func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err
block.GasUsed(),
block.Coinbase().Hex(),
block.Time(),
block.Difficulty(),
len(block.Uncles()),
block.ReceivedAt,
parentTimeDiff,
))
}()
}
Expand All @@ -1302,9 +1310,19 @@ func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err
externTd := new(big.Int).Add(block.Difficulty(), ptd)

// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
// Compare local vs external difficulties
tdCompare := externTd.Cmp(localTd)

// Initialize reorg if incoming TD is greater than local.
reorg := tdCompare > 0

// If TDs are the same, randomize.
if tdCompare == 0 {
// Reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
reorg = mrand.Float64() < 0.5
}
if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != self.currentBlock.Hash() {
if err := self.reorg(self.currentBlock, block); err != nil {
Expand Down Expand Up @@ -1468,7 +1486,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (chainIndex int, err err
switch status {
case CanonStatTy:
if glog.V(logger.Debug) {
glog.Infof("[%v] inserted block #%d (%d TXs %v G %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), block.GasUsed(), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
glog.Infof("[%v] inserted block #%d (%d TXs %v G %d UNCs) [%s]. Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), block.GasUsed(), len(block.Uncles()), block.Hash().Hex(), time.Since(bstart))
}
events = append(events, ChainEvent{block, block.Hash(), logs})

Expand All @@ -1486,12 +1504,9 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (chainIndex int, err err
}
case SideStatTy:
if glog.V(logger.Detail) {
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) [%s]. Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Hex(), time.Since(bstart))
}
events = append(events, ChainSideEvent{block, logs})

case SplitStatTy:
events = append(events, ChainSplitEvent{block, logs})
}
stats.processed++
}
Expand All @@ -1511,15 +1526,15 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (chainIndex int, err err
tend,
))
}
glog.V(logger.Info).Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n",
glog.V(logger.Info).Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%s / %s]\n",
stats.processed,
stats.queued,
stats.ignored,
txcount,
tend,
end.Number(),
start.Hash().Bytes()[:4],
end.Hash().Bytes()[:4])
start.Hash().Hex(),
end.Hash().Hex())
}
go self.postChainEvents(events, coalescedLogs)

Expand Down Expand Up @@ -1596,9 +1611,17 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
}
}

commonHash := commonBlock.Hash()
if glog.V(logger.Debug) {
commonHash := commonBlock.Hash()
glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
glog.Infof("Chain split detected @ [%s]. Reorganising chain from #%v %s to %s", commonHash.Hex(), numSplit, oldStart.Hash().Hex(), newStart.Hash().Hex())
}
if logger.MlogEnabled() {
mlogBlockchain.Send(mlogBlockchainReorgBlocks.SetDetailValues(
commonHash.Hex(),
numSplit,
oldStart.Hash().Hex(),
newStart.Hash().Hex(),
))
}

var addedTxs types.Transactions
Expand Down Expand Up @@ -1689,7 +1712,7 @@ func (chain *BlockChain) update() {
if len(blocks) > 0 {
types.BlockBy(types.Number).Sort(blocks)
if i, err := chain.InsertChain(blocks); err != nil {
log.Printf("periodic future chain update on block #%d (%x): %s", blocks[i].Number(), blocks[i].Hash(), err)
log.Printf("periodic future chain update on block #%d [%s]: %s", blocks[i].Number(), blocks[i].Hash().Hex(), err)
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions core/mlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ A STATUS of NONE means it was written _without_ any abnormal chain event, such a
{"BLOCK", "GAS_USED", "BIGINT"},
{"BLOCK", "COINBASE", "STRING"},
{"BLOCK", "TIME", "BIGINT"},
{"BLOCK", "DIFFICULTY", "BIGINT"},
{"BLOCK", "UNCLES", "INT"},
{"BLOCK", "RECEIVED_AT", "BIGINT"},
{"BLOCK", "DIFF_PARENT_TIME", "BIGINT"},
},
}

Expand All @@ -58,6 +62,19 @@ var mlogBlockchainInsertBlocks = logger.MLogT{
},
}

var mlogBlockchainReorgBlocks = logger.MLogT{
Description: "Called when a chain split is detected and a subset of blocks are reoganized.",
Receiver: "BLOCKCHAIN",
Verb: "REORG",
Subject: "BLOCKS",
Details: []logger.MLogDetailT{
{"REORG", "LAST_COMMON_HASH", "STRING"},
{"REORG", "SPLIT_NUMBER", "BIGINT"},
{"BLOCKS", "OLD_START_HASH", "STRING"},
{"BLOCKS", "NEW_START_HASH", "STRING"},
},
}

var mlogTxPoolAddTx = logger.MLogT{
Description: `Called once when a valid transaction is added to tx pool.
$TO.NAME will be the account address hex or '[NEW_CONTRACT]' in case of a contract.`,
Expand Down
43 changes: 38 additions & 5 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,20 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {

var err error
defer func() {
if logger.MlogEnabled() {
mlogDownloader.Send(mlogDownloaderRegisterPeer.SetDetailValues(
id,
version,
err,
))
}
}()

glog.V(logger.Detail).Infoln("Registering peer", id)
if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
err = d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData))
if err != nil {
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
Expand All @@ -280,9 +292,21 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
func (d *Downloader) UnregisterPeer(id string) error {

var err error
defer func() {
if logger.MlogEnabled() {
mlogDownloader.Send(mlogDownloaderUnregisterPeer.SetDetailValues(
id,
err,
))
}
}()

// Unregister the peer from the active peer set and revoke any fetch tasks
glog.V(logger.Detail).Infoln("Unregistering peer", id)
if err := d.peers.Unregister(id); err != nil {
err = d.peers.Unregister(id)
if err != nil {
glog.V(logger.Error).Infoln("Unregister failed:", err)
return err
}
Expand Down Expand Up @@ -685,10 +709,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// If the head fetch already found an ancestor, return
if !common.EmptyHash(hash) {
if int64(number) <= floor {
glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash, floor)
return 0, errInvalidAncestor
}
glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash)
return number, nil
}
// Ancestor not found, we need to binary search over our chain
Expand Down Expand Up @@ -1479,7 +1503,16 @@ func (d *Downloader) qosTuner() {
atomic.StoreUint64(&d.rttConfidence, conf)

// Log the new QoS values and sleep until the next RTT
glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
ttl := d.requestTTL()
if logger.MlogEnabled() {
mlogDownloader.Send(mlogDownloaderTuneQOS.SetDetailValues(
rtt,
float64(conf)/1000000.0,
ttl,
))
}
glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, ttl)

select {
case <-d.quitCh:
return
Expand Down
49 changes: 49 additions & 0 deletions eth/downloader/mlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package downloader

import "github.com/ethereumproject/go-ethereum/logger"

var mlogDownloader = logger.MLogRegisterAvailable("downloader", mLogLines)

var mLogLines = []logger.MLogT{
mlogDownloaderRegisterPeer,
mlogDownloaderUnregisterPeer,
mlogDownloaderTuneQOS,
}

var mlogDownloaderRegisterPeer = logger.MLogT{
Description: "Called when the downloader registers a peer.",
Receiver: "DOWNLOADER",
Verb: "REGISTER",
Subject: "PEER",
Details: []logger.MLogDetailT{
{"PEER", "ID", "STRING"},
{"PEER", "VERSION", "INT"},
{"REGISTER", "ERROR", "STRING_OR_NULL"},
},
}

var mlogDownloaderUnregisterPeer = logger.MLogT{
Description: "Called when the downloader unregisters a peer.",
Receiver: "DOWNLOADER",
Verb: "UNREGISTER",
Subject: "PEER",
Details: []logger.MLogDetailT{
{"PEER", "ID", "STRING"},
{"UNREGISTER", "ERROR", "STRING_OR_NULL"},
},
}

var mlogDownloaderTuneQOS = logger.MLogT{
Description: `Called at intervals to gather peer latency statistics. Estimates request round trip time.
RTT reports the estimated Request Round Trip time, confidence is measures from 0-1 (1 is ultimately confidenct),
and TTL reports the Timeout Allowance for a single downloader request to finish within.`,
Receiver: "DOWNLOADER",
Verb: "TUNE",
Subject: "QOS",
Details: []logger.MLogDetailT{
{"QOS", "RTT", "DURATION"},
{"QOS", "CONFIDENCE", "NUMBER"},
{"QOS", "TTL", "DURATION"},
},
}
Loading

0 comments on commit 2b0a4d5

Please sign in to comment.