Skip to content

Commit

Permalink
Fix/scaling issues (#22)
Browse files Browse the repository at this point in the history
* chg: fix: instance spec parsing

* new: test: add unit tests for instance spec parsing

* chg: fix: bugs with ssh during scaling

* chg: feat: add protection to wait for EBS to unnatach
  • Loading branch information
felipemarinho97 authored Sep 19, 2023
1 parent 2023cb6 commit 6f0a6de
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 24 deletions.
6 changes: 3 additions & 3 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func GetCLI() *cli.App {
Name: "ami",
Aliases: []string{"i"},
Required: true,
Usage: "Amazon Machine Image to use",
Usage: "Amazon Machine Image to use. Example: \"id:ami-123456789\" or \"arch:x86_64,name:my-ami*,owner:123456789012\"",
},
&cli.StringFlag{
Name: "instance-profile-arn",
Expand All @@ -161,8 +161,8 @@ func GetCLI() *cli.App {
&cli.StringFlag{
Name: "prefered-instance-type",
Aliases: []string{"t"},
Value: "t2.micro",
Usage: "Prefered instance type to use, this will optimize the price for this type",
Value: "mem:1,cpus:2",
Usage: "Prefered instance type to use, this will optimize the price for this type. Valid examples: \"mem:1.5,cpus:1\" or \"type:t2.micro\"",
},
&cli.StringFlag{
Name: "custom-host-ami",
Expand Down
10 changes: 5 additions & 5 deletions cli/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func ParseInstanceSpec(spec string) (InstanceSpec, error) {
return InstanceSpec{}, nil
}
// check if it's a valid spec with regex
re := regexp.MustCompile(`^(mem:(\d+\.\d+)|cpus:(\d+)|type:([\w\.]+)|,|[\w\.]+)+$`)
re := regexp.MustCompile(`^((mem:\s*(\d+(\.\d+)?),?\s*|cpus:\s*(\d+),?\s*|type:\s*([\w\.]+),?\s*){0,3}|,|[\w\.]+)+$`)
if !re.MatchString(spec) {
fmt.Println("Invalid instance spec:", spec)
return InstanceSpec{}, fmt.Errorf("invalid instance spec: %s", spec)
Expand All @@ -30,15 +30,15 @@ func ParseInstanceSpec(spec string) (InstanceSpec, error) {
for _, part := range parts {
keyValue := strings.Split(part, ":")

switch keyValue[0] {
switch strings.TrimSpace(keyValue[0]) {
case "mem":
mem, _ := strconv.ParseFloat(keyValue[1], 64)
mem, _ := strconv.ParseFloat(strings.TrimSpace(keyValue[1]), 64)
instanceSpec.MinMemory = int32(float32(mem) * 1024)
case "cpus":
cpus, _ := strconv.Atoi(keyValue[1])
cpus, _ := strconv.Atoi(strings.TrimSpace(keyValue[1]))
instanceSpec.MinCPU = int32(cpus)
case "type":
instanceSpec.InstanceType = keyValue[1]
instanceSpec.InstanceType = strings.TrimSpace(keyValue[1])
default:
return InstanceSpec{
InstanceType: keyValue[0],
Expand Down
95 changes: 95 additions & 0 deletions cli/util/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package util

import (
"reflect"
"testing"
)

func TestParseInstanceSpec(t *testing.T) {
type args struct {
spec string
}
tests := []struct {
name string
args args
want InstanceSpec
wantErr bool
}{
{
name: "the spec is just the instance type",
args: args{
spec: "t2.micro",
},
want: InstanceSpec{
InstanceType: "t2.micro",
},
wantErr: false,
},
{
name: "the spec is just the instance type on filter format",
args: args{
spec: "type:t2.micro",
},
want: InstanceSpec{
InstanceType: "t2.micro",
},
wantErr: false,
},
{
name: "the spec is just the instance type on filter format with spaces",
args: args{
spec: "type: t2.micro",
},
want: InstanceSpec{
InstanceType: "t2.micro",
},
wantErr: false,
},
{
name: "the spec is just the instance type on filter format with spaces and other filters",
args: args{
spec: "type: t2.micro, mem: 0.5, cpus: 1",
},
want: InstanceSpec{
InstanceType: "t2.micro",
MinMemory: 512,
MinCPU: 1,
},
wantErr: false,
},
{
name: "the spec is the cpu and memory",
args: args{
spec: "mem:0.5,cpus:1",
},
want: InstanceSpec{
MinMemory: 512,
MinCPU: 1,
},
wantErr: false,
},
{
name: "the spec is the cpu and memory is an integer",
args: args{
spec: "mem:1,cpus:1",
},
want: InstanceSpec{
MinMemory: 1024,
MinCPU: 1,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseInstanceSpec(tt.args.spec)
if (err != nil) != tt.wantErr {
t.Errorf("ParseInstanceSpec() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseInstanceSpec() = %v, want %v", got, tt.want)
}
})
}
}
18 changes: 13 additions & 5 deletions core/cfg_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,32 @@ func (h *Handler) EditSpec(ctx context.Context, opts EditSpecOptions) (EditOutpu
}

// power off devspace
timeout := 60 * time.Second
sshClient, err := ssh.NewSSHClient(*currentInstance.PublicIpAddress, 22, "ec2-user", string(identityKey))
if err != nil {
return EditOutput{}, err
sshClient, err = ssh.NewSSHClient(*currentInstance.PublicIpAddress, 22, "root", string(identityKey))
if err != nil {
return EditOutput{}, err
}
}
_, err = sshClient.Run("sudo machinectl terminate devspace")
_, err = sshClient.Run("sudo machinectl terminate devspace", timeout)
if err != nil {
return EditOutput{}, err
log.Warn("Error powering off devspace: ", err)
}
_, err = sshClient.Run("sudo umount /dev/sdf1")
log.Debug("Powered off devspace")
_, err = sshClient.Run("sudo umount /dev/sdf1", timeout)
if err != nil {
return EditOutput{}, err
log.Warn("Error unmounting EBS volume: ", err)
} else {
log.Debug("Unmounted EBS volume")
}

// detach ebs volume
_, err = helpers.DetachEBSVolume(ctx, client, volumeID)
if err != nil {
return EditOutput{}, err
}
log.Debug("Detached EBS volume with id: ", volumeID)

// wait until ebs volume is detached
err = helpers.WaitUntilEBSUnattached(ctx, client, volumeID)
Expand Down
6 changes: 6 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ touch $MOUNTPOINT/root/.ssh/authorized_keys
TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
curl http://169.254.169.254/latest/meta-data/public-keys/0/openssh-key -H "X-aws-ec2-metadata-token: $TOKEN" > $MOUNTPOINT/root/.ssh/authorized_keys
## for each user, add the public key to authorized_keys
for user in $(ls $MOUNTPOINT/home); do
mkdir -p $MOUNTPOINT/home/$user/.ssh/
cat $MOUNTPOINT/root/.ssh/authorized_keys > $MOUNTPOINT/home/$user/.ssh/authorized_keys
done
## boot the chroot machine
export SYSTEMD_SECCOMP=0
systemd-nspawn --boot --quiet --machine=devspace --capability=all -D $MOUNTPOINT/
Expand Down
1 change: 1 addition & 0 deletions core/helpers/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func WaitUntilReachable(host string, port int) error {
conn.Close()
return nil
}
time.Sleep(1 * time.Second)
}
}

Expand Down
11 changes: 10 additions & 1 deletion core/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ func (h *Handler) Start(ctx context.Context, startOptions StartOptions) (StartOu
return StartOutput{}, err
}

// get volume id from template tags
volumeID := util.GetTag(template.Tags, "dev-spaces:volume-id")

// wait until ebs volume is detached
log.Info("Waiting for EBS volume to be available...")
err = helpers.WaitUntilEBSUnattached(ctx, client, volumeID)
if err != nil {
return StartOutput{}, err
}

out, err := helpers.CreateSpotRequest(ctx, client, tName, tVersion, cpusSpec, minMemory, maxPrice, template, timeout)
if err != nil {
return StartOutput{}, err
Expand All @@ -78,7 +88,6 @@ func (h *Handler) Start(ctx context.Context, startOptions StartOptions) (StartOu
ip := *instance.PublicIpAddress

// attach ebs volume
volumeID := util.GetTag(template.Tags, "dev-spaces:volume-id")
err = helpers.AttachEBSVolume(ctx, client, *instance.InstanceId, volumeID)
if err != nil {
return StartOutput{}, err
Expand Down
54 changes: 44 additions & 10 deletions core/util/ssh/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ssh

import (
"fmt"
"net"
"time"

"golang.org/x/crypto/ssh"
)
Expand All @@ -26,8 +28,24 @@ func NewSSHClient(host string, port int, user string, identityKey string) (*SSHC
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}

agentDialer := &net.Dialer{
Timeout: 60 * time.Second,
KeepAlive: 5 * time.Second,
}
conn, err := agentDialer.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
conn.Close()
return nil, err
}
err = conn.SetDeadline(time.Now().Add(60 * time.Second))
if err != nil {
conn.Close()
return nil, err
}

// connect ot ssh server
conn, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", host, port), config)
clientConn, channelCh, reqCh, err := ssh.NewClientConn(conn, "tcp", config)
if err != nil {
return nil, err
}
Expand All @@ -37,18 +55,34 @@ func NewSSHClient(host string, port int, user string, identityKey string) (*SSHC
port: port,
user: user,
identityKey: identityKey,
conn: conn,
conn: ssh.NewClient(clientConn, channelCh, reqCh),
}, nil
}

func (c *SSHClient) Run(cmd string) (string, error) {
session, err := c.conn.NewSession()
if err != nil {
return "", err
}
out, err := session.Output(cmd)
if err != nil {
func (c *SSHClient) Run(cmd string, timeout time.Duration) (string, error) {
outCh := make(chan string)
errCh := make(chan error)
go func() {
session, err := c.conn.NewSession()
if err != nil {
errCh <- err
return
}
out, err := session.Output(cmd)
if err != nil {
errCh <- err
return
}

outCh <- string(out)
}()

select {
case <-time.After(timeout):
return "", fmt.Errorf("Timeout after %s", timeout)
case out := <-outCh:
return out, nil
case err := <-errCh:
return "", err
}
return string(out), nil
}

0 comments on commit 6f0a6de

Please sign in to comment.