Skip to content

Commit

Permalink
feat: add cluster client (#371)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Jun 22, 2024
1 parent 350ec08 commit f0a5336
Show file tree
Hide file tree
Showing 36 changed files with 2,197 additions and 176 deletions.
25 changes: 0 additions & 25 deletions .mockery.yaml

This file was deleted.

4 changes: 3 additions & 1 deletion Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ mock:
FROM +code

# generate the mocks
RUN mockery
RUN mockery --dir hash --name Hasher --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/hash --case snake
RUN mockery --dir discovery --name Provider --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/discovery --case snake
RUN mockery --dir internal/cluster --name Interface --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/cluster --case snake

SAVE ARTIFACT ./mocks mocks AS LOCAL mocks

Expand Down
184 changes: 135 additions & 49 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type ActorSystem interface {
// Only a single instance of the ActorSystem can be created on a given node
type actorSystem struct {
internalpbconnect.UnimplementedRemotingServiceHandler
internalpbconnect.UnimplementedClusterServiceHandler

// map of actors in the system
actors *pidMap
Expand Down Expand Up @@ -210,10 +211,11 @@ type actorSystem struct {
registry types.Registry
reflection reflection

peersCacheMu *sync.RWMutex
peersCache map[string][]byte
clusterConfig *ClusterConfig
redistributionChan chan *cluster.Event
peersStateLoopInterval time.Duration
peersCacheMu *sync.RWMutex
peersCache map[string][]byte
clusterConfig *ClusterConfig
redistributionChan chan *cluster.Event
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
Expand All @@ -230,28 +232,29 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}

system := &actorSystem{
actors: newPIDMap(1_000), // TODO need to check with memory footprint here since we change the map engine
actorsChan: make(chan *internalpb.WireActor, 10),
name: name,
logger: log.DefaultLogger,
expireActorAfter: DefaultPassivationTimeout,
replyTimeout: DefaultReplyTimeout,
actorInitMaxRetries: DefaultInitMaxRetries,
supervisorStrategy: DefaultSupervisoryStrategy,
telemetry: telemetry.New(),
locker: sync.Mutex{},
shutdownTimeout: DefaultShutdownTimeout,
mailboxSize: DefaultMailboxSize,
housekeeperStopSig: make(chan types.Unit, 1),
eventsStream: eventstream.New(),
partitionHasher: hash.DefaultHasher(),
actorInitTimeout: DefaultInitTimeout,
tracer: noop.NewTracerProvider().Tracer(name),
clusterEventsChan: make(chan *cluster.Event, 1),
registry: types.NewRegistry(),
clusterSyncStopSig: make(chan types.Unit, 1),
peersCacheMu: &sync.RWMutex{},
peersCache: make(map[string][]byte),
actors: newPIDMap(1_000), // TODO need to check with memory footprint here since we change the map engine
actorsChan: make(chan *internalpb.WireActor, 10),
name: name,
logger: log.DefaultLogger,
expireActorAfter: DefaultPassivationTimeout,
replyTimeout: DefaultReplyTimeout,
actorInitMaxRetries: DefaultInitMaxRetries,
supervisorStrategy: DefaultSupervisoryStrategy,
telemetry: telemetry.New(),
locker: sync.Mutex{},
shutdownTimeout: DefaultShutdownTimeout,
mailboxSize: DefaultMailboxSize,
housekeeperStopSig: make(chan types.Unit, 1),
eventsStream: eventstream.New(),
partitionHasher: hash.DefaultHasher(),
actorInitTimeout: DefaultInitTimeout,
tracer: noop.NewTracerProvider().Tracer(name),
clusterEventsChan: make(chan *cluster.Event, 1),
registry: types.NewRegistry(),
clusterSyncStopSig: make(chan types.Unit, 1),
peersCacheMu: &sync.RWMutex{},
peersCache: make(map[string][]byte),
peersStateLoopInterval: DefaultPeerStateLoopInterval,
}

system.started.Store(false)
Expand Down Expand Up @@ -661,7 +664,7 @@ func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goak
}

// check in the cluster
if x.cluster != nil || x.clusterEnabled.Load() {
if x.clusterEnabled.Load() {
actor, err := x.cluster.GetActor(spanCtx, actorName)
if err != nil {
if errors.Is(err, cluster.ErrActorNotFound) {
Expand Down Expand Up @@ -851,24 +854,41 @@ func (x *actorSystem) Stop(ctx context.Context) error {
}

// RemoteLookup for an actor on a remote host.
func (x *actorSystem) RemoteLookup(_ context.Context, request *connect.Request[internalpb.RemoteLookupRequest]) (*connect.Response[internalpb.RemoteLookupResponse], error) {
func (x *actorSystem) RemoteLookup(ctx context.Context, request *connect.Request[internalpb.RemoteLookupRequest]) (*connect.Response[internalpb.RemoteLookupResponse], error) {
logger := x.logger
msg := request.Msg

reqCopy := request.Msg
if !x.remotingEnabled.Load() {
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrRemotingDisabled)
}

actorPath := NewPath(reqCopy.GetName(), NewAddress(x.Name(), reqCopy.GetHost(), int(reqCopy.GetPort())))
remoteAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
if remoteAddr != net.JoinHostPort(msg.GetHost(), strconv.Itoa(int(msg.GetPort()))) {
return nil, connect.NewError(connect.CodeInvalidArgument, ErrInvalidHost)
}

if x.clusterEnabled.Load() {
actorName := msg.GetName()
actor, err := x.cluster.GetActor(ctx, actorName)
if err != nil {
if errors.Is(err, cluster.ErrActorNotFound) {
logger.Error(ErrAddressNotFound(actorName).Error())
return nil, ErrAddressNotFound(actorName)
}

return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&internalpb.RemoteLookupResponse{Address: actor.GetActorAddress()}), nil
}

actorPath := NewPath(msg.GetName(), NewAddress(x.Name(), msg.GetHost(), int(msg.GetPort())))
pid, exist := x.actors.get(actorPath)
if !exist {
logger.Error(ErrAddressNotFound(actorPath.String()).Error())
return nil, ErrAddressNotFound(actorPath.String())
}

addr := pid.ActorPath().RemoteAddress()

return connect.NewResponse(&internalpb.RemoteLookupResponse{Address: addr}), nil
return connect.NewResponse(&internalpb.RemoteLookupResponse{Address: pid.ActorPath().RemoteAddress()}), nil
}

// RemoteAsk is used to send a message to an actor remotely and expect a response
Expand Down Expand Up @@ -900,10 +920,15 @@ func (x *actorSystem) RemoteAsk(ctx context.Context, stream *connect.BidiStream[
}

message := request.GetRemoteMessage()
receiver := message.GetReceiver()
name := receiver.GetName()

name := message.GetReceiver().GetName()
actorPath := NewPath(name, NewAddress(x.name, x.remotingHost, int(x.remotingPort)))
remoteAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
if remoteAddr != net.JoinHostPort(receiver.GetHost(), strconv.Itoa(int(receiver.GetPort()))) {
return connect.NewError(connect.CodeInvalidArgument, ErrInvalidHost)
}

actorPath := NewPath(name, NewAddress(x.name, x.remotingHost, int(x.remotingPort)))
pid, exist := x.actors.get(actorPath)
if !exist {
logger.Error(ErrAddressNotFound(actorPath.String()).Error())
Expand Down Expand Up @@ -990,13 +1015,18 @@ func (x *actorSystem) RemoteTell(ctx context.Context, stream *connect.ClientStre
func (x *actorSystem) RemoteReSpawn(ctx context.Context, request *connect.Request[internalpb.RemoteReSpawnRequest]) (*connect.Response[internalpb.RemoteReSpawnResponse], error) {
logger := x.logger

reqCopy := request.Msg
msg := request.Msg

if !x.remotingEnabled.Load() {
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrRemotingDisabled)
}

actorPath := NewPath(reqCopy.GetName(), NewAddress(x.Name(), reqCopy.GetHost(), int(reqCopy.GetPort())))
remoteAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
if remoteAddr != net.JoinHostPort(msg.GetHost(), strconv.Itoa(int(msg.GetPort()))) {
return nil, connect.NewError(connect.CodeInvalidArgument, ErrInvalidHost)
}

actorPath := NewPath(msg.GetName(), NewAddress(x.Name(), msg.GetHost(), int(msg.GetPort())))
pid, exist := x.actors.get(actorPath)
if !exist {
logger.Error(ErrAddressNotFound(actorPath.String()).Error())
Expand All @@ -1015,13 +1045,18 @@ func (x *actorSystem) RemoteReSpawn(ctx context.Context, request *connect.Reques
func (x *actorSystem) RemoteStop(ctx context.Context, request *connect.Request[internalpb.RemoteStopRequest]) (*connect.Response[internalpb.RemoteStopResponse], error) {
logger := x.logger

reqCopy := request.Msg
msg := request.Msg

if !x.remotingEnabled.Load() {
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrRemotingDisabled)
}

actorPath := NewPath(reqCopy.GetName(), NewAddress(x.Name(), reqCopy.GetHost(), int(reqCopy.GetPort())))
remoteAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
if remoteAddr != net.JoinHostPort(msg.GetHost(), strconv.Itoa(int(msg.GetPort()))) {
return nil, connect.NewError(connect.CodeInvalidArgument, ErrInvalidHost)
}

actorPath := NewPath(msg.GetName(), NewAddress(x.Name(), msg.GetHost(), int(msg.GetPort())))
pid, exist := x.actors.get(actorPath)
if !exist {
logger.Error(ErrAddressNotFound(actorPath.String()).Error())
Expand All @@ -1040,31 +1075,79 @@ func (x *actorSystem) RemoteStop(ctx context.Context, request *connect.Request[i
func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[internalpb.RemoteSpawnRequest]) (*connect.Response[internalpb.RemoteSpawnResponse], error) {
logger := x.logger

req := request.Msg

msg := request.Msg
if !x.remotingEnabled.Load() {
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrRemotingDisabled)
}

actor, err := x.reflection.ActorFrom(req.GetActorType())
remoteAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
if remoteAddr != net.JoinHostPort(msg.GetHost(), strconv.Itoa(int(msg.GetPort()))) {
return nil, connect.NewError(connect.CodeInvalidArgument, ErrInvalidHost)
}

actor, err := x.reflection.ActorFrom(msg.GetActorType())
if err != nil {
logger.Errorf("failed to create actor=[(%s) of type (%s)] on [host=%s, port=%d]: reason: (%v)",
req.GetActorName(), req.GetActorType(), req.GetHost(), req.GetPort(), err)
msg.GetActorName(), msg.GetActorType(), msg.GetHost(), msg.GetPort(), err)

if errors.Is(err, ErrTypeNotRegistered) {
return nil, connect.NewError(connect.CodeFailedPrecondition, ErrTypeNotRegistered)
}

return nil, connect.NewError(connect.CodeInternal, err)
}

_, err = x.Spawn(ctx, req.GetActorName(), actor)
if err != nil {
logger.Errorf("failed to create actor=(%s) on [host=%s, port=%d]: reason: (%v)", req.GetActorName(), req.GetHost(), req.GetPort(), err)
if _, err = x.Spawn(ctx, msg.GetActorName(), actor); err != nil {
logger.Errorf("failed to create actor=(%s) on [host=%s, port=%d]: reason: (%v)", msg.GetActorName(), msg.GetHost(), msg.GetPort(), err)
return nil, connect.NewError(connect.CodeInternal, err)
}

logger.Infof("actor=(%s) successfully created on [host=%s, port=%d]", msg.GetActorName(), msg.GetHost(), msg.GetPort())
return connect.NewResponse(new(internalpb.RemoteSpawnResponse)), nil
}

// GetNodeMetric answers a cluster-service request for this node's metric.
//
// It fails with CodeFailedPrecondition (ErrClusterDisabled) when cluster mode
// is off, and with CodeInvalidArgument (ErrInvalidHost) when the node address
// carried by the request differs from this node's "host:port" remoting
// address. On success the response carries that remoting address together
// with the number of entries currently held in the actors map.
func (x *actorSystem) GetNodeMetric(_ context.Context, request *connect.Request[internalpb.GetNodeMetricRequest]) (*connect.Response[internalpb.GetNodeMetricResponse], error) {
	if clusterOn := x.clusterEnabled.Load(); !clusterOn {
		return nil, connect.NewError(connect.CodeFailedPrecondition, ErrClusterDisabled)
	}

	// the request must be addressed to this very node
	nodeAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
	if request.Msg.GetNodeAddress() != nodeAddr {
		return nil, connect.NewError(connect.CodeInvalidArgument, ErrInvalidHost)
	}

	metric := &internalpb.GetNodeMetricResponse{
		NodeRemoteAddress: nodeAddr,
		ActorsCount:       uint64(x.actors.len()),
	}
	return connect.NewResponse(metric), nil
}

// GetKinds answers a cluster-service request for the actor kinds configured
// on this node.
//
// It fails with CodeFailedPrecondition (ErrClusterDisabled) when cluster mode
// is off, and with CodeInvalidArgument (ErrInvalidHost) when the node address
// carried by the request differs from this node's "host:port" remoting
// address. On success the response lists types.NameOf for every kind in the
// cluster configuration, in configuration order.
func (x *actorSystem) GetKinds(_ context.Context, request *connect.Request[internalpb.GetKindsRequest]) (*connect.Response[internalpb.GetKindsResponse], error) {
	if clusterOn := x.clusterEnabled.Load(); !clusterOn {
		return nil, connect.NewError(connect.CodeFailedPrecondition, ErrClusterDisabled)
	}

	// the request must be addressed to this very node
	nodeAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)
	if request.Msg.GetNodeAddress() != nodeAddr {
		return nil, connect.NewError(connect.CodeInvalidArgument, ErrInvalidHost)
	}

	// translate every configured kind into its registered type name
	configured := x.clusterConfig.Kinds()
	names := make([]string, len(configured))
	for idx := range configured {
		names[idx] = types.NameOf(configured[idx])
	}

	response := &internalpb.GetKindsResponse{Kinds: names}
	return connect.NewResponse(response), nil
}

// handleRemoteAsk handles a synchronous message to another actor and expects a response.
// This blocks until a response is received or the request times out.
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error) {
Expand Down Expand Up @@ -1176,9 +1259,12 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {
x.remotingHost = remotingHost
x.remotingPort = int32(remotingPort)

remotingServicePath, remotingServiceHandler := internalpbconnect.NewRemotingServiceHandler(x, opts...)
clusterServicePath, clusterServiceHandler := internalpbconnect.NewClusterServiceHandler(x, opts...)

mux := http.NewServeMux()
path, handler := internalpbconnect.NewRemotingServiceHandler(x, opts...)
mux.Handle(path, handler)
mux.Handle(remotingServicePath, remotingServiceHandler)
mux.Handle(clusterServicePath, clusterServiceHandler)
serverAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort)

// TODO revisit the timeouts
Expand Down Expand Up @@ -1314,7 +1400,7 @@ func (x *actorSystem) clusterEventsLoop() {
// peersStateLoop fetches the cluster peers' PeerState and updates the node peersCache
func (x *actorSystem) peersStateLoop() {
x.logger.Info("peers state synchronization has started...")
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(x.peersStateLoopInterval)
tickerStopSig := make(chan types.Unit, 1)
go func() {
for {
Expand Down
3 changes: 1 addition & 2 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
clustermocks "github.com/tochemey/goakt/v2/mocks/cluster"
testkit "github.com/tochemey/goakt/v2/mocks/discovery"
"github.com/tochemey/goakt/v2/telemetry"
testpb "github.com/tochemey/goakt/v2/test/data/testpb"
"github.com/tochemey/goakt/v2/test/data/testpb"
)

// nolint
Expand Down Expand Up @@ -1382,7 +1382,6 @@ func TestActorSystem(t *testing.T) {
assert.Error(t, err)
})
})

t.Run("With cluster start failure with remoting not enabled", func(t *testing.T) {
ctx := context.TODO()
logger := log.DiscardLogger
Expand Down
6 changes: 3 additions & 3 deletions actors/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/tochemey/goakt/v2/goaktpb"
internalpb "github.com/tochemey/goakt/v2/internal/internalpb"
"github.com/tochemey/goakt/v2/internal/internalpb"
"github.com/tochemey/goakt/v2/log"
testpb "github.com/tochemey/goakt/v2/test/data/testpb"
"github.com/tochemey/goakt/v2/test/data/testpb"
)

func TestAsk(t *testing.T) {
Expand Down Expand Up @@ -1778,7 +1778,7 @@ func TestAPIRemoteSpawn(t *testing.T) {
// generate the remoting port
ports := dynaport.Get(1)
remotingPort := ports[0]
host := "localhost"
host := "127.0.0.1"

// create the actor system
sys, err := NewActorSystem("test",
Expand Down
2 changes: 1 addition & 1 deletion actors/cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ validation.Validator = (*ClusterConfig)(nil)
// NewClusterConfig creates an instance of ClusterConfig
func NewClusterConfig() *ClusterConfig {
return &ClusterConfig{
kinds: []Actor{new(fnActor)},
kinds: []Actor{new(funcActor)},
minimumPeersQuorum: 1,
replicaCount: 2,
}
Expand Down
2 changes: 1 addition & 1 deletion actors/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"github.com/tochemey/goakt/v2/goaktpb"
"github.com/tochemey/goakt/v2/internal/eventstream"
"github.com/tochemey/goakt/v2/log"
testpb "github.com/tochemey/goakt/v2/test/data/testpb"
"github.com/tochemey/goakt/v2/test/data/testpb"
testspb "github.com/tochemey/goakt/v2/test/data/testpb"
)

Expand Down
Loading

0 comments on commit f0a5336

Please sign in to comment.