Add load balancer and comments

Azer0s committed Jan 19, 2021
1 parent 0e1d95f commit babe0cd
Showing 4 changed files with 193 additions and 1 deletion.
33 changes: 33 additions & 0 deletions component/component_test.go
@@ -269,3 +269,36 @@ func TestLink(t *testing.T) {
wg.Wait()
quacktors.Run()
}

func TestLoadBalancer(t *testing.T) {
count = 0
usage := 1

lb := LoadBalancer(10, &testActor{id: 1}, func() uint16 {
return uint16(usage)
})

//load balancer should spin up 1 testActor
lbPid := quacktors.SpawnStateful(lb)
context := quacktors.RootContext()

usage = 11

//load balancer should spin up a second testActor
context.Send(lbPid, quacktors.GenericMessage{})

//load balancer should repair itself
context.Send(lbPid, quacktors.KillMessage{})
<-time.After(1 * time.Second)
assert.Equal(t, 3, count)

usage = 21

//load balancer should spin up a third testActor
context.Send(lbPid, quacktors.GenericMessage{})
<-time.After(1 * time.Second)
assert.Equal(t, 4, count)

context.Send(lbPid, quacktors.PoisonPill{})
quacktors.Run()
}
6 changes: 6 additions & 0 deletions component/dynamic_supervisor.go
@@ -53,6 +53,12 @@ func (d *dynamicSupervisorComponent) Pids() []*quacktors.Pid {
return d.actorPids
}

//DynamicSupervisor returns a dynamic supervisor component.
//It is functionally the same as the Supervisor, the key
//difference being that child actors of the dynamic supervisor
//are not addressed as named actors but via PIDs
//(internally it still uses named actors but automatically
//creates and manages relays for each child).
func DynamicSupervisor(strategy strategy, actors []quacktors.Actor) *dynamicSupervisorComponent {
d := dynamicSupervisorComponent{}

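As an illustrative usage sketch of DynamicSupervisor (not part of this commit): the workerActor type and the OneForOne strategy constant are assumptions for illustration, while DynamicSupervisor, SpawnStateful, Pids, RootContext and Send all appear in this diff.

//supervise two (hypothetical) workers; the strategy name is assumed
sup := component.DynamicSupervisor(component.OneForOne, []quacktors.Actor{
&workerActor{},
&workerActor{},
})
quacktors.SpawnStateful(sup)

rootCtx := quacktors.RootContext()

//children are addressed via their PIDs instead of by name
for _, pid := range sup.Pids() {
rootCtx.Send(pid, quacktors.GenericMessage{})
}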
118 changes: 117 additions & 1 deletion component/load_balancer.go
@@ -1,5 +1,121 @@
package component

import (
"github.com/Azer0s/quacktors"
"math"
)

type pidWithUsage struct {
pid *quacktors.Pid
usage uint64
}

type loadBalancerComponent struct {
threshold uint16
actor quacktors.Actor
pids []*pidWithUsage
usageFunction func() uint16
}

func (l *loadBalancerComponent) Init(ctx *quacktors.Context) {
l.spawnOrDestroy(ctx)

//if the load balancer is killed forcefully, kill all the
//spawned actors
ctx.Defer(func() {
for _, p := range l.pids {
ctx.Kill(p.pid)
}
})
}

func (l *loadBalancerComponent) spawnOrDestroy(ctx *quacktors.Context) {
usage := l.usageFunction()
currentPids := len(l.pids)
requiredPids := int(math.Max(1, math.Round((float64(usage)-(float64(l.threshold)/2))/float64(l.threshold))+1))
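//worked example with threshold t = 10 (the values used in
//TestLoadBalancer): usage 1 -> max(1, round(-0.4)+1) = 1 pid,
//usage 11 -> max(1, round(0.6)+1) = 2 pids,
//usage 21 -> max(1, round(1.6)+1) = 3 pids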

if currentPids < requiredPids {
delta := requiredPids - currentPids
//spawn delta amount of pids

for i := 0; i < delta; i++ {
p := &pidWithUsage{
pid: quacktors.SpawnStateful(l.actor),
usage: 0,
}

ctx.Monitor(p.pid)

l.pids = append(l.pids, p)
}
} else if currentPids > requiredPids && requiredPids != 0 {
delta := currentPids - requiredPids
//kill delta amount of pids

for i := 0; i < delta; i++ {
p := l.pids[len(l.pids)-1]
ctx.Send(p.pid, quacktors.PoisonPill{})
l.pids = l.pids[:len(l.pids)-1]
}
} else {
//currentPids == requiredPids, which is the optimum
}
}

func (l *loadBalancerComponent) Run(ctx *quacktors.Context, message quacktors.Message) {
//a pool actor went down; remove it from the pool and rescale
if d, ok := message.(quacktors.DownMessage); ok {
for i, p := range l.pids {
if d.Who.Is(p.pid) {
copy(l.pids[i:], l.pids[i+1:])
l.pids = l.pids[:len(l.pids)-1]

l.spawnOrDestroy(ctx)

return
}
}
}

l.spawnOrDestroy(ctx)
//forward the message to the pool actor with the lowest usage
var sendTo = &pidWithUsage{
pid: nil,
usage: math.MaxUint64,
}

for _, pid := range l.pids {
if pid.usage < sendTo.usage {
sendTo = pid
}
}

ctx.Send(sendTo.pid, message)

sendTo.usage++
}

//LoadBalancer creates a load balancer component from
//the provided parameters. The load balancer scales an
//actor according to the usage reported by the usage
//function and the scaling threshold. If an actor in the
//pool goes down, it is automatically restarted if still
//needed. At least one instance of the actor is always
//kept running. The scaling of the actor is calculated
//by a threshold function. If the load balancer is killed,
//it takes down all of the actors in its pool so as to
//avoid actor leaks.
//
//Threshold function
//
//The threshold function is defined like so:
//
//f(u,t) = \max(1, \mathrm{round}(\frac{u - t/2}{t}) + 1)
//
//where `u` is the usage, `t` is the threshold, and the
//function domain is u >= 0 and t >= 1.
func LoadBalancer(threshold uint16, actor quacktors.Actor, usageFunction func() uint16) quacktors.Actor {
return &loadBalancerComponent{
threshold: threshold,
actor: actor,
usageFunction: usageFunction,
pids: make([]*pidWithUsage, 0),
}
}
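An illustrative end-to-end sketch of the new LoadBalancer API (not part of this commit): workerActor and queueLength are assumptions, the rest mirrors the calls exercised in component_test.go.

//scale a pool of (hypothetical) workerActor instances by a
//(hypothetical) queue-length metric, reported as uint16
lb := component.LoadBalancer(10, &workerActor{}, func() uint16 {
return queueLength()
})
lbPid := quacktors.SpawnStateful(lb)

//messages to lbPid are forwarded to the least-used pool actor
quacktors.RootContext().Send(lbPid, quacktors.GenericMessage{})
quacktors.Run()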
37 changes: 37 additions & 0 deletions context.go
@@ -7,6 +7,14 @@ import (
"time"
)

//The Context struct defines the actor context and
//provides ways for an actor to interact with the
//rest of the system. Actors are provided a
//Context instance on Init and Run. Actors should
//only use the provided Context to interact with
//other actors, as the Context also stores things
//like the current Span or a pointer to the
//actor-specific send mutex.
type Context struct {
span opentracing.Span
traceFork func(ctx opentracing.SpanContext) opentracing.SpanReference
@@ -35,18 +43,29 @@ func (c *Context) TraceFork(traceFork func(ctx opentracing.SpanContext) opentracing.SpanReference) {
c.traceFork = traceFork
}

//Span returns the current opentracing.Span. This will
//always be nil unless Trace was called with a service
//name in the Init function of the actor.
func (c *Context) Span() opentracing.Span {
return c.span
}

//Defer defers an action until after the actor has gone down.
//The same general advice applies to the Defer function
//as to the built-in Go defer (e.g. avoid defers in
//for loops, no nil function defers, etc.). Deferred
//actor functions should not panic (nothing will
//happen if they do, quacktors just recovers the panic).
func (c *Context) Defer(action func()) {
c.deferred = append(c.deferred, action)
}

//Self returns the PID of the calling actor.
func (c *Context) Self() *Pid {
return c.self
}

//Send sends a Message to another actor by its PID.
func (c *Context) Send(to *Pid, message Message) {
t := reflect.ValueOf(message).Type().Kind()

@@ -65,6 +84,11 @@ func (c *Context) Send(to *Pid, message Message) {
doSend(to, message, spanContext)
}

//SendAfter schedules a Message to be sent to another
//actor by its PID after a timer has finished. SendAfter
//also returns an Abortable so the scheduled Send can
//be stopped. If the sending actor goes down before the
//timer has completed, the Send operation is still executed.
func (c *Context) SendAfter(to *Pid, message Message, duration time.Duration) Abortable {
quitChan := make(chan bool)

@@ -83,6 +107,7 @@ func (c *Context) SendAfter(to *Pid, message Message, duration time.Duration) Abortable {
return &sendAfterAbortable{quitChan: quitChan}
}

//Kill kills another actor by its PID.
func (c *Context) Kill(pid *Pid) {
go func() {
if pid.MachineId != machineId {
@@ -108,10 +133,17 @@ func (c *Context) Kill(pid *Pid) {
}()
}

//Quit kills the calling actor.
func (c *Context) Quit() {
panic(quitAction{})
}

//MonitorMachine starts a monitor on a connection to
//a remote machine. As soon as the remote disconnects,
//a DisconnectMessage is sent to the monitoring actor.
//MonitorMachine also returns an Abortable so the
//monitor can be canceled (i.e. no DisconnectMessage
//will be sent out if the monitored machine disconnects).
func (c *Context) MonitorMachine(machine *Machine) Abortable {
machine.monitorsMu.Lock()
defer machine.monitorsMu.Unlock()
@@ -139,6 +171,11 @@ func (c *Context) MonitorMachine(machine *Machine) Abortable {
}
}

//Monitor starts a monitor on another actor. As soon as
//the actor goes down, a DownMessage is sent to the
//monitoring actor. Monitor also returns an Abortable
//so the monitor can be canceled (i.e. no DownMessage
//will be sent out if the monitored actor goes down).
func (c *Context) Monitor(pid *Pid) Abortable {
errorChan := make(chan bool)
okChan := make(chan bool)
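//Illustrative sketch (not part of this commit, written from a
//client package's perspective): a watcher actor that uses Monitor
//and reacts to the resulting DownMessage. The watcherActor type is
//assumed for illustration; Monitor, Quit, DownMessage and its
//Who.Is check all appear elsewhere in this commit.
type watcherActor struct {
watched *quacktors.Pid
}

func (w *watcherActor) Init(ctx *quacktors.Context) {
//receive a DownMessage as soon as the watched actor goes down
ctx.Monitor(w.watched)
}

func (w *watcherActor) Run(ctx *quacktors.Context, message quacktors.Message) {
if d, ok := message.(quacktors.DownMessage); ok && d.Who.Is(w.watched) {
//the watched actor went down, so stop as well
ctx.Quit()
}
}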
