Skip to content

Commit

Permalink
fix: fix linting
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Dec 9, 2024
1 parent fe503f4 commit c176c1b
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 27 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,22 @@ When cluster mode is enabled, passivated actors are removed from the entire clus
### Supervision

In GoAkt, supervision allows to define the various strategies to apply when a given actor is faulty.
The supervisory strategy to adopt is set during the creation of the actor system.
**_The supervisory strategies to adopt are to be set during the creation of the actor_**. One can set as many as he/she wants supervisor strategies based upon various error types using the `SpawnOption` method `WithSupervisorStrategies`.
To create a supervisor strategy one needs to call the `NewSupervisorStrategy` function and pass _the error type_ and the corresponding _directive_.
GoAkt comes bundled with the following default supervisor strategies that can be overriden when creating an actor:
- `NewSupervisorStrategy(PanicError{}, NewStopDirective())`: this will stop the faulty actor in case of panic.
- `NewSupervisorStrategy(&runtime.PanicNilError{}, NewStopDirective())`: this will stop the faulty actor for nil panic error

Note: GoAkt will suspend a faulty actor when there is no supervisor strategy set in place for the corresponding error type. Once can check the state of the actor using the `IsSuspended` method on the `PID`.
A suspended actor can be restarted or shutdown, however it cannot handle any messages sent to it.

In GoAkt each child actor is treated separately. There is no concept of one-for-one and one-for-all strategies.
The following directives are supported:
- [`Restart`](./actors/supervisor_directive.go): to restart the child actor. One can control how the restart is done using the following options: - `maxNumRetries`: defines the maximum of restart attempts - `timeout`: how to attempt restarting the faulty actor.
- [`Stop`](./actors/supervisor_directive.go): to stop the child actor which is the default one
- [`Stop`](./actors/supervisor_directive.go): to stop the child actor which is the default one as long as its descendants.
- [`Resume`](./actors/supervisor_directive.go): ignores the failure and process the next message, instead.

