Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: enhance message processing #551

Merged
merged 3 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ func toReceiveContext(ctx context.Context, to *PID, message proto.Message, async
if err != nil {
return nil, ErrInvalidRemoteMessage(err)
}
receiveContext := contextFromPool()
receiveContext := getContext()
receiveContext.build(ctx, NoSender, to, actual, async)
return receiveContext.withRemoteSender(address.From(msg.GetSender())), nil
default:
receiveContext := contextFromPool()
receiveContext := getContext()
receiveContext.build(ctx, NoSender, to, message, async)
return receiveContext.withRemoteSender(address.NoSender()), nil
}
Expand Down
46 changes: 46 additions & 0 deletions actors/go_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Arsene Tochemey Gandote
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package actors

// goScheduler is a custom go routines scheduler
// this will help schedule actors message busy
type goScheduler struct {
throughput int
}

// Schedule schedules a function in a go routine to be executed
func (s *goScheduler) Schedule(fn func()) {
go fn()
}

// Throughput returns the scheduler throughput
func (s *goScheduler) Throughput() int {
return s.throughput
}

// newGoScheduler creates an instance of goScheduler
func newGoScheduler(capacity int) *goScheduler {
return &goScheduler{throughput: capacity}
}
2 changes: 1 addition & 1 deletion actors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newActor() *actorQA {
}

// Init initialize the actor. This function can be used to set up some database connections
// or some sort of initialization before the actor init processing public
// or some sort of initialization before the actor init processing message
func (p *actorQA) PreStart(context.Context) error {
return nil
}
Expand Down
114 changes: 57 additions & 57 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"errors"
"fmt"
"os"
"runtime"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -59,8 +60,8 @@ type processingState int32
const (
// idle means there are no messages to process
idle processingState = iota
// processing means the PID is processing messages
processing
// busy means the PID is processing messages
busy
)

// taskCompletion is used to track completions' taskCompletion
Expand Down Expand Up @@ -135,13 +136,12 @@ type PID struct {
supervisionChan chan error
supervisionStopSignal chan types.Unit

receiveSignal chan types.Unit
receiveStopSignal chan types.Unit

// atomic flag indicating whether the actor is processing messages
processingMessages atomic.Int32
processing atomic.Int32

remoting *Remoting

goScheduler *goScheduler
}

// newPID creates a new pid
Expand Down Expand Up @@ -174,9 +174,8 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
processingTimeLocker: new(sync.Mutex),
supervisionChan: make(chan error, 1),
supervisionStopSignal: make(chan types.Unit, 1),
receiveSignal: make(chan types.Unit, 1),
receiveStopSignal: make(chan types.Unit, 1),
remoting: NewRemoting(),
goScheduler: newGoScheduler(300),
}

pid.initMaxRetries.Store(DefaultInitMaxRetries)
Expand All @@ -185,6 +184,7 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
pid.stopping.Store(false)
pid.passivateAfter.Store(DefaultPassivationTimeout)
pid.initTimeout.Store(DefaultInitTimeout)
pid.processing.Store(int32(idle))

for _, opt := range opts {
opt(pid)
Expand All @@ -198,13 +198,12 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
return nil, err
}

pid.receiveLoop()
pid.supervisionLoop()
if pid.passivateAfter.Load() > 0 {
go pid.passivationLoop()
}

receiveContext := contextFromPool()
receiveContext := getContext()
receiveContext.build(ctx, NoSender, pid, new(goaktpb.PostStart), true)
pid.doReceive(receiveContext)

Expand Down Expand Up @@ -425,7 +424,7 @@ func (pid *PID) Restart(ctx context.Context) error {
return fmt.Errorf("actor=(%s) failed to restart: %w", pid.Name(), err)
}

pid.receiveLoop()
pid.processing.Store(int32(idle))
pid.supervisionLoop()
if pid.passivateAfter.Load() > 0 {
go pid.passivationLoop()
Expand Down Expand Up @@ -591,16 +590,20 @@ func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message, timeout
return nil, ErrInvalidTimeout
}

receiveContext := contextFromPool()
receiveContext := getContext()
receiveContext.build(ctx, pid, to, message, false)
to.doReceive(receiveContext)

timer := timers.Get(timeout)

select {
case result := <-receiveContext.response:
timers.Put(timer)
return result, nil
case <-time.After(timeout):
case <-timer.C:
err = ErrRequestTimeout
pid.toDeadletterQueue(receiveContext, err)
timers.Put(timer)
return nil, err
}
}
Expand All @@ -611,7 +614,7 @@ func (pid *PID) Tell(ctx context.Context, to *PID, message proto.Message) error
return ErrDead
}

receiveContext := contextFromPool()
receiveContext := getContext()
receiveContext.build(ctx, pid, to, message, true)

