From b2566ca9ac4448354aca10654890dc368da8d4d7 Mon Sep 17 00:00:00 2001 From: siovanus Date: Thu, 12 Oct 2023 10:50:22 +0800 Subject: [PATCH] Optimise consensus (#125) * optimise code * change consensus engine restart to reset;move consensus state unlock * update consensus --- consensus/hotstuff/backend.go | 4 +-- consensus/hotstuff/backend/engine.go | 25 +++++++++------- consensus/hotstuff/backend/governance.go | 10 +++---- consensus/hotstuff/core/core.go | 26 ++++++++-------- consensus/hotstuff/core/decide.go | 5 ++++ consensus/hotstuff/core/handler.go | 5 +++- consensus/hotstuff/core/prepare.go | 38 ++++++++++++++---------- consensus/hotstuff/core/request.go | 1 + consensus/hotstuff/core/round_state.go | 7 ++--- consensus/hotstuff/core/test_utils.go | 10 ++++--- miner/worker.go | 4 +-- 11 files changed, 77 insertions(+), 58 deletions(-) diff --git a/consensus/hotstuff/backend.go b/consensus/hotstuff/backend.go index 5c03c236..d4ea89a3 100644 --- a/consensus/hotstuff/backend.go +++ b/consensus/hotstuff/backend.go @@ -33,7 +33,7 @@ type Backend interface { Address() common.Address // Validators returns current epoch participants - Validators(height uint64, inConsensus bool) ValidatorSet + Validators(height uint64, inConsensus bool) (ValidatorSet, error) // EventMux returns the event mux in backend EventMux() *event.TypeMux @@ -70,7 +70,7 @@ type Backend interface { // CheckPoint retrieve the flag of epoch change and new epoch start height CheckPoint(height uint64) (uint64, bool) - ReStart() + Reset() Close() error } diff --git a/consensus/hotstuff/backend/engine.go b/consensus/hotstuff/backend/engine.go index fd260303..3b4b7566 100644 --- a/consensus/hotstuff/backend/engine.go +++ b/consensus/hotstuff/backend/engine.go @@ -264,24 +264,27 @@ func (s *backend) Close() error { return nil } -func (s *backend) ReStart() { +func (s *backend) Reset() { + if !s.coreStarted { + log.Errorf("Try to reset stopped core engine") + return + } + log.Debug("Reset consensus 
engine...") + next, err := s.newEpochValidators() if err != nil { - panic(fmt.Errorf("Restart consensus engine failed, err: %v ", err)) + panic(fmt.Errorf("Reset consensus engine failed, err: %v ", err)) } - if next.Equal(s.vals.Copy()) { - log.Trace("Restart Consensus engine, validators not changed.", "origin", s.vals.AddressList(), "current", next.AddressList()) + log.Trace("Reset Consensus engine, validators not changed.", "origin", s.vals.AddressList(), "current", next.AddressList()) return } - if s.coreStarted { - s.Stop() - // waiting for last engine instance free resource, e.g: unsubscribe... - time.Sleep(2 * time.Second) - log.Debug("Restart consensus engine...") - s.Start(s.chain, s.hasBadBlock) - } + // reset validator set + s.vals = next.Copy() + + // p2p module connect nodes directly + s.nodesFeed.Send(consensus.StaticNodesEvent{Validators: s.vals.AddressList()}) } // verifyHeader checks whether a header conforms to the consensus rules.The diff --git a/consensus/hotstuff/backend/governance.go b/consensus/hotstuff/backend/governance.go index baf9e660..6d26e1ee 100644 --- a/consensus/hotstuff/backend/governance.go +++ b/consensus/hotstuff/backend/governance.go @@ -96,20 +96,20 @@ func (s *backend) CheckPoint(height uint64) (uint64, bool) { } // Validators get validators from backend by `consensus core`, param of `mining` is false denote need last epoch validators. -func (s *backend) Validators(height uint64, mining bool) hotstuff.ValidatorSet { +func (s *backend) Validators(height uint64, mining bool) (hotstuff.ValidatorSet, error) { if mining { - return s.vals.Copy() + return s.vals.Copy(), nil } header := s.chain.GetHeaderByNumber(height) if header == nil { - return nil + return nil, fmt.Errorf("GetHeaderByNumber, header is nil") } _, vals, err := s.getValidatorsByHeader(header, nil, s.chain) if err != nil { - return nil + return nil, err } - return vals + return vals, nil } // IsSystemTransaction used by state processor while sync block. 
diff --git a/consensus/hotstuff/core/core.go b/consensus/hotstuff/core/core.go index 2477b5d7..147c875a 100644 --- a/consensus/hotstuff/core/core.go +++ b/consensus/hotstuff/core/core.go @@ -58,6 +58,8 @@ type core struct { validateFn func(common.Hash, []byte) (common.Address, error) checkPointFn func(uint64) (uint64, bool) isRunning bool + + wg sync.WaitGroup } // New creates an HotStuff consensus core @@ -125,16 +127,20 @@ func (c *core) startNewRound(round *big.Int) { Height: new(big.Int).Add(lastProposal.Number(), common.Big1), Round: new(big.Int), } + var changeEpoch bool if changeView { newView.Height = new(big.Int).Set(c.current.Height()) newView.Round = new(big.Int).Set(round) - } else if c.checkPoint(newView) { - logger.Trace("Stop engine after check point.") - return + } else { + changeEpoch = c.checkPoint(newView) } // calculate validator set - c.valSet = c.backend.Validators(newView.HeightU64(), true) + var err error + if c.valSet, err = c.backend.Validators(newView.HeightU64(), true); err != nil { + logger.Error("get validator set failed", "err", err) + return + } c.valSet.CalcProposer(lastProposer, newView.Round.Uint64()) // update smr and try to unlock at the round0 @@ -142,13 +148,9 @@ func (c *core) startNewRound(round *big.Int) { logger.Error("Update round state failed", "state", c.currentState(), "newView", newView, "err", err) return } - if !changeView { - if err := c.current.Unlock(); err != nil { - logger.Error("Unlock node failed", "newView", newView, "err", err) - return - } + if changeEpoch { + c.current.Unlock() } - logger.Debug("New round", "state", c.currentState(), "newView", newView, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "IsProposer", c.IsProposer()) // stop last timer and regenerate new timer @@ -170,9 +172,7 @@ func (c *core) checkPoint(view *View) bool { c.point = epochStart c.lastVals = c.valSet.Copy() c.logger.Trace("CheckPoint done", "view", view, "point", c.point) - 
c.backend.ReStart() - } - if !c.isRunning { + c.backend.Reset() return true } return false diff --git a/consensus/hotstuff/core/decide.go b/consensus/hotstuff/core/decide.go index 372cca49..51da6ad7 100644 --- a/consensus/hotstuff/core/decide.go +++ b/consensus/hotstuff/core/decide.go @@ -29,8 +29,10 @@ import ( // handleCommitVote implement description as follow: // ``` // leader wait for (n n f) votes: V ← {v | matchingMsg(v, commit, curView)} +// // commitQC ← QC(V ) // broadcast Msg(decide, ⊥, commitQC ) +// // ``` func (c *core) handleCommitVote(data *Message) error { var ( @@ -200,6 +202,9 @@ func (c *core) handleDecide(data *Message) error { } } + //prepare for new round + c.current.Unlock() + c.startNewRound(common.Big0) return nil } diff --git a/consensus/hotstuff/core/handler.go b/consensus/hotstuff/core/handler.go index 347abeb6..cf2455b3 100644 --- a/consensus/hotstuff/core/handler.go +++ b/consensus/hotstuff/core/handler.go @@ -32,10 +32,11 @@ func (c *core) Start(chain consensus.ChainReader) { c.current = nil c.subscribeEvents() - go c.handleEvents() // Start a new round from last sequence + 1 c.startNewRound(common.Big0) + c.wg.Add(1) + go c.handleEvents() } // Stop implements core.Engine.Stop @@ -43,6 +44,7 @@ func (c *core) Stop() { c.stopTimer() c.unsubscribeEvents() c.isRunning = false + c.wg.Wait() } // Address implement core.Engine.Address @@ -100,6 +102,7 @@ func (c *core) unsubscribeEvents() { } func (c *core) handleEvents() { + defer c.wg.Done() logger := c.logger.New("handleEvents") for { diff --git a/consensus/hotstuff/core/prepare.go b/consensus/hotstuff/core/prepare.go index c618036f..0534dab4 100644 --- a/consensus/hotstuff/core/prepare.go +++ b/consensus/hotstuff/core/prepare.go @@ -53,6 +53,10 @@ func (c *core) sendPrepare() { request := c.current.PendingRequest() if request == nil || request.block == nil || request.block.NumberU64() != c.HeightU64() { logger.Trace("Pending request invalid", "msg", code) + if request != nil && 
request.block != nil { + logger.Trace("Pending request invalid", "msg", code, "request.block.Number", request.block.NumberU64(), + "c.Height", c.HeightU64(), "request.block.hash", request.block.SealHash()) + } return } else { block = c.current.PendingRequest().block @@ -91,10 +95,12 @@ func (c *core) sendPrepare() { // handlePrepare implement description as follow: // ``` -// repo wait for message m : matchingMsg(m, prepare, curView) from leader(curView) -// if m.node extends from m.justify.node ∧ -// safeNode(m.node, m.justify) then -// send voteMsg(prepare, m.node, ⊥) to leader(curView) +// +// repo wait for message m : matchingMsg(m, prepare, curView) from leader(curView) +// if m.node extends from m.justify.node ∧ +// safeNode(m.node, m.justify) then +// send voteMsg(prepare, m.node, ⊥) to leader(curView) +// // ``` func (c *core) handlePrepare(data *Message) error { var ( @@ -127,17 +133,19 @@ func (c *core) handlePrepare(data *Message) error { // ensure remote block is legal. block := node.Block - if err := c.checkBlock(block); err != nil { - logger.Trace("Failed to check block", "msg", code, "src", src, "err", err) - return err - } - if duration, err := c.backend.Verify(block, false); err != nil { - logger.Trace("Failed to verify unsealed proposal", "msg", code, "src", src, "err", err, "duration", duration) - return errVerifyUnsealedProposal - } - if err := c.executeBlock(block); err != nil { - logger.Trace("Failed to execute block", "msg", code, "src", src, "err", err) - return err + if c.current.executed == nil || c.current.executed.Block == nil || c.current.executed.Block.SealHash() != block.SealHash() { + if err := c.checkBlock(block); err != nil { + logger.Trace("Failed to check block", "msg", code, "src", src, "err", err) + return err + } + if duration, err := c.backend.Verify(block, false); err != nil { + logger.Trace("Failed to verify unsealed proposal", "msg", code, "src", src, "err", err, "duration", duration) + return errVerifyUnsealedProposal + } 
+ if err := c.executeBlock(block); err != nil { + logger.Trace("Failed to execute block", "msg", code, "src", src, "err", err) + return err + } } // safety and liveness rules judgement. diff --git a/consensus/hotstuff/core/request.go b/consensus/hotstuff/core/request.go index 43fecad3..2760ec74 100644 --- a/consensus/hotstuff/core/request.go +++ b/consensus/hotstuff/core/request.go @@ -46,6 +46,7 @@ func (c *core) handleRequest(request *Request) error { if c.current.PendingRequest() == nil || c.current.PendingRequest().block.NumberU64() < c.current.HeightU64() { c.current.SetPendingRequest(request) + logger.Trace("Set PendingRequest", "number", request.block.NumberU64(), "hash", request.block.SealHash()) c.sendPrepare() } else { logger.Trace("PendingRequest exist") diff --git a/consensus/hotstuff/core/round_state.go b/consensus/hotstuff/core/round_state.go index 9e4a9b22..a7c0b782 100644 --- a/consensus/hotstuff/core/round_state.go +++ b/consensus/hotstuff/core/round_state.go @@ -149,9 +149,7 @@ func (s *roundState) LastChainedBlock() *types.Block { // accept pending request from miner only for once. 
func (s *roundState) SetPendingRequest(req *Request) { - if s.pendingRequest == nil { - s.pendingRequest = req - } + s.pendingRequest = req } func (s *roundState) PendingRequest() *Request { @@ -207,13 +205,12 @@ func (s *roundState) LockQC() *QuorumCert { } // Unlock it's happened at the start of new round, new state is `StateAcceptRequest`, and `lockQC` keep to judge safety rule -func (s *roundState) Unlock() error { +func (s *roundState) Unlock() { s.pendingRequest = nil s.proposalLocked = false s.lockedBlock = nil s.node.temp = nil s.executed = nil - return nil } func (s *roundState) LockedBlock() *types.Block { diff --git a/consensus/hotstuff/core/test_utils.go b/consensus/hotstuff/core/test_utils.go index 573c6d97..130a537b 100644 --- a/consensus/hotstuff/core/test_utils.go +++ b/consensus/hotstuff/core/test_utils.go @@ -136,8 +136,8 @@ func (ts *testSystemBackend) Address() common.Address { } // Peers returns all connected peers -func (ts *testSystemBackend) Validators(height uint64, mining bool) hotstuff.ValidatorSet { - return ts.peers +func (ts *testSystemBackend) Validators(height uint64, mining bool) (hotstuff.ValidatorSet, error) { + return ts.peers, nil } func (ts *testSystemBackend) EventMux() *event.TypeMux { @@ -208,7 +208,7 @@ func (ts *testSystemBackend) HasPropsal(hash common.Hash, number *big.Int) bool } func (ts *testSystemBackend) Close() error { return nil } -func (ts *testSystemBackend) ReStart() {} +func (ts *testSystemBackend) Reset() {} func (ts *testSystemBackend) CheckPoint(height uint64) (uint64, bool) { return 0, false } // ============================================== @@ -352,7 +352,9 @@ type testSigner struct { func (ts *testSigner) Address() common.Address { return ts.address } func (ts *testSigner) Sign(data []byte) ([]byte, error) { return common.EmptyHash.Bytes(), nil } func (ts *testSigner) SigHash(header *types.Header) (hash common.Hash) { return common.EmptyHash } -func (ts *testSigner) SignHash(hash common.Hash) ([]byte, 
error) { return common.EmptyHash.Bytes(), nil } +func (ts *testSigner) SignHash(hash common.Hash) ([]byte, error) { + return common.EmptyHash.Bytes(), nil +} func (ts *testSigner) SignTx(tx *types.Transaction, signer types.Signer) (*types.Transaction, error) { return tx, nil } diff --git a/miner/worker.go b/miner/worker.go index 8883847d..1c6c2b0b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -486,7 +486,7 @@ func (w *worker) mainLoop() { // taskLoop is a standalone goroutine to fetch sealing task from the generator and // push them to consensus engine. func (w *worker) taskLoop() { - w.wg.Done() + defer w.wg.Done() var ( stopCh chan struct{} prev common.Hash @@ -534,7 +534,7 @@ func (w *worker) taskLoop() { // resultLoop is a standalone goroutine to handle sealing result submitting // and flush relative data to the database. func (w *worker) resultLoop() { - w.wg.Done() + defer w.wg.Done() for { select { case block := <-w.resultCh: