Skip to content

Commit

Permalink
Implement scaling latency metrics through logical clock
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Vasilev <[email protected]>
  • Loading branch information
Omrigan committed Jun 20, 2024
1 parent 64761aa commit adf115e
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 55 deletions.
38 changes: 38 additions & 0 deletions neonvm/apis/neonvm/v1/virtualmachine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,48 @@ type Guest struct {
// +optional
Ports []Port `json:"ports,omitempty"`

// Logical clock value corresponding to the desired resources of the VM.
// +optional
DesiredClock *LogicalTime `json:"desiredClock,omitempty"`

// Additional settings for the VM.
// Cannot be updated.
// +optional
Settings *GuestSettings `json:"settings,omitempty"`
}

// LogicalTime allows to track progress of changes to a VM.
type LogicalTime struct {
Value int64 `json:"value"`
UpdatedAt metav1.Time `json:"updatedAt"`
}

func LatestClock(a, b *LogicalTime) *LogicalTime {
if a == nil {
return b
}
if b == nil {
return a
}
if a.UpdatedAt.After(b.UpdatedAt.Time) {
return a
}
return b
}

func EarliestLogicalTime(ts ...*LogicalTime) *LogicalTime {
var earliest *LogicalTime
for _, t := range ts {
if t == nil {
return nil
}
if earliest == nil || t.UpdatedAt.Before(&earliest.UpdatedAt) {
earliest = t
}
}
return earliest
}

type GuestSettings struct {
// Individual lines to add to a sysctl.conf file. See sysctl.conf(5) for more
// +optional
Expand Down Expand Up @@ -494,6 +530,8 @@ type VirtualMachineStatus struct {
MemorySize *resource.Quantity `json:"memorySize,omitempty"`
// +optional
SSHSecretName string `json:"sshSecretName,omitempty"`
// +optional
CurrentClock *LogicalTime `json:"currentClock,omitempty"`
}

type VmPhase string
Expand Down
23 changes: 14 additions & 9 deletions pkg/agent/core/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"go.uber.org/zap/zapcore"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
)

Expand All @@ -21,24 +22,28 @@ type ActionWait struct {
}

type ActionPluginRequest struct {
LastPermit *api.Resources `json:"current"`
Target api.Resources `json:"target"`
Metrics *api.Metrics `json:"metrics"`
LastPermit *api.Resources `json:"current"`
Target api.Resources `json:"target"`
Metrics *api.Metrics `json:"metrics"`
DesiredLogicalTime *vmv1.LogicalTime `json:"desiredLogicalTime"`
}

type ActionNeonVMRequest struct {
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
DesiredLogicalTime *vmv1.LogicalTime `json:"desiredLogicalTime"`
}

type ActionMonitorDownscale struct {
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
DesiredLogicalTime *vmv1.LogicalTime `json:"desiredLogicalTime"`
}

type ActionMonitorUpscale struct {
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
Current api.Resources `json:"current"`
Target api.Resources `json:"target"`
DesiredLogicalTime *vmv1.LogicalTime `json:"desiredLogicalTime"`
}

func addObjectPtr[T zapcore.ObjectMarshaler](enc zapcore.ObjectEncoder, key string, value *T) error {
Expand Down
26 changes: 15 additions & 11 deletions pkg/agent/core/dumpstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"time"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
)

Expand Down Expand Up @@ -33,23 +34,25 @@ func (d StateDump) MarshalJSON() ([]byte, error) {
func (s *State) Dump() StateDump {
return StateDump{
internal: state{
Debug: s.internal.Debug,
Config: s.internal.Config,
VM: s.internal.VM,
Plugin: s.internal.Plugin.deepCopy(),
Monitor: s.internal.Monitor.deepCopy(),
NeonVM: s.internal.NeonVM.deepCopy(),
Metrics: shallowCopy[SystemMetrics](s.internal.Metrics),
Debug: s.internal.Debug,
Config: s.internal.Config,
VM: s.internal.VM,
Plugin: s.internal.Plugin.deepCopy(),
Monitor: s.internal.Monitor.deepCopy(),
NeonVM: s.internal.NeonVM.deepCopy(),
Metrics: shallowCopy[SystemMetrics](s.internal.Metrics),
ClockSource: s.internal.ClockSource,
},
}
}

func (s *pluginState) deepCopy() pluginState {
return pluginState{
OngoingRequest: s.OngoingRequest,
LastRequest: shallowCopy[pluginRequested](s.LastRequest),
LastFailureAt: shallowCopy[time.Time](s.LastFailureAt),
Permit: shallowCopy[api.Resources](s.Permit),
OngoingRequest: s.OngoingRequest,
LastRequest: shallowCopy[pluginRequested](s.LastRequest),
LastFailureAt: shallowCopy[time.Time](s.LastFailureAt),
Permit: shallowCopy[api.Resources](s.Permit),
CurrentLogicalTime: shallowCopy[vmv1.LogicalTime](s.CurrentLogicalTime),
}
}

Expand All @@ -61,6 +64,7 @@ func (s *monitorState) deepCopy() monitorState {
Approved: shallowCopy[api.Resources](s.Approved),
DownscaleFailureAt: shallowCopy[time.Time](s.DownscaleFailureAt),
UpscaleFailureAt: shallowCopy[time.Time](s.UpscaleFailureAt),
CurrentLogicalTime: shallowCopy[vmv1.LogicalTime](s.CurrentLogicalTime),
}
}