to.doReceive(receiveContext)
Expand Down Expand Up @@ -1117,60 +1120,58 @@ func (pid *PID) doReceive(receiveCtx *ReceiveContext) {
// push the message as a deadletter
pid.toDeadletterQueue(receiveCtx, err)
}
pid.signalMessage()
pid.schedule()
}

// signal that a message has arrived and wake up the actor if needed
func (pid *PID) signalMessage() {
// schedule schedules that a message has arrived and wake up the
// message processing loop
func (pid *PID) schedule() {
// only signal if the actor is not already processing messages
if pid.processingMessages.CompareAndSwap(int32(idle), int32(processing)) {
select {
case pid.receiveSignal <- types.Unit{}:
default:
}
if pid.processing.CompareAndSwap(int32(idle), int32(busy)) {
pid.goScheduler.Schedule(pid.receiveLoop)
}
}

// receiveLoop extracts every message from the actor mailbox
// and pass it to the appropriate behavior for handling
func (pid *PID) receiveLoop() {
go func() {
for {
select {
case <-pid.receiveStopSignal:
return
case <-pid.receiveSignal:
var received *ReceiveContext
if received != nil {
returnToPool(received)
received = nil
}
var received *ReceiveContext
counter, throughput := 0, pid.goScheduler.Throughput()
for {
if counter > throughput {
counter = 0
runtime.Gosched()
}

// Process all messages in the queue one by one
for {
received = pid.mailbox.Dequeue()
if received == nil {
// If no more messages, stop processing
pid.processingMessages.Store(int32(idle))
// Check if new messages were added in the meantime and restart processing
if !pid.mailbox.IsEmpty() && pid.processingMessages.CompareAndSwap(int32(idle), int32(processing)) {
continue
}
break
}
// Process the message
switch msg := received.Message().(type) {
case *goaktpb.PoisonPill:
_ = pid.Shutdown(received.Context())
case *internalpb.HandleFault:
pid.handleFaultyChild(msg)
default:
pid.handleReceived(received)
}
}
counter++
if received != nil {
releaseContext(received)
received = nil
}

if received = pid.mailbox.Dequeue(); received != nil {
// Process the message
switch msg := received.Message().(type) {
case *goaktpb.PoisonPill:
_ = pid.Shutdown(received.Context())
case *internalpb.HandleFault:
pid.handleFaultyChild(msg)
default:
pid.handleReceived(received)
}
}
}()

// If no more messages, change busy state to idle
if !pid.processing.CompareAndSwap(int32(busy), int32(idle)) {
return
}

// Check if new messages were added in the meantime and restart processing
if !pid.mailbox.IsEmpty() && pid.processing.CompareAndSwap(int32(idle), int32(busy)) {
continue
}
return
}
}

// handleReceived picks the right behavior and processes the message
Expand Down Expand Up @@ -1454,7 +1455,6 @@ func (pid *PID) doStop(ctx context.Context) error {

// stop processing messages
pid.supervisionStopSignal <- types.Unit{}
pid.receiveStopSignal <- types.Unit{}

// TODO: revisit this part of the code
// move remaining messages in the mailbox to deadletter queue
Expand Down
10 changes: 5 additions & 5 deletions actors/receive_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ var pool = sync.Pool{
},
}

// contextFromPool retrieves a message from the pool
func contextFromPool() *ReceiveContext {
// getContext retrieves a message from the pool
func getContext() *ReceiveContext {
return pool.Get().(*ReceiveContext)
}

// returnToPool sends the message context back to the pool
func returnToPool(receiveContext *ReceiveContext) {
// releaseContext sends the message context back to the pool
func releaseContext(receiveContext *ReceiveContext) {
receiveContext.reset()
pool.Put(receiveContext)
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (rctx *ReceiveContext) Forward(to *PID) {

if to.IsRunning() {
ctx := context.WithoutCancel(rctx.ctx)
receiveContext := contextFromPool()
receiveContext := getContext()
receiveContext.build(ctx, sender, to, message, true)
to.doReceive(receiveContext)
}
Expand Down
9 changes: 5 additions & 4 deletions actors/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ package actors

import (
"time"

"github.com/tochemey/goakt/v2/internal/timer"
)

type nameType int

const (
// DefaultPassivationTimeout defines the default passivation timeout
DefaultPassivationTimeout = 2 * time.Minute
Expand All @@ -48,11 +52,7 @@ const (

systemNamePrefix = "GoAkt"
routeeNamePrefix = "GoAktRoutee"
)

type nameType int

const (
routerType nameType = iota
rebalancerType
rootGuardianType
Expand All @@ -66,6 +66,7 @@ var (
NoSender *PID
// DefaultSupervisoryStrategy defines the default supervisory strategy
DefaultSupervisoryStrategy = NewStopDirective()
timers = timer.NewPool()

systemNames = map[nameType]string{
routerType: "GoAktRouter",
Expand Down
Loading
Loading