Skip to content

Commit

Permalink
refactor: refactor nats discovery (#520)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Nov 16, 2024
1 parent 556cc47 commit e9b443c
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 149 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ To use the NATS discovery provider one needs to provide the following:
- `Timeout`: the nodes discovery timeout
- `MaxJoinAttempts`: the maximum number of attempts to connect an existing NATs server. Defaults to `5`
- `ReconnectWait`: the time to backoff after attempting a reconnect to a server that we were already connected to previously. Default to `2 seconds`
- `Host`: the given node host address
- `DiscoveryPort`: the discovery port of the given node

```go
package main
Expand All @@ -562,13 +564,12 @@ config := nats.Config{
ActorSystemName: actorSystemName,
NatsServer: natsServer,
NatsSubject: natsSubject,
Host: "127.0.0.1",
DiscoveryPort: 2003
}

// define the host node instance
hostNode := discovery.Node{}

// instantiate the NATS discovery provider by passing the config and the hostNode
disco := nats.NewDiscovery(&config, &hostNode)
disco := nats.NewDiscovery(&config)

// pass the service discovery when enabling cluster mode in the actor system
```
Expand Down
4 changes: 2 additions & 2 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ func TestActorSystem(t *testing.T) {
srv := startNatsServer(t)

// create and start system cluster
cl1, sd1 := startClusterSystem(t, "Node1", srv.Addr().String())
cl1, sd1 := startClusterSystem(t, srv.Addr().String())
peerAddress1 := cl1.PeerAddress()
require.NotEmpty(t, peerAddress1)

Expand All @@ -1139,7 +1139,7 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, subscriber1)

// create and start system cluster
cl2, sd2 := startClusterSystem(t, "Node2", srv.Addr().String())
cl2, sd2 := startClusterSystem(t, srv.Addr().String())
peerAddress2 := cl2.PeerAddress()
require.NotEmpty(t, peerAddress2)

Expand Down
14 changes: 4 additions & 10 deletions actors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func startNatsServer(t *testing.T) *natsserver.Server {
return serv
}

func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem, discovery.Provider) {
func startClusterSystem(t *testing.T, serverAddr string) (ActorSystem, discovery.Provider) {
ctx := context.TODO()
logger := log.DiscardLogger

Expand All @@ -432,18 +432,12 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
ActorSystemName: actorSystemName,
NatsServer: fmt.Sprintf("nats://%s", serverAddr),
NatsSubject: natsSubject,
}

hostNode := discovery.Node{
Name: nodeName,
Host: host,
DiscoveryPort: gossipPort,
PeersPort: clusterPort,
RemotingPort: remotingPort,
Host: host,
DiscoveryPort: gossipPort,
}

// create the instance of provider
provider := nats.NewDiscovery(&config, &hostNode, nats.WithLogger(log.DiscardLogger))
provider := nats.NewDiscovery(&config, nats.WithLogger(log.DiscardLogger))

// create the actor system
system, err := NewActorSystem(
Expand Down
16 changes: 8 additions & 8 deletions actors/pid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2574,12 +2574,12 @@ func TestSendAsync(t *testing.T) {
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, "Node1", srv.Addr().String())
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, "Node2", srv.Addr().String())
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

Expand Down Expand Up @@ -2613,12 +2613,12 @@ func TestSendAsync(t *testing.T) {
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, "Node1", srv.Addr().String())
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, "Node2", srv.Addr().String())
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

Expand Down Expand Up @@ -2714,12 +2714,12 @@ func TestSendSync(t *testing.T) {
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, "Node1", srv.Addr().String())
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, "Node2", srv.Addr().String())
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

Expand Down Expand Up @@ -2756,12 +2756,12 @@ func TestSendSync(t *testing.T) {
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, "Node1", srv.Addr().String())
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, "Node2", srv.Addr().String())
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

Expand Down
6 changes: 3 additions & 3 deletions actors/redistribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ func TestRedistribution(t *testing.T) {
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, "Node1", srv.Addr().String())
node1, sd1 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, "Node2", srv.Addr().String())
node2, sd2 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

// create and start system cluster
node3, sd3 := startClusterSystem(t, "Node3", srv.Addr().String())
node3, sd3 := startClusterSystem(t, srv.Addr().String())
require.NotNil(t, node3)
require.NotNil(t, sd3)

Expand Down
4 changes: 3 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,8 @@ func startNode(t *testing.T, logger log.Logger, nodeName, serverAddr string) (sy
ActorSystemName: actorSystemName,
NatsServer: fmt.Sprintf("nats://%s", serverAddr),
NatsSubject: natsSubject,
Host: host,
DiscoveryPort: gossipPort,
}