With the `Restart` directive, every child actor of the faulty is stopped and garbage-collected when the given parent is restarted. This helps avoid resources leaking.
With the `Restart` directive, only the direct alive children of the given actor will be shudown and respawned with their initial state.
There are only two scenarios where an actor can supervise another actor:
- It watches the given actor via the `Watch` method. With this method the parent actor can also listen to the `Terminated` message to decide what happens next to the child actor.
- The actor to be supervised is a child of the given actor.
Expand Down
24 changes: 19 additions & 5 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ type PID struct {
address *address.Address

// helps determine whether the actor should handle messages or not.
started atomic.Bool
stopping atomic.Bool
started atomic.Bool
stopping atomic.Bool
suspended atomic.Bool

latestReceiveTime atomic.Time
latestReceiveDuration atomic.Duration
Expand Down Expand Up @@ -182,12 +183,15 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
pid.latestReceiveDuration.Store(0)
pid.started.Store(false)
pid.stopping.Store(false)
pid.suspended.Store(false)
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.initTimeout.Store(DefaultInitTimeout)
pid.processing.Store(int32(idle))

// set default strategies mappings
pid.supervisorStrategies.Put(NewSupervisorStrategy(new(PanicError), NewStopDirective()))
for _, strategy := range DefaultSupervisorStrategies {
pid.supervisorStrategies.Put(strategy)
}

for _, opt := range opts {
opt(pid)
Expand Down Expand Up @@ -328,7 +332,13 @@ func (pid *PID) Stop(ctx context.Context, cid *PID) error {
// IsRunning returns true when the actor is alive ready to process messages and false
// when the actor is stopped or not started at all
func (pid *PID) IsRunning() bool {
return pid.started.Load()
return pid.started.Load() && !pid.suspended.Load()
}

// IsSuspended returns true when the actor is suspended
// A suspended actor is a faulty actor
func (pid *PID) IsSuspended() bool {
return pid.suspended.Load()
}

// ActorSystem returns the actor system
Expand Down Expand Up @@ -428,6 +438,7 @@ func (pid *PID) Restart(ctx context.Context) error {
}

pid.processing.Store(int32(idle))
pid.suspended.Store(false)
pid.supervisionLoop()
if pid.passivateAfter.Load() > 0 {
go pid.passivationLoop()
Expand Down Expand Up @@ -1258,6 +1269,7 @@ func (pid *PID) reset() {
pid.behaviorStack.Reset()
pid.processedCount.Store(0)
pid.stopping.Store(false)
pid.suspended.Store(false)
pid.supervisorStrategies.Reset()
}

Expand Down Expand Up @@ -1518,7 +1530,7 @@ func (pid *PID) doStop(ctx context.Context) error {
}

pid.started.Store(false)
pid.logger.Infof("post shutdown process is on going for actor=%s...", pid.Name())
pid.logger.Infof("shutdown process completed for actor=%s...", pid.Name())
pid.reset()
return nil
}
Expand Down Expand Up @@ -1547,6 +1559,7 @@ func (pid *PID) notifyParent(err error) {
strategy, ok := pid.supervisorStrategies.Get(err)
if !ok {
pid.logger.Debugf("no supervisor directive found for error: %s", errorType(err))
pid.suspended.Store(true)
// no supervisor strategy found no-op
// business as usual
return
Expand Down Expand Up @@ -1578,6 +1591,7 @@ func (pid *PID) notifyParent(err error) {
// no supervisor strategy found no-op
// business as usual
pid.logger.Debugf("unknown directive: %T found for error: %s", d, errorType(err))
pid.suspended.Store(true)
return
}

Expand Down
41 changes: 23 additions & 18 deletions actors/pid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ func TestSupervisorStrategy(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, actorSystem.Stop(ctx))
})
t.Run("With the default supervisor directive", func(t *testing.T) {
t.Run("With the default supervisor directives", func(t *testing.T) {
ctx := context.TODO()
host := "127.0.0.1"
ports := dynaport.Get(1)
Expand Down Expand Up @@ -628,7 +628,7 @@ func TestSupervisorStrategy(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, actorSystem.Stop(ctx))
})
t.Run("With undefined directive no-op", func(t *testing.T) {
t.Run("With undefined directive suspends actor", func(t *testing.T) {
ctx := context.TODO()
host := "127.0.0.1"
ports := dynaport.Get(1)
Expand All @@ -647,33 +647,37 @@ func TestSupervisorStrategy(t *testing.T) {

parent, err := actorSystem.Spawn(ctx, "test", newTestSupervisor())
require.NoError(t, err)
assert.NotNil(t, parent)
require.NotNil(t, parent)

// create the child actor
fakeStrategy := NewSupervisorStrategy(PanicError{}, new(unhandledSupervisorDirective))
child, err := parent.SpawnChild(ctx, "SpawnChild", newTestSupervised(), WithSupervisorStrategies(fakeStrategy))
assert.NoError(t, err)
assert.NotNil(t, child)
require.NoError(t, err)
require.NotNil(t, child)

lib.Pause(time.Second)

assert.Len(t, parent.Children(), 1)
require.Len(t, parent.Children(), 1)
// send a test panic message to the actor
assert.NoError(t, Tell(ctx, child, new(testpb.TestPanic)))
require.NoError(t, Tell(ctx, child, new(testpb.TestPanic)))

// wait for the child to properly shutdown
lib.Pause(time.Second)

// assert the actor state
assert.True(t, child.IsRunning())
assert.Len(t, parent.Children(), 1)
require.False(t, child.IsRunning())
require.True(t, child.IsSuspended())
require.Len(t, parent.Children(), 0)

// trying sending a message to the actor will return an error
require.Error(t, Tell(ctx, child, new(testpb.TestSend)))

//stop the actor
err = parent.Shutdown(ctx)
assert.NoError(t, err)
require.NoError(t, err)
assert.NoError(t, actorSystem.Stop(ctx))
})
t.Run("With directive not found no-op", func(t *testing.T) {
t.Run("With directive not found suspends actor", func(t *testing.T) {
ctx := context.TODO()
host := "127.0.0.1"
ports := dynaport.Get(1)
Expand All @@ -692,28 +696,29 @@ func TestSupervisorStrategy(t *testing.T) {

parent, err := actorSystem.Spawn(ctx, "test", newTestSupervisor())
require.NoError(t, err)
assert.NotNil(t, parent)
require.NotNil(t, parent)

// create the child actor
child, err := parent.SpawnChild(ctx, "SpawnChild", newTestSupervised())
assert.NoError(t, err)
assert.NotNil(t, child)
require.NoError(t, err)
require.NotNil(t, child)

lib.Pause(time.Second)

// just for the sake of the test we remove the default directive
child.supervisorStrategies = newStrategiesMap()

assert.Len(t, parent.Children(), 1)
require.Len(t, parent.Children(), 1)
// send a message to the actor which result in panic
assert.NoError(t, Tell(ctx, child, new(testpb.TestPanic)))
require.NoError(t, Tell(ctx, child, new(testpb.TestPanic)))

// wait for the child to properly shutdown
lib.Pause(time.Second)

// assert the actor state
assert.True(t, child.IsRunning())
assert.Len(t, parent.Children(), 1)
require.False(t, child.IsRunning())
require.True(t, child.IsSuspended())
require.Len(t, parent.Children(), 0)

//stop the actor
err = parent.Shutdown(ctx)
Expand Down
4 changes: 3 additions & 1 deletion actors/supervisor_directive.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ func NewResumeDirective() *ResumeDirective {
return new(ResumeDirective)
}

// implements the SupervisorDirective
func (*ResumeDirective) isSupervisorDirective() {}

// RestartDirective defines supervisor restart directive
// RestartDirective defines the supervisor restart directive
type RestartDirective struct {
// Specifies the maximum number of retries
// When reaching this number the faulty actor is stopped
Expand Down Expand Up @@ -85,4 +86,5 @@ func (x *RestartDirective) WithLimit(maxNumRetries uint32, timeout time.Duration
x.timeout = timeout
}

// implements the SupervisorDirective
func (*RestartDirective) isSupervisorDirective() {}
7 changes: 7 additions & 0 deletions actors/supervisor_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,16 @@ package actors

import (
"reflect"
"runtime"
"sync"
)

// DefaultSupervisorStrategies defines the default supervisor strategies
var DefaultSupervisorStrategies = []*SupervisorStrategy{
NewSupervisorStrategy(PanicError{}, NewStopDirective()),
NewSupervisorStrategy(&runtime.PanicNilError{}, NewStopDirective()),
}

// SupervisorStrategy defines the rules to apply to a faulty actor
// during message processing
type SupervisorStrategy struct {
Expand Down

0 comments on commit c176c1b

Please sign in to comment.