From d97d6e2007f88a92177f95cef19985ac469d6b08 Mon Sep 17 00:00:00 2001 From: rfyiamcool Date: Tue, 26 Sep 2023 11:39:55 +0800 Subject: [PATCH] fix: ping etcd Signed-off-by: rfyiamcool --- README.md | 4 ++-- elector.go | 22 ++++++++++++++++++++-- elector_test.go | 12 ++++++++++++ go.mod | 2 +- 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 68f0505..2cc9318 100644 --- a/README.md +++ b/README.md @@ -50,8 +50,8 @@ func main() { s := gocron.NewScheduler(time.UTC) s.WithDistributedElector(el) - s.Every("1s").Do(func() { - if el.IsLeader(context.TODO()) == nil { + _, _ = s.Every("1s").Do(func() { + if el.IsLeader(context.Background()) == nil { fmt.Println("the current instance is leader") } else { fmt.Println("the current leader is", el.GetLeaderID()) diff --git a/elector.go b/elector.go index 79ee7e0..c79d6c6 100644 --- a/elector.go +++ b/elector.go @@ -3,13 +3,14 @@ package elector import ( "context" "crypto/rand" - "errors" "fmt" mrand "math/rand" "os" "sync" "time" + "github.com/pkg/errors" + "github.com/go-co-op/gocron" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -18,6 +19,7 @@ import ( var ( ErrNonLeader = errors.New("the elector is not leader") ErrClosed = errors.New("the elector is already closed") + ErrPingEtcd = errors.New("ping etcd server timeout") ) var ( @@ -64,7 +66,7 @@ func NewElectorWithClient(ctx context.Context, cli *clientv3.Client, options ... func newElector(ctx context.Context, cli *clientv3.Client, cfg clientv3.Config, options ...concurrency.SessionOption) (*Elector, error) { var err error if cli == nil { - cli, err = clientv3.New(cfg) + cli, err = clientv3.New(cfg) // async dial etcd if err != nil { return nil, err } @@ -80,6 +82,11 @@ func newElector(ctx context.Context, cli *clientv3.Client, cfg clientv3.Config, client: cli, logger: nullLogger, } + + err = el.pingEtcd("/") + if err != nil { + return nil, err + } return el, nil } @@ -139,6 +146,17 @@ func (e *Elector) unsetLeader(id string) { e.leaderID = id } +func (e *Elector) pingEtcd(electionPath string) error { + timeoutCtx, cancel := context.WithTimeout(e.ctx, 6*time.Second) + defer cancel() + + _, _ = e.client.KV.Get(timeoutCtx, electionPath) + if timeoutCtx.Err() == context.DeadlineExceeded { + return ErrPingEtcd + } + return nil +} + // Start Start the election. // This method will keep trying the election. When the election is successful, set isleader to true. // If it fails, the election directory will be monitored until the election is successful. diff --git a/elector_test.go b/elector_test.go index f2c6b49..8b7b543 100644 --- a/elector_test.go +++ b/elector_test.go @@ -17,6 +17,18 @@ var ( testElectionPath = "/gocron/elector/" ) +func TestGocronDialTimeout(t *testing.T) { + start := time.Now() + _, err := NewElector(context.Background(), Config{ + Endpoints: []string{"http://127.0.0.1:2000"}, // invalid etcd + }) + assert.Equal(t, ErrPingEtcd, err) + + // 5< 6 < 7 + assert.Greater(t, int(time.Since(start).Seconds()), 5) + assert.Less(t, int(time.Since(start).Seconds()), 8) +} + func TestGocronWithElector(t *testing.T) { el, err := NewElector(context.Background(), testConfig, WithTTL(1)) assert.Equal(t, nil, err) diff --git a/go.mod b/go.mod index 1f49843..337db93 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.3.1 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect go.uber.org/atomic v1.9.0 // indirect