hostNode := discovery.Node{
Expand All @@ -771,7 +773,7 @@ func startNode(t *testing.T, logger log.Logger, nodeName, serverAddr string) (sy
}

// create the instance of provider
natsProvider := nats.NewDiscovery(&config, &hostNode, nats.WithLogger(logger))
natsProvider := nats.NewDiscovery(&config, nats.WithLogger(logger))

clusterConfig := actors.
NewClusterConfig().
Expand Down
6 changes: 6 additions & 0 deletions discovery/nats/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type Config struct {
// to a server that we were already connected to previously.
// Defaults to 2s.
ReconnectWait time.Duration
// specifies the host address
Host string
// specifies the discovery port
DiscoveryPort int
}

// Validate checks whether the given discovery configuration is valid
Expand All @@ -61,6 +65,8 @@ func (x Config) Validate() error {
AddValidator(validation.NewEmptyStringValidator("NatsSubject", x.NatsSubject)).
AddValidator(validation.NewEmptyStringValidator("ApplicationName", x.ApplicationName)).
AddValidator(validation.NewEmptyStringValidator("ActorSystemName", x.ActorSystemName)).
AddValidator(validation.NewEmptyStringValidator("Host", x.Host)).
AddAssertion(x.DiscoveryPort > 0, "DiscoveryPort is invalid").
Validate()
}

Expand Down
8 changes: 7 additions & 1 deletion discovery/nats/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func TestConfig(t *testing.T) {
ApplicationName: "applicationName",
ActorSystemName: "actorSys",
NatsSubject: "nats-subject",
Host: "host",
DiscoveryPort: 123,
}
assert.NoError(t, config.Validate())
})
Expand All @@ -46,15 +48,19 @@ func TestConfig(t *testing.T) {
ApplicationName: "applicationName",
ActorSystemName: "actorSys",
NatsSubject: "nats-subject",
Host: "host",
DiscoveryPort: 123,
}
assert.Error(t, config.Validate())
})
t.Run("With invalid host", func(t *testing.T) {
t.Run("With invalid nats server address", func(t *testing.T) {
config := &Config{
NatsServer: "nats://:2322",
ApplicationName: "applicationName",
ActorSystemName: "actorSys",
NatsSubject: "nats-subject",
Host: "host",
DiscoveryPort: 123,
}
assert.Error(t, config.Validate())
})
Expand Down
37 changes: 18 additions & 19 deletions discovery/nats/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,34 +54,33 @@ type Discovery struct {
// define a slice of subscriptions
subscriptions []*nats.Subscription

// defines the host node
hostNode *discovery.Node

// define a logger
logger log.Logger

address string
}

// enforce compilation error
var _ discovery.Provider = &Discovery{}

// NewDiscovery returns an instance of the kubernetes discovery provider
func NewDiscovery(config *Config, hostNode *discovery.Node, opts ...Option) *Discovery {
func NewDiscovery(config *Config, opts ...Option) *Discovery {
// create an instance of
discovery := &Discovery{
d := &Discovery{
mu: sync.Mutex{},
initialized: atomic.NewBool(false),
registered: atomic.NewBool(false),
config: config,
logger: log.DefaultLogger,
hostNode: hostNode,
}

// apply the various options
for _, opt := range opts {
opt.Apply(discovery)
opt.Apply(d)
}

return discovery
d.address = net.JoinHostPort(config.Host, strconv.Itoa(config.DiscoveryPort))
return d
}

// ID returns the discovery provider id
Expand Down Expand Up @@ -117,7 +116,7 @@ func (d *Discovery) Initialize() error {
// create the nats connection option
opts := nats.GetDefaultOptions()
opts.Url = d.config.NatsServer
opts.Name = d.hostNode.Name
opts.Name = d.address
opts.ReconnectWait = d.config.ReconnectWait
opts.MaxReconnect = -1

Expand Down Expand Up @@ -174,9 +173,9 @@ func (d *Discovery) Register() error {
message.GetName(), message.GetHost(), message.GetPort())

response := &internalpb.NatsMessage{
Host: d.hostNode.Host,
Port: int32(d.hostNode.DiscoveryPort),
Name: d.hostNode.Name,
Host: d.config.Host,
Port: int32(d.config.DiscoveryPort),
Name: d.address,
MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_RESPONSE,
}

Expand Down Expand Up @@ -226,9 +225,9 @@ func (d *Discovery) Deregister() error {
if d.connection != nil {
// send a message to deregister stating we are out
message := &internalpb.NatsMessage{
Host: d.hostNode.Host,
Port: int32(d.hostNode.DiscoveryPort),
Name: d.hostNode.Name,
Host: d.config.Host,
Port: int32(d.config.DiscoveryPort),
Name: d.address,
MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_DEREGISTER,
}

Expand Down Expand Up @@ -265,9 +264,9 @@ func (d *Discovery) DiscoverPeers() ([]string, error) {
}

request := &internalpb.NatsMessage{
Host: d.hostNode.Host,
Port: int32(d.hostNode.DiscoveryPort),
Name: d.hostNode.Name,
Host: d.config.Host,
Port: int32(d.config.DiscoveryPort),
Name: d.address,
MessageType: internalpb.NatsMessageType_NATS_MESSAGE_TYPE_REQUEST,
}

Expand All @@ -278,7 +277,7 @@ func (d *Discovery) DiscoverPeers() ([]string, error) {

var peers []string
timeout := time.After(d.config.Timeout)
me := d.hostNode.DiscoveryAddress()
me := d.address
for {
select {
case msg, ok := <-recv:
Expand Down
Loading

0 comments on commit e9b443c

Please sign in to comment.