Skip to content

Commit

Permalink
fix: ping etcd
Browse files Browse the repository at this point in the history
Signed-off-by: rfyiamcool <[email protected]>
  • Loading branch information
rfyiamcool committed Sep 26, 2023
1 parent b50490f commit d97d6e2
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 5 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func main() {
s := gocron.NewScheduler(time.UTC)
s.WithDistributedElector(el)

s.Every("1s").Do(func() {
if el.IsLeader(context.TODO()) == nil {
_, _ = s.Every("1s").Do(func() {
if el.IsLeader(context.Background()) == nil {
fmt.Println("the current instance is leader")
} else {
fmt.Println("the current leader is", el.GetLeaderID())
Expand Down
22 changes: 20 additions & 2 deletions elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package elector
import (
"context"
"crypto/rand"
"errors"
"fmt"
mrand "math/rand"
"os"
"sync"
"time"

"github.com/pkg/errors"

"github.com/go-co-op/gocron"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
Expand All @@ -18,6 +19,7 @@ import (
var (
ErrNonLeader = errors.New("the elector is not leader")
ErrClosed = errors.New("the elector is already closed")
ErrPingEtcd = errors.New("ping etcd server timeout")
)

var (
Expand Down Expand Up @@ -64,7 +66,7 @@ func NewElectorWithClient(ctx context.Context, cli *clientv3.Client, options ...
func newElector(ctx context.Context, cli *clientv3.Client, cfg clientv3.Config, options ...concurrency.SessionOption) (*Elector, error) {
var err error
if cli == nil {
cli, err = clientv3.New(cfg)
cli, err = clientv3.New(cfg) // async dial etcd
if err != nil {
return nil, err
}
Expand All @@ -80,6 +82,11 @@ func newElector(ctx context.Context, cli *clientv3.Client, cfg clientv3.Config,
client: cli,
logger: nullLogger,
}

err = el.pingEtcd("/")
if err != nil {
return nil, err
}
return el, nil
}

Expand Down Expand Up @@ -139,6 +146,17 @@ func (e *Elector) unsetLeader(id string) {
e.leaderID = id
}

func (e *Elector) pingEtcd(electionPath string) error {
timeoutCtx, cancel := context.WithTimeout(e.ctx, 6*time.Second)
defer cancel()

_, _ = e.client.KV.Get(timeoutCtx, electionPath)
if timeoutCtx.Err() == context.DeadlineExceeded {
return ErrPingEtcd
}
return nil
}

// Start Start the election.
// This method will keep trying the election. When the election is successful, set isleader to true.
// If it fails, the election directory will be monitored until the election is successful.
Expand Down
12 changes: 12 additions & 0 deletions elector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ var (
testElectionPath = "/gocron/elector/"
)

func TestGocronDialTimeout(t *testing.T) {
start := time.Now()
_, err := NewElector(context.Background(), Config{
Endpoints: []string{"http://127.0.0.1:2000"}, // invalid etcd
})
assert.Equal(t, ErrPingEtcd, err)

// 5< 6 < 7
assert.Greater(t, int(time.Since(start).Seconds()), 5)
assert.Less(t, int(time.Since(start).Seconds()), 8)
}

func TestGocronWithElector(t *testing.T) {
el, err := NewElector(context.Background(), testConfig, WithTTL(1))
assert.Equal(t, nil, err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
Expand Down

0 comments on commit d97d6e2

Please sign in to comment.