Skip to content

Commit

Permalink
refactor: remove system ask timeout and pass timeout to asks (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Nov 11, 2024
1 parent f0a5abe commit 9760b03
Show file tree
Hide file tree
Showing 21 changed files with 115 additions and 215 deletions.
2 changes: 0 additions & 2 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
name: name,
logger: log.New(log.ErrorLevel, os.Stderr),
expireActorAfter: DefaultPassivationTimeout,
askTimeout: DefaultAskTimeout,
actorInitMaxRetries: DefaultInitMaxRetries,
supervisorDirective: DefaultSupervisoryStrategy,
locker: sync.Mutex{},
Expand Down Expand Up @@ -1358,7 +1357,6 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
// pid inherit the actor system settings defined during instantiation
pidOpts := []pidOption{
withInitMaxRetries(x.actorInitMaxRetries),
withAskTimeout(x.askTimeout),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorDirective(x.supervisorDirective),
Expand Down
17 changes: 3 additions & 14 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)),
)
Expand Down Expand Up @@ -239,7 +238,7 @@ func TestActorSystem(t *testing.T) {
require.True(t, proto.Equal(remoteAddr, addr))

remoting := NewRemoting()
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)

Expand Down Expand Up @@ -285,7 +284,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
Expand Down Expand Up @@ -384,7 +382,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
Expand Down Expand Up @@ -561,7 +558,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
Expand Down Expand Up @@ -641,7 +637,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
Expand All @@ -667,7 +662,7 @@ func TestActorSystem(t *testing.T) {
Id: "",
},
)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)
require.Error(t, err)
require.Nil(t, reply)

Expand Down Expand Up @@ -695,7 +690,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
Expand All @@ -719,7 +713,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
Expand All @@ -741,7 +734,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
)
require.NoError(t, err)
Expand Down Expand Up @@ -1074,7 +1066,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithExpireActorAfter(passivateAfter),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)),
)
Expand Down Expand Up @@ -1249,7 +1240,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)),
)
Expand Down Expand Up @@ -1545,7 +1535,6 @@ func TestActorSystem(t *testing.T) {
"test",
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, 1, gossipPort, clusterPort, new(exchanger)),
)
Expand Down Expand Up @@ -1583,7 +1572,7 @@ func TestActorSystem(t *testing.T) {
require.NotNil(t, addr)

// send the message to exchanger actor one using remote messaging
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout)
reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second)