Expand Down
58 changes: 58 additions & 0 deletions pkg/agent/core/logicclock/logicclock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package logicclock

import (
"errors"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
)

type ClockSource struct {
cb func(time.Duration)
ts []time.Time
offset int64
Now func() v1.Time
}

func NewClockSource(cb func(time.Duration)) *ClockSource {
return &ClockSource{
cb: cb,
ts: nil,
offset: 0,
Now: v1.Now,
}
}

func (c *ClockSource) Next() *vmv1.LogicalTime {
ret := vmv1.LogicalTime{
Value: c.offset + int64(len(c.ts)),
UpdatedAt: c.Now(),
}
c.ts = append(c.ts, ret.UpdatedAt.Time)
return &ret
}

func (c *ClockSource) Observe(clock *vmv1.LogicalTime) error {
if clock == nil {
return nil
}
if clock.Value < c.offset {
return nil
}

idx := clock.Value - c.offset
if idx > int64(len(c.ts)) {
return errors.New("clock value is in the future")
}

diff := clock.UpdatedAt.Time.Sub(c.ts[idx])

c.cb(diff)

c.offset = clock.Value + 1
c.ts = c.ts[idx+1:]

return nil
}
125 changes: 125 additions & 0 deletions pkg/agent/core/logicclock/logicclock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package logicclock_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/agent/core/logicclock"
)

type testClockMetric struct {
*logicclock.ClockSource
t *testing.T
now v1.Time
result *time.Duration
}

func (tcm *testClockMetric) advance(d time.Duration) {
tcm.now = v1.NewTime(tcm.now.Add(d))
}

func (tcm *testClockMetric) assertResult(d time.Duration) {
require.NotNil(tcm.t, tcm.result)
assert.Equal(tcm.t, d, *tcm.result)
tcm.result = nil
}

func newTestClockMetric(t *testing.T) *testClockMetric {
tcm := &testClockMetric{
ClockSource: nil,
t: t,
now: v1.NewTime(time.Now()),
result: nil,
}

cb := func(d time.Duration) {
tcm.result = &d
}
tcm.ClockSource = logicclock.NewClockSource(cb)
tcm.ClockSource.Now = func() v1.Time {
return tcm.now
}

return tcm
}

func TestClockMetric(t *testing.T) {
tcm := newTestClockMetric(t)

// Generate new clock
cl := tcm.Next()
assert.Equal(t, int64(0), cl.Value)

// Observe it coming back in 5 seconds
tcm.advance(5 * time.Second)
err := tcm.Observe(&vmv1.LogicalTime{
Value: 0,
UpdatedAt: tcm.now,
})
assert.NoError(t, err)
tcm.assertResult(5 * time.Second)
}

func TestClockMetricSkip(t *testing.T) {
tcm := newTestClockMetric(t)

// Generate new clock
cl := tcm.Next()
assert.Equal(t, int64(0), cl.Value)

// Generate another one
tcm.advance(5 * time.Second)
cl = tcm.Next()
assert.Equal(t, int64(1), cl.Value)

// Observe the first one
tcm.advance(5 * time.Second)
err := tcm.Observe(&vmv1.LogicalTime{
Value: 0,
UpdatedAt: tcm.now,
})
assert.NoError(t, err)
tcm.assertResult(10 * time.Second)

// Observe the second one
tcm.advance(2 * time.Second)
err = tcm.Observe(&vmv1.LogicalTime{
Value: 1,
UpdatedAt: tcm.now,
})
assert.NoError(t, err)
tcm.assertResult(7 * time.Second)
}

func TestClockMetricStale(t *testing.T) {
tcm := newTestClockMetric(t)

// Generate new clock
cl := tcm.Next()
assert.Equal(t, int64(0), cl.Value)

// Observe it coming back in 5 seconds
tcm.advance(5 * time.Second)
err := tcm.Observe(&vmv1.LogicalTime{
Value: 0,
UpdatedAt: tcm.now,
})
assert.NoError(t, err)
tcm.assertResult(5 * time.Second)

// Observe it coming back again
tcm.advance(5 * time.Second)
err = tcm.Observe(&vmv1.LogicalTime{
Value: 0,
UpdatedAt: tcm.now,
})
// No error, but no result either
assert.NoError(t, err)
assert.Nil(t, tcm.result)
}
Loading

0 comments on commit adf115e

Please sign in to comment.