Skip to content

Commit

Permalink
refactor: refactor deployment code
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Jun 4, 2024
1 parent fff0a72 commit 2a88132
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 199 deletions.
110 changes: 35 additions & 75 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/alphadose/haxmap"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/codes"
Expand Down Expand Up @@ -211,7 +210,8 @@ type actorSystem struct {
registry types.Registry
reflection reflection

peersCache *haxmap.Map[string, []byte]
peersCacheMu *sync.RWMutex
peersCache map[string][]byte
clusterConfig *ClusterConfig
redistributionChan chan *cluster.Event
}
Expand Down Expand Up @@ -250,7 +250,8 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
clusterEventsChan: make(chan *cluster.Event, 1),
registry: types.NewRegistry(),
clusterSyncStopSig: make(chan types.Unit, 1),
peersCache: haxmap.New[string, []byte](100), // TODO need to check with memory footprint here since we change the map engine
peersCacheMu: &sync.RWMutex{},
peersCache: make(map[string][]byte),
}

system.started.Store(false)
Expand Down Expand Up @@ -1139,7 +1140,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {

go x.clusterEventsLoop()
go x.clusterReplicationLoop()
go x.peersSyncLoop()
go x.peersStateLoop()
go x.redistributionLoop()

x.logger.Info("clustering enabled...:)")
Expand Down Expand Up @@ -1285,14 +1286,7 @@ func (x *actorSystem) clusterReplicationLoop() {
for actor := range x.actorsChan {
if x.InCluster() {
ctx := context.Background()
peerSync := &internalpb.PeersSync{
Host: x.remotingHost,
RemotingPort: x.remotingPort,
PeersPort: int32(x.clusterConfig.PeersPort()),
Actor: actor,
}

if err := x.cluster.PutPeerSync(ctx, peerSync); err != nil {
if err := x.cluster.PutActor(ctx, actor); err != nil {
x.logger.Panic(err.Error())
}
}
Expand All @@ -1317,8 +1311,8 @@ func (x *actorSystem) clusterEventsLoop() {
}
}

// peersSyncLoop fetches the cluster peers' PeerSync and update the node peersCache
func (x *actorSystem) peersSyncLoop() {
// peersStateLoop fetches the cluster peers' PeerState and update the node peersCache
func (x *actorSystem) peersStateLoop() {
x.logger.Info("peers state synchronization has started...")
ticker := time.NewTicker(10 * time.Second)
tickerStopSig := make(chan types.Unit, 1)
Expand All @@ -1327,7 +1321,7 @@ func (x *actorSystem) peersSyncLoop() {
select {
case <-ticker.C:
eg, ctx := errgroup.WithContext(context.Background())
workersCount := 5

peersChan := make(chan *cluster.Peer)

eg.Go(func() error {
Expand All @@ -1347,22 +1341,20 @@ func (x *actorSystem) peersSyncLoop() {
return nil
})

for i := 0; i < workersCount; i++ {
eg.Go(func() error {
for peer := range peersChan {
if err := x.processPeerSync(ctx, peer); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
// pass
}
eg.Go(func() error {
for peer := range peersChan {
if err := x.processPeerState(ctx, peer); err != nil {
return err
}
return nil
})
}
select {
case <-ctx.Done():
return ctx.Err()
default:
// pass
}
}
return nil
})

if err := eg.Wait(); err != nil {
x.logger.Error(err)
Expand Down Expand Up @@ -1393,12 +1385,15 @@ func (x *actorSystem) redistributionLoop() {
}
}

// processPeerSync processes a given peer synchronization record.
func (x *actorSystem) processPeerSync(ctx context.Context, peer *cluster.Peer) error {
// processPeerState processes a given peer synchronization record.
func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer) error {
x.peersCacheMu.Lock()
defer x.peersCacheMu.Unlock()

peerAddress := net.JoinHostPort(peer.Host, strconv.Itoa(peer.Port))

x.logger.Infof("processing peer sync:(%s)", peerAddress)
peerSync, err := x.cluster.GetPeerSync(ctx, peerAddress)
peerState, err := x.cluster.GetState(ctx, peerAddress)
if err != nil {
if errors.Is(err, cluster.ErrPeerSyncNotFound) {
return nil
Expand All @@ -1407,51 +1402,16 @@ func (x *actorSystem) processPeerSync(ctx context.Context, peer *cluster.Peer) e
return err
}

peerState := new(internalpb.PeerState)
if bytea, ok := x.peersCache.Get(peerAddress); ok {
// no need to handle the error here because we pushed the data into the cache
_ = proto.Unmarshal(bytea, peerState)
}

// first time entry
if proto.Equal(peerState, new(internalpb.PeerState)) {
peerState = &internalpb.PeerState{
Host: peerSync.GetHost(),
RemotingPort: peerSync.GetRemotingPort(),
PeersPort: peerSync.GetPeersPort(),
Actors: []*internalpb.WireActor{},
}
}

// avoid duplicate actors
actorsMap := make(map[string]*internalpb.WireActor)
for _, actor := range peerState.GetActors() {
path := actor.GetActorPath()
if path != "" {
actorsMap[path] = actor
}
}

path := peerSync.GetActor().GetActorPath()
if _, ok := actorsMap[path]; !ok {
actorsMap[path] = peerSync.GetActor()
}

// build the actors list
actors := make([]*internalpb.WireActor, 0, len(actorsMap))
for _, actor := range actorsMap {
actors = append(actors, actor)
}

// set the state actors list
peerState.Actors = actors

x.logger.Debugf("peer (%s) actors count (%d)", peerAddress, len(actors))
x.logger.Debugf("peer (%s) actors count (%d)", peerAddress, len(peerState.GetActors()))

// no need to handle the error
bytea, _ := proto.Marshal(peerState)
x.peersCache.Set(peerAddress, bytea)
bytea, err := proto.Marshal(peerState)
if err != nil {
x.logger.Error(err)
return err
}

x.peersCache[peerAddress] = bytea
x.logger.Infof("peer sync(%s) successfully processed", peerAddress)
return nil
}
54 changes: 30 additions & 24 deletions actors/pid_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ package actors

import (
"reflect"

"github.com/alphadose/haxmap"
"sync"

"github.com/tochemey/goakt/v2/internal/types"
"go.uber.org/atomic"
)

type prop struct {
Expand All @@ -38,23 +38,28 @@ type prop struct {
}

type pidMap struct {
mappings *haxmap.Map[string, *prop]
mu *sync.RWMutex
size atomic.Int32
mappings map[string]*prop
}

func newPIDMap(cap int) *pidMap {
return &pidMap{
mappings: haxmap.New[string, *prop](uintptr(cap)),
mappings: make(map[string]*prop, cap),
mu: &sync.RWMutex{},
}
}

// len returns the number of PIDs
func (m *pidMap) len() int {
return int(m.mappings.Len())
return int(m.size.Load())
}

// get retrieves a pid by its address
func (m *pidMap) get(path *Path) (pid PID, ok bool) {
prop, ok := m.mappings.Get(path.String())
m.mu.RLock()
defer m.mu.RUnlock()
prop, ok := m.mappings[path.String()]
if prop != nil {
pid = prop.pid
ok = true
Expand All @@ -64,50 +69,51 @@ func (m *pidMap) get(path *Path) (pid PID, ok bool) {

// set sets a pid in the map
func (m *pidMap) set(pid PID) {
m.mu.Lock()
defer m.mu.Unlock()
if pid != nil {
var rtype reflect.Type
handle := pid.ActorHandle()
if handle != nil {
rtype = types.Of(handle)
}

m.mappings.Set(pid.ActorPath().String(), &prop{
m.mappings[pid.ActorPath().String()] = &prop{
pid: pid,
rtype: rtype,
})
}
m.size.Add(1)
}
}

// delete removes a pid from the map
func (m *pidMap) delete(addr *Path) {
m.mappings.Del(addr.String())
m.mu.Lock()
delete(m.mappings, addr.String())
m.size.Add(-1)
m.mu.Unlock()
}

// pids returns all actors as a slice
func (m *pidMap) pids() []PID {
m.mu.Lock()
var out []PID
m.mappings.ForEach(func(_ string, prop *prop) bool {
if len(out) == int(m.mappings.Len()) {
return false
}
for _, prop := range m.mappings {
out = append(out, prop.pid)
return true
})
}
m.mu.Unlock()
return out
}

func (m *pidMap) props() map[string]*prop {
out := make(map[string]*prop, m.mappings.Len())
m.mappings.ForEach(func(key string, prop *prop) bool {
if len(out) == int(m.mappings.Len()) {
return false
}
out[key] = prop
return true
})
m.mu.Lock()
out := m.mappings
m.mu.Unlock()
return out
}

func (m *pidMap) close() {
m.mappings.Clear()
m.mu.Lock()
m.mappings = make(map[string]*prop)
m.mu.Unlock()
}
11 changes: 8 additions & 3 deletions actors/redistribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func (x *actorSystem) redistribute(ctx context.Context, event *cluster.Event) er
return err
}

bytea, ok := x.peersCache.Get(nodeLeft.GetAddress())
x.peersCacheMu.RLock()
bytea, ok := x.peersCache[nodeLeft.GetAddress()]
x.peersCacheMu.RUnlock()
if !ok {
return ErrPeerNotFound
}
Expand Down Expand Up @@ -98,7 +100,6 @@ func (x *actorSystem) redistribute(ctx context.Context, event *cluster.Event) er
}

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(2)

eg.Go(func() error {
for _, actor := range leaderActors {
Expand Down Expand Up @@ -128,7 +129,11 @@ func (x *actorSystem) redistribute(ctx context.Context, event *cluster.Event) er
for i := 1; i < len(chunks); i++ {
actors := chunks[i]
peer := peers[i-1]
bytea, _ := x.peersCache.Get(net.JoinHostPort(peer.Host, strconv.Itoa(peer.Port)))

x.peersCacheMu.RLock()
bytea := x.peersCache[net.JoinHostPort(peer.Host, strconv.Itoa(peer.Port))]
x.peersCacheMu.RUnlock()

state := new(internalpb.PeerState)
_ = proto.Unmarshal(bytea, state)

Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21
require (
connectrpc.com/connect v1.16.2
connectrpc.com/otelconnect v0.7.0
github.com/alphadose/haxmap v1.4.0
github.com/buraksezer/olric v0.5.6-0.20240510193155-81e12546eb39
github.com/caarlos0/env/v11 v11.0.1
github.com/deckarep/golang-set/v2 v2.6.0
Expand Down Expand Up @@ -103,7 +102,6 @@ require (
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sys v0.20.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alphadose/haxmap v1.4.0 h1:1yn+oGzy2THJj1DMuJBzRanE3sMnDAjJVbU0L31Jp3w=
github.com/alphadose/haxmap v1.4.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
Expand Down Expand Up @@ -325,8 +323,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down
Loading

0 comments on commit 2a88132

Please sign in to comment.