require.NoError(t, err)
require.NotNil(t, reply)
Expand Down
3 changes: 0 additions & 3 deletions actors/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func TestAsk(t *testing.T) {
sys, err := NewActorSystem(
"test",
WithLogger(logger),
WithReplyTimeout(replyTimeout),
WithPassivationDisabled(),
)
// assert there are no error
Expand Down Expand Up @@ -265,7 +264,6 @@ func TestAsk(t *testing.T) {
sys, err := NewActorSystem(
"test",
WithLogger(logger),
WithReplyTimeout(replyTimeout),
WithPassivationDisabled(),
)
// assert there are no error
Expand Down Expand Up @@ -309,7 +307,6 @@ func TestAsk(t *testing.T) {
sys, err := NewActorSystem(
"test",
WithLogger(logger),
WithReplyTimeout(replyTimeout),
WithPassivationDisabled(),
)
// assert there are no error
Expand Down
2 changes: 2 additions & 0 deletions actors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ var (
ErrSchedulerNotStarted = errors.New("scheduler has not started")
// ErrInvalidMessage is returned when an invalid remote message is sent
ErrInvalidMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) }
// ErrInvalidTimeout is returned when a given timeout is negative or zero
ErrInvalidTimeout = errors.New("invalid timeout")
)

// eof returns true if the given error is an EOF error
Expand Down
1 change: 0 additions & 1 deletion actors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
actorSystemName,
WithPassivationDisabled(),
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithPeerStateLoopInterval(500*time.Millisecond),
WithCluster(
Expand Down
10 changes: 0 additions & 10 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,6 @@ func WithLogger(logger log.Logger) Option {
)
}

// WithReplyTimeout sets how long in seconds an actor should reply a command
// in a receive-reply pattern
func WithReplyTimeout(timeout time.Duration) Option {
return OptionFunc(
func(a *actorSystem) {
a.askTimeout = timeout
},
)
}

// WithActorInitMaxRetries sets the number of times to retry an actor init process
func WithActorInitMaxRetries(max int) Option {
return OptionFunc(
Expand Down
5 changes: 0 additions & 5 deletions actors/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func TestOption(t *testing.T) {
option: WithExpireActorAfter(2 * time.Second),
expected: actorSystem{expireActorAfter: 2. * time.Second},
},
{
name: "WithReplyTimeout",
option: WithReplyTimeout(2 * time.Second),
expected: actorSystem{askTimeout: 2. * time.Second},
},
{
name: "WithActorInitMaxRetries",
option: WithActorInitMaxRetries(2),
Expand Down
36 changes: 19 additions & 17 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/tochemey/goakt/v2/address"
Expand Down Expand Up @@ -97,10 +98,6 @@ type PID struct {
// any further resources like memory and cpu. The default value is 120 seconds
passivateAfter atomic.Duration

// specifies how long the sender of a mail should wait to receiveLoop a reply
// when using Ask. The default value is 5s
askTimeout atomic.Duration

// specifies the maximum of retries to attempt when the actor
// initialization fails. The default value is 5
initMaxRetries atomic.Int32
Expand Down Expand Up @@ -214,7 +211,6 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
pid.latestReceiveDuration.Store(0)
pid.running.Store(false)
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.askTimeout.Store(DefaultAskTimeout)
pid.initTimeout.Store(DefaultInitTimeout)

for _, opt := range opts {
Expand Down Expand Up @@ -461,7 +457,6 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts .
pidOptions := []pidOption{
withInitMaxRetries(int(pid.initMaxRetries.Load())),
withPassivationAfter(pid.passivateAfter.Load()),
withAskTimeout(pid.askTimeout.Load()),
withCustomLogger(pid.logger),
withActorSystem(pid.system),
withSupervisorDirective(pid.supervisorDirective),
Expand Down Expand Up @@ -546,16 +541,18 @@ func (pid *PID) PipeTo(ctx context.Context, to *PID, task future.Task) error {

// Ask sends a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message) (response proto.Message, err error) {
func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
if !to.IsRunning() {
return nil, ErrDead
}

if timeout <= 0 {
return nil, ErrInvalidTimeout
}

receiveContext := contextFromPool()
receiveContext.build(ctx, pid, to, message, false)

to.doReceive(receiveContext)
timeout := pid.askTimeout.Load()

select {
case result := <-receiveContext.response:
Expand Down Expand Up @@ -602,7 +599,7 @@ func (pid *PID) SendAsync(ctx context.Context, actorName string, message proto.M
// SendSync sends a synchronous message to another actor and expect a response.
// The location of the given actor is transparent to the caller.
// This block until a response is received or timed out.
func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message) (response proto.Message, err error) {
func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
if !pid.IsRunning() {
return nil, ErrDead
}
Expand All @@ -613,10 +610,10 @@ func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Me
}

if cid != nil {
return pid.Ask(ctx, cid, message)
return pid.Ask(ctx, cid, message, timeout)
}

reply, err := pid.RemoteAsk(ctx, addr, message)
reply, err := pid.RemoteAsk(ctx, addr, message, timeout)
if err != nil {
return nil, err
}
Expand All @@ -639,12 +636,12 @@ func (pid *PID) BatchTell(ctx context.Context, to *PID, messages ...proto.Messag
// BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages.
// The messages will be processed one after the other in the order they are sent.
// This is a design choice to follow the simple principle of one message at a time processing by actors.
func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages ...proto.Message) (responses chan proto.Message, err error) {
func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message, err error) {
responses = make(chan proto.Message, len(messages))
defer close(responses)

for i := 0; i < len(messages); i++ {
response, err := pid.Ask(ctx, to, messages[i])
response, err := pid.Ask(ctx, to, messages[i], timeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -733,11 +730,15 @@ func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message pro
}

// RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message) (response *anypb.Any, err error) {
func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) {
if pid.remoting == nil {
return nil, ErrRemotingDisabled
}

if timeout <= 0 {
return nil, ErrInvalidTimeout
}

marshaled, err := anypb.New(message)
if err != nil {
return nil, err
Expand All @@ -759,6 +760,7 @@ func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message prot
Receiver: to.Address,
Message: marshaled,
},
Timeout: durationpb.New(timeout),
}

stream := remoteService.RemoteAsk(ctx)
Expand Down Expand Up @@ -860,7 +862,7 @@ func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messag
// RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages.
// Messages are processed one after the other in the order they are sent.
// This can hinder performance if it is not properly used.
func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message) (responses []*anypb.Any, err error) {
func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) {
if pid.remoting == nil {
return nil, ErrRemotingDisabled
}
Expand All @@ -886,6 +888,7 @@ func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, message
Receiver: to.Address,
Message: packed,
},
Timeout: durationpb.New(timeout),
},
)
}
Expand Down Expand Up @@ -1210,7 +1213,6 @@ func (pid *PID) init(ctx context.Context) error {
func (pid *PID) reset() {
pid.latestReceiveTime.Store(time.Time{})
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.askTimeout.Store(DefaultAskTimeout)
pid.shutdownTimeout.Store(DefaultShutdownTimeout)
pid.initMaxRetries.Store(DefaultInitMaxRetries)
pid.latestReceiveDuration.Store(0)
Expand Down
8 changes: 0 additions & 8 deletions actors/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ func withPassivationAfter(duration time.Duration) pidOption {
}
}

// withAskTimeout sets how long in seconds an actor should reply a command
// in a receive-reply pattern
func withAskTimeout(timeout time.Duration) pidOption {
return func(pid *PID) {
pid.askTimeout.Store(timeout)
}
}

// withInitMaxRetries sets the number of times to retry an actor init process
func withInitMaxRetries(max int) pidOption {
return func(pid *PID) {
Expand Down
5 changes: 0 additions & 5 deletions actors/pid_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ func TestPIDOptions(t *testing.T) {
option: withPassivationAfter(time.Second),
expected: &PID{passivateAfter: atomicDuration},
},
{
name: "WithAskTimeout",
option: withAskTimeout(time.Second),
expected: &PID{askTimeout: atomicDuration},
},
{
name: "WithInitMaxRetries",
option: withInitMaxRetries(5),
Expand Down
Loading

0 comments on commit 9760b03

Please sign in to comment.