Skip to content

Commit

Permalink
refactor(enhancement): refactor the actors data structure (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Dec 1, 2024
1 parent f1b9176 commit 219ddaf
Show file tree
Hide file tree
Showing 21 changed files with 2,457 additions and 1,426 deletions.
231 changes: 139 additions & 92 deletions actors/actor_system.go

Large diffs are not rendered by default.

60 changes: 53 additions & 7 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,17 @@ func TestActorSystem(t *testing.T) {
err := sys.Start(ctx)
assert.NoError(t, err)

lib.Pause(time.Second)

actor := newActor()
actorRef, err := sys.Spawn(ctx, "Test", actor)
assert.NoError(t, err)
assert.NotNil(t, actorRef)

// stop the actor after some time
lib.Pause(time.Second)

t.Cleanup(
func() {
err = sys.Stop(ctx)
assert.NoError(t, err)
},
)
err = sys.Stop(ctx)
assert.NoError(t, err)
})
t.Run("With Spawn an actor with invalid actor name", func(t *testing.T) {
ctx := context.TODO()
Expand Down Expand Up @@ -1147,6 +1144,55 @@ func TestActorSystem(t *testing.T) {

require.NoError(t, sys.Stop(ctx))
})
t.Run("With SpawnNamedFromFunc when actor already exist", func(t *testing.T) {
ctx := context.TODO()
ports := dynaport.Get(1)

logger := log.DiscardLogger
host := "127.0.0.1"

newActorSystem, err := NewActorSystem(
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithJanitorInterval(time.Minute),
WithRemoting(host, int32(ports[0])))

require.NoError(t, err)

// start the actor system
err = newActorSystem.Start(ctx)
require.NoError(t, err)

receiveFn := func(_ context.Context, message proto.Message) error {
expected := &testpb.Reply{Content: "test spawn from func"}
assert.True(t, proto.Equal(expected, message))
return nil
}

actorName := "name"
actorRef, err := newActorSystem.SpawnNamedFromFunc(ctx, actorName, receiveFn)
assert.NoError(t, err)
assert.NotNil(t, actorRef)

// stop the actor after some time
lib.Pause(time.Second)

// send a message to the actor
require.NoError(t, Tell(ctx, actorRef, &testpb.Reply{Content: "test spawn from func"}))

newInstance, err := newActorSystem.SpawnNamedFromFunc(ctx, actorName, receiveFn)
require.NoError(t, err)
require.NotNil(t, newInstance)
require.True(t, newInstance.Equals(actorRef))

t.Cleanup(
func() {
err = newActorSystem.Stop(ctx)
assert.NoError(t, err)
},
)
})
t.Run("With SpawnFromFunc (cluster/remote enabled)", func(t *testing.T) {
ctx := context.TODO()
nodePorts := dynaport.Get(3)
Expand Down
13 changes: 7 additions & 6 deletions actors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,13 @@ func (e *exchanger) PostStop(context.Context) error {

var _ Actor = &exchanger{}

type stasQA struct{}
type stashQA struct{}

func (x *stasQA) PreStart(context.Context) error {
func (x *stashQA) PreStart(context.Context) error {
return nil
}

func (x *stasQA) Receive(ctx *ReceiveContext) {
func (x *stashQA) Receive(ctx *ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.TestStash:
Expand All @@ -260,7 +260,7 @@ func (x *stasQA) Receive(ctx *ReceiveContext) {
}
}

func (x *stasQA) Ready(ctx *ReceiveContext) {
func (x *stashQA) Ready(ctx *ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *testspb.TestStash:
Expand All @@ -276,11 +276,11 @@ func (x *stasQA) Ready(ctx *ReceiveContext) {
}
}

func (x *stasQA) PostStop(context.Context) error {
func (x *stashQA) PostStop(context.Context) error {
return nil
}

var _ Actor = &stasQA{}
var _ Actor = &stashQA{}

type preStartQA struct{}

Expand Down Expand Up @@ -464,6 +464,7 @@ func startClusterSystem(t *testing.T, serverAddr string) (ActorSystem, discovery
WithPassivationDisabled(),
WithLogger(logger),
WithRemoting(host, int32(remotingPort)),
WithJanitorInterval(time.Minute),
WithPeerStateLoopInterval(500*time.Millisecond),
WithCluster(
NewClusterConfig().
Expand Down
Loading

0 comments on commit 219ddaf

Please sign in to comment.