Skip to content

Commit

Permalink
Optimise consensus (#125)
Browse files Browse the repository at this point in the history
* optimise code

* change consensus engine restart to reset;move consensus state unlock

* update consensus
  • Loading branch information
siovanus authored Oct 12, 2023
1 parent ced0cb7 commit b2566ca
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 58 deletions.
4 changes: 2 additions & 2 deletions consensus/hotstuff/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Backend interface {
Address() common.Address

// Validators returns current epoch participants
Validators(height uint64, inConsensus bool) ValidatorSet
Validators(height uint64, inConsensus bool) (ValidatorSet, error)

// EventMux returns the event mux in backend
EventMux() *event.TypeMux
Expand Down Expand Up @@ -70,7 +70,7 @@ type Backend interface {
// CheckPoint retrieve the flag of epoch change and new epoch start height
CheckPoint(height uint64) (uint64, bool)

ReStart()
Reset()

Close() error
}
Expand Down
25 changes: 14 additions & 11 deletions consensus/hotstuff/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,24 +264,27 @@ func (s *backend) Close() error {
return nil
}

func (s *backend) ReStart() {
func (s *backend) Reset() {
if !s.coreStarted {
log.Errorf("Try to reset stopped core engine")
return
}
log.Debug("Reset consensus engine...")

next, err := s.newEpochValidators()
if err != nil {
panic(fmt.Errorf("Restart consensus engine failed, err: %v ", err))
panic(fmt.Errorf("Reset consensus engine failed, err: %v ", err))
}

if next.Equal(s.vals.Copy()) {
log.Trace("Restart Consensus engine, validators not changed.", "origin", s.vals.AddressList(), "current", next.AddressList())
log.Trace("Reset Consensus engine, validators not changed.", "origin", s.vals.AddressList(), "current", next.AddressList())
return
}

if s.coreStarted {
s.Stop()
// waiting for last engine instance free resource, e.g: unsubscribe...
time.Sleep(2 * time.Second)
log.Debug("Restart consensus engine...")
s.Start(s.chain, s.hasBadBlock)
}
// reset validator set
s.vals = next.Copy()

// p2p module connect nodes directly
s.nodesFeed.Send(consensus.StaticNodesEvent{Validators: s.vals.AddressList()})
}

// verifyHeader checks whether a header conforms to the consensus rules.The
Expand Down
10 changes: 5 additions & 5 deletions consensus/hotstuff/backend/governance.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,20 @@ func (s *backend) CheckPoint(height uint64) (uint64, bool) {
}

// Validators get validators from backend by `consensus core`, param of `mining` is false denote need last epoch validators.
func (s *backend) Validators(height uint64, mining bool) hotstuff.ValidatorSet {
func (s *backend) Validators(height uint64, mining bool) (hotstuff.ValidatorSet, error) {
if mining {
return s.vals.Copy()
return s.vals.Copy(), nil
}

header := s.chain.GetHeaderByNumber(height)
if header == nil {
return nil
return nil, fmt.Errorf("GetHeaderByNumber, header is nil")
}
_, vals, err := s.getValidatorsByHeader(header, nil, s.chain)
if err != nil {
return nil
return nil, err
}
return vals
return vals, nil
}

// IsSystemTransaction used by state processor while sync block.
Expand Down
26 changes: 13 additions & 13 deletions consensus/hotstuff/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type core struct {
validateFn func(common.Hash, []byte) (common.Address, error)
checkPointFn func(uint64) (uint64, bool)
isRunning bool

wg sync.WaitGroup
}

// New creates an HotStuff consensus core
Expand Down Expand Up @@ -125,30 +127,30 @@ func (c *core) startNewRound(round *big.Int) {
Height: new(big.Int).Add(lastProposal.Number(), common.Big1),
Round: new(big.Int),
}
var changeEpoch bool
if changeView {
newView.Height = new(big.Int).Set(c.current.Height())
newView.Round = new(big.Int).Set(round)
} else if c.checkPoint(newView) {
logger.Trace("Stop engine after check point.")
return
} else {
changeEpoch = c.checkPoint(newView)
}

// calculate validator set
c.valSet = c.backend.Validators(newView.HeightU64(), true)
var err error
if c.valSet, err = c.backend.Validators(newView.HeightU64(), true); err != nil {
logger.Error("get validator set failed", "err", err)
return
}
c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())

// update smr and try to unlock at the round0
if err := c.updateRoundState(lastProposal, newView); err != nil {
logger.Error("Update round state failed", "state", c.currentState(), "newView", newView, "err", err)
return
}
if !changeView {
if err := c.current.Unlock(); err != nil {
logger.Error("Unlock node failed", "newView", newView, "err", err)
return
}
if changeEpoch {
c.current.Unlock()
}

logger.Debug("New round", "state", c.currentState(), "newView", newView, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "IsProposer", c.IsProposer())

// stop last timer and regenerate new timer
Expand All @@ -170,9 +172,7 @@ func (c *core) checkPoint(view *View) bool {
c.point = epochStart
c.lastVals = c.valSet.Copy()
c.logger.Trace("CheckPoint done", "view", view, "point", c.point)
c.backend.ReStart()
}
if !c.isRunning {
c.backend.Reset()
return true
}
return false
Expand Down
5 changes: 5 additions & 0 deletions consensus/hotstuff/core/decide.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
// handleCommitVote implement description as follow:
// ```
// leader wait for (n n f) votes: V ← {v | matchingMsg(v, commit, curView)}
//
// commitQC ← QC(V )
// broadcast Msg(decide, ⊥, commitQC )
//
// ```
func (c *core) handleCommitVote(data *Message) error {
var (
Expand Down Expand Up @@ -200,6 +202,9 @@ func (c *core) handleDecide(data *Message) error {
}
}

//prepare for new round
c.current.Unlock()

c.startNewRound(common.Big0)
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion consensus/hotstuff/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,19 @@ func (c *core) Start(chain consensus.ChainReader) {
c.current = nil

c.subscribeEvents()
go c.handleEvents()

// Start a new round from last sequence + 1
c.startNewRound(common.Big0)
c.wg.Add(1)
go c.handleEvents()
}

// Stop implements core.Engine.Stop
func (c *core) Stop() {
c.stopTimer()
c.unsubscribeEvents()
c.isRunning = false
c.wg.Wait()
}

// Address implement core.Engine.Address
Expand Down Expand Up @@ -100,6 +102,7 @@ func (c *core) unsubscribeEvents() {
}

func (c *core) handleEvents() {
defer c.wg.Done()
logger := c.logger.New("handleEvents")

for {
Expand Down
38 changes: 23 additions & 15 deletions consensus/hotstuff/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (c *core) sendPrepare() {
request := c.current.PendingRequest()
if request == nil || request.block == nil || request.block.NumberU64() != c.HeightU64() {
logger.Trace("Pending request invalid", "msg", code)
if request != nil && request.block != nil {
logger.Trace("Pending request invalid", "msg", code, "request.block.Number", request.block.NumberU64(),
"c.Height", c.HeightU64(), "request.block.hash", request.block.SealHash())
}
return
} else {
block = c.current.PendingRequest().block
Expand Down Expand Up @@ -91,10 +95,12 @@ func (c *core) sendPrepare() {

// handlePrepare implement description as follow:
// ```
// repo wait for message m : matchingMsg(m, prepare, curView) from leader(curView)
// if m.node extends from m.justify.node ∧
// safeNode(m.node, m.justify) then
// send voteMsg(prepare, m.node, ⊥) to leader(curView)
//
// repo wait for message m : matchingMsg(m, prepare, curView) from leader(curView)
// if m.node extends from m.justify.node ∧
// safeNode(m.node, m.justify) then
// send voteMsg(prepare, m.node, ⊥) to leader(curView)
//
// ```
func (c *core) handlePrepare(data *Message) error {
var (
Expand Down Expand Up @@ -127,17 +133,19 @@ func (c *core) handlePrepare(data *Message) error {

// ensure remote block is legal.
block := node.Block
if err := c.checkBlock(block); err != nil {
logger.Trace("Failed to check block", "msg", code, "src", src, "err", err)
return err
}
if duration, err := c.backend.Verify(block, false); err != nil {
logger.Trace("Failed to verify unsealed proposal", "msg", code, "src", src, "err", err, "duration", duration)
return errVerifyUnsealedProposal
}
if err := c.executeBlock(block); err != nil {
logger.Trace("Failed to execute block", "msg", code, "src", src, "err", err)
return err
if c.current.executed == nil || c.current.executed.Block != nil || c.current.executed.Block.SealHash() != block.SealHash() {
if err := c.checkBlock(block); err != nil {
logger.Trace("Failed to check block", "msg", code, "src", src, "err", err)
return err
}
if duration, err := c.backend.Verify(block, false); err != nil {
logger.Trace("Failed to verify unsealed proposal", "msg", code, "src", src, "err", err, "duration", duration)
return errVerifyUnsealedProposal
}
if err := c.executeBlock(block); err != nil {
logger.Trace("Failed to execute block", "msg", code, "src", src, "err", err)
return err
}
}

// safety and liveness rules judgement.
Expand Down
1 change: 1 addition & 0 deletions consensus/hotstuff/core/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (c *core) handleRequest(request *Request) error {
if c.current.PendingRequest() == nil ||
c.current.PendingRequest().block.NumberU64() < c.current.HeightU64() {
c.current.SetPendingRequest(request)
logger.Trace("Set PendingRequest", "number", request.block.NumberU64(), "hash", request.block.SealHash())
c.sendPrepare()
} else {
logger.Trace("PendingRequest exist")
Expand Down
7 changes: 2 additions & 5 deletions consensus/hotstuff/core/round_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ func (s *roundState) LastChainedBlock() *types.Block {

// accept pending request from miner only for once.
func (s *roundState) SetPendingRequest(req *Request) {
if s.pendingRequest == nil {
s.pendingRequest = req
}
s.pendingRequest = req
}

func (s *roundState) PendingRequest() *Request {
Expand Down Expand Up @@ -207,13 +205,12 @@ func (s *roundState) LockQC() *QuorumCert {
}

// Unlock it's happened at the start of new round, new state is `StateAcceptRequest`, and `lockQC` keep to judge safety rule
func (s *roundState) Unlock() error {
func (s *roundState) Unlock() {
s.pendingRequest = nil
s.proposalLocked = false
s.lockedBlock = nil
s.node.temp = nil
s.executed = nil
return nil
}

func (s *roundState) LockedBlock() *types.Block {
Expand Down
10 changes: 6 additions & 4 deletions consensus/hotstuff/core/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func (ts *testSystemBackend) Address() common.Address {
}

// Peers returns all connected peers
func (ts *testSystemBackend) Validators(height uint64, mining bool) hotstuff.ValidatorSet {
return ts.peers
func (ts *testSystemBackend) Validators(height uint64, mining bool) (hotstuff.ValidatorSet, error) {
return ts.peers, nil
}

func (ts *testSystemBackend) EventMux() *event.TypeMux {
Expand Down Expand Up @@ -208,7 +208,7 @@ func (ts *testSystemBackend) HasPropsal(hash common.Hash, number *big.Int) bool
}

func (ts *testSystemBackend) Close() error { return nil }
func (ts *testSystemBackend) ReStart() {}
func (ts *testSystemBackend) Reset() {}
func (ts *testSystemBackend) CheckPoint(height uint64) (uint64, bool) { return 0, false }

// ==============================================
Expand Down Expand Up @@ -352,7 +352,9 @@ type testSigner struct {
func (ts *testSigner) Address() common.Address { return ts.address }
func (ts *testSigner) Sign(data []byte) ([]byte, error) { return common.EmptyHash.Bytes(), nil }
func (ts *testSigner) SigHash(header *types.Header) (hash common.Hash) { return common.EmptyHash }
func (ts *testSigner) SignHash(hash common.Hash) ([]byte, error) { return common.EmptyHash.Bytes(), nil }
func (ts *testSigner) SignHash(hash common.Hash) ([]byte, error) {
return common.EmptyHash.Bytes(), nil
}
func (ts *testSigner) SignTx(tx *types.Transaction, signer types.Signer) (*types.Transaction, error) {
return tx, nil
}
Expand Down
4 changes: 2 additions & 2 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func (w *worker) mainLoop() {
// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
w.wg.Done()
defer w.wg.Done()
var (
stopCh chan struct{}
prev common.Hash
Expand Down Expand Up @@ -534,7 +534,7 @@ func (w *worker) taskLoop() {
// resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database.
func (w *worker) resultLoop() {
w.wg.Done()
defer w.wg.Done()
for {
select {
case block := <-w.resultCh:
Expand Down

0 comments on commit b2566ca

Please sign in to comment.