Skip to content

Commit

Permalink
[APP-4166] [APP-4096] Add Syscfg Subsystem (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
Otterverse authored Mar 8, 2024
1 parent 12c83a7 commit 0b50eea
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 255 deletions.
2 changes: 2 additions & 0 deletions cmd/viam-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pkg/errors"
"github.com/viamrobotics/agent"
"github.com/viamrobotics/agent/subsystems/provisioning"
"github.com/viamrobotics/agent/subsystems/syscfg"
"github.com/viamrobotics/agent/subsystems/viamagent"
"github.com/viamrobotics/agent/subsystems/viamserver"
"go.viam.com/utils"
Expand Down Expand Up @@ -67,6 +68,7 @@ func main() {
if opts.Debug {
globalLogger = golog.NewDebugLogger("viam-agent")
provisioning.Debug = true
syscfg.Debug = true
}

// need to be root to go any further than this
Expand Down
47 changes: 31 additions & 16 deletions preinstall.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,28 @@ EOF
)

find_mountpoints_linux() {
if [ "$MOUNTS" == "" ]; then
if [ "$MOUNTS" = "" ]; then
MOUNTS=$(findmnt -o TARGET -l | grep -v TARGET | grep -vE '^/$')
fi
}

find_mountpoints_macos() {
if [ "$MOUNTS" == "" ]; then
vols=$(/usr/libexec/PlistBuddy -c "Print :VolumesFromDisks" /dev/stdin <<< "$(diskutil list -plist)" | grep -vE '[{}]' | awk '{$1=$1};1')
if [ "$MOUNTS" = "" ]; then
volsplist=$(mktemp)
diskutil list -plist > "$volsplist"
vols=$(/usr/libexec/PlistBuddy -c "Print :VolumesFromDisks" "$volsplist" | grep -vE '[{}]' | awk '{$1=$1};1')
while read -r vol; do
newMount=$(/usr/libexec/PlistBuddy -c "Print :MountPoint" /dev/stdin <<< $(diskutil info -plist "$vol"))
volplist=$(mktemp)
diskutil info -plist "$vol" > "$volplist"
newMount=$(/usr/libexec/PlistBuddy -c "Print :MountPoint" "$volplist")
if [ "$newMount" != "" ]; then
MOUNTS=$(echo "$newMount\n$MOUNTS")
fi
done <<< "$vols"
rm "$volplist"
done <<-EOF
$vols
EOF
rm "$volsplist"
fi
return 0
}
Expand All @@ -69,7 +77,9 @@ check_fs() {
ARCH=aarch64
echo "Found Raspberry Pi bootfs mounted at $BOOTFS"
fi
done <<< "$MOUNTS"
done <<-EOF
$MOUNTS
EOF

if [ "$ARCH" != "" ] && ([ "$ROOTFS" != "" ] || [ "$BOOTFS" != "" ]); then
return 0
Expand Down Expand Up @@ -113,19 +123,19 @@ if [ "$(id -u)" -ne 0 ]; then
fi

if ! [ -z $1 ]; then
if [ "$1" == "--aarch64" ]; then
if [ "$1" = "--aarch64" ]; then
ARCH=aarch64
TARBALL_ONLY=1
elif [ "$1" == "--x86_64" ]; then
elif [ "$1" = "--x86_64" ]; then
ARCH=x86_64
TARBALL_ONLY=1
elif [ -d "$1" ]; then
MOUNTS="$1"
fi
else
if [ $(uname) == "Linux" ]; then
if [ "$(uname)" = "Linux" ]; then
find_mountpoints_linux
elif [ $(uname) == "Darwin" ]; then
elif [ "$(uname)" = "Darwin" ]; then
find_mountpoints_macos
else
echo "This script only supports auto-detection on Linux and MacOS."
Expand All @@ -139,7 +149,8 @@ if [ "$TARBALL_ONLY" -ne 1 ] && ! check_fs ; then
echo "Error: no valid image found at mountpoints (or manually provided path)"
echo "If installing on a pi (sd card), please make sure it's freshly imaged with a custom hostname."
echo "If the imager auto-ejected the disk, you may need to remove and reinsert it to make it visible again."
echo "Alternately, re-run this script with either '--x86_64' or '--aarch64' options to create a portable package to extract manually."
echo "Alternately, re-run this script with either '--x86_64' or '--aarch64' options to create a portable package to extract manually,"\
"or explicitly specify the root path (/) if you want to install to the live/running system."
exit 1
fi

Expand Down Expand Up @@ -184,16 +195,20 @@ if [ "$IS_PI" -eq "1" ]; then
sed 's/rm -f \/boot\/firstrun.sh/tar -xJpf \/boot\/firmware\/viam-preinstall.tar.xz -C \/\nrm -f \/boot\/firstrun.sh/' "$BOOTFS/firstrun.sh" > "$BOOTFS/firstrun.sh.new"
mv "$BOOTFS/firstrun.sh.new" "$BOOTFS/firstrun.sh"
fi
elif [ "$ROOTFS" != "" ] && [ "$ROOTFS" != "/" ]; then
elif [ "$ROOTFS" != "" ]; then
tar -xJpf "$TARBALL" -C "$ROOTFS"
else
echo "Refusing to install to live root or unknown ROOTFS ($ROOTFS)"
echo "Refusing to install to unknown/unset ROOTFS ($ROOTFS)"
fi

if [ $TEMPDIR != "" ]; then
rm -rf $TEMPDIR
if [ "$TEMPDIR" != "" ]; then
rm -rf "$TEMPDIR"
fi

sync
echo && echo
echo "Install complete! You can eject/unmount and boot the image now."
if [ "$ROOTFS" = "/" ]; then
echo "Install complete! Reboot, or manually start the service with 'systemctl start viam-agent'"
else
echo "Install complete! You can eject/unmount and boot the image now."
fi
244 changes: 244 additions & 0 deletions subsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"fmt"
"io/fs"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"sync"
"syscall"
"time"

errw "github.com/pkg/errors"
Expand All @@ -21,6 +24,8 @@ import (

const (
ShortFailTime = time.Second * 30
StartTimeout = time.Minute
StopTimeout = time.Minute
)

var ErrSubsystemDisabled = errors.New("subsystem disabled")
Expand Down Expand Up @@ -367,3 +372,242 @@ func (s *AgentSubsystem) tryInner(ctx context.Context, cfg *pb.DeviceSubsystemCo

return newVersion, nil
}

// InternalSubsystem is shared start/stop/update code between "internal" (not viam-server) subsystems.
// It manages a single child process: Start launches it, Stop terminates it, HealthCheck probes it
// via a signal, and Update rewrites its JSON config file.
type InternalSubsystem struct {
// Immutable after NewInternalSubsystem returns.
// name is the subsystem name; it is also the binary name under ViamDirs["bin"] and the
// basename of the config file.
name string
// cmdArgs are the arguments passed to the binary: "--config", cfgPath, then any extra args.
cmdArgs []string
// logger receives lifecycle messages and, through MatchingLogger, the child's stdout/stderr.
logger *zap.SugaredLogger
// cfgPath is the JSON config file (ViamDirs["etc"]/<name>.json) compared and written by Update.
cfgPath string

// Process state, guarded by mu.
mu sync.Mutex
// cmd is the most recently created command; replaced on every Start attempt.
cmd *exec.Cmd
// running is true from a successful cmd.Start until the wait goroutine observes the exit.
running bool
// shouldRun is the desired state: set by Start, cleared by Stop.
shouldRun bool
// lastExit is the exit code recorded by the wait goroutine when the process ended.
lastExit int
// exitChan is created by Start and closed by the wait goroutine once the process has exited.
exitChan chan struct{}

// startStopMu serializes Start, Stop and HealthCheck; each holds it for its entire duration
// so that only one of these operations is in progress at a time.
startStopMu sync.Mutex
}

// NewInternalSubsystem creates the process manager for the internal subsystem called name.
// The child is always invoked with "--config <ViamDirs["etc"]/name.json>" followed by extraArgs.
// An error is returned when name is empty or logger is nil; nothing is started here.
func NewInternalSubsystem(name string, extraArgs []string, logger *zap.SugaredLogger) (*InternalSubsystem, error) {
	switch {
	case name == "":
		return nil, errors.New("name cannot be empty")
	case logger == nil:
		return nil, errors.New("logger cannot be nil")
	}

	configFile := path.Join(ViamDirs["etc"], name+".json")

	// The config flag comes first; caller-supplied arguments are appended after it.
	args := []string{"--config", configFile}
	args = append(args, extraArgs...)

	return &InternalSubsystem{
		name:    name,
		cmdArgs: args,
		logger:  logger,
		cfgPath: configFile,
	}, nil
}

// Start launches the subsystem binary (ViamDirs["bin"]/<name>) with the stored arguments and
// blocks until one of the following happens:
//   - the child writes a line matching "startup complete" to stdout (returns nil),
//   - the child exits first (returns a "startup failed" error),
//   - ctx is done (returns ctx.Err()),
//   - StartTimeout elapses (returns a "startup timed out" error).
//
// If the process is already running, Start returns nil without doing anything. On timeout or
// context cancellation the child is not signalled by this function; it remains tracked as
// running until it exits or Stop is called.
func (is *InternalSubsystem) Start(ctx context.Context) error {
// Serialize against Stop/HealthCheck and other Start calls for the whole operation.
is.startStopMu.Lock()
defer is.startStopMu.Unlock()

// mu is unlocked manually on every return path below because it must be released before the
// wait goroutine is spawned and before blocking in the final select.
is.mu.Lock()

if is.running {
is.mu.Unlock()
return nil
}
// shouldRun still being set means Stop was never called since the last Start, so the previous
// process went away by itself.
if is.shouldRun {
is.logger.Warnf("Restarting %s after unexpected exit", is.name)
} else {
is.logger.Infof("Starting %s", is.name)
is.shouldRun = true
}

// Child output is routed through MatchingLoggers so log lines can be pattern-matched.
stdio := NewMatchingLogger(is.logger, false)
stderr := NewMatchingLogger(is.logger, true)

//nolint:gosec
is.cmd = exec.Command(path.Join(ViamDirs["bin"], is.name), is.cmdArgs...)
is.cmd.Dir = ViamDirs["viam"]
// Put the child in its own process group; Stop relies on this to SIGKILL the whole group.
is.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
is.cmd.Stdout = stdio
is.cmd.Stderr = stderr

// watch for this line in the logs to indicate successful startup
// (registered before the process starts so the line cannot be missed)
c, err := stdio.AddMatcher("checkStartup", regexp.MustCompile(`startup complete`), false)
if err != nil {
is.mu.Unlock()
return err
}
defer stdio.DeleteMatcher("checkStartup")

err = is.cmd.Start()
if err != nil {
is.mu.Unlock()
return errw.Wrapf(err, "error starting %s", is.name)
}
is.running = true
is.exitChan = make(chan struct{})

// must be unlocked before spawning goroutine
// (the goroutine takes mu itself once the process has exited)
is.mu.Unlock()
// Exit watcher: waits for the child, then records the exit state and closes exitChan so that
// Start/Stop waiters are released.
go func() {
err := is.cmd.Wait()
is.mu.Lock()
defer is.mu.Unlock()
is.running = false
is.logger.Infof("%s exited", is.name)
if err != nil {
is.logger.Errorw("error while getting process status", "error", err)
}
if is.cmd.ProcessState != nil {
is.lastExit = is.cmd.ProcessState.ExitCode()
if is.lastExit != 0 {
is.logger.Errorw("non-zero exit code", "exit code", is.lastExit)
}
}
close(is.exitChan)
}()

// Block until startup is confirmed or one of the failure conditions fires.
select {
case <-c:
is.logger.Infof("%s started", is.name)
return nil
case <-ctx.Done():
return ctx.Err()
case <-time.After(StartTimeout):
return errw.New("startup timed out")
case <-is.exitChan:
return errw.New("startup failed")
}
}

// Stop records that the subsystem is no longer wanted and shuts the child process down.
// It first sends SIGTERM to the process and waits up to half of StopTimeout for it to exit;
// if it is still alive, the whole process group is sent SIGKILL and Stop waits for the other
// half. It returns nil when the process was not running or has exited, and an error when the
// process is still alive after the kill attempt (or ctx ended the waits early).
func (is *InternalSubsystem) Stop(ctx context.Context) error {
	is.startStopMu.Lock()
	defer is.startStopMu.Unlock()

	is.mu.Lock()
	wasRunning := is.running
	is.shouldRun = false
	is.mu.Unlock()

	// Nothing to terminate if no process is alive or no command was ever created
	// (interrupt early in startup).
	if !wasRunning || is.cmd == nil {
		return nil
	}

	is.logger.Infof("Stopping %s", is.name)

	proc := is.cmd.Process
	phaseTimeout := StopTimeout / 2

	// Phase one: polite termination request to the process itself.
	if termErr := proc.Signal(syscall.SIGTERM); termErr != nil {
		is.logger.Error(termErr)
	}
	if is.waitForExit(ctx, phaseTimeout) {
		is.logger.Infof("%s successfully stopped", is.name)
		return nil
	}

	// Phase two: forcibly kill the entire process group (negative pid).
	is.logger.Warnf("%s refused to exit, killing", is.name)
	if killErr := syscall.Kill(-proc.Pid, syscall.SIGKILL); killErr != nil {
		is.logger.Error(killErr)
	}
	if is.waitForExit(ctx, phaseTimeout) {
		is.logger.Infof("%s successfully killed", is.name)
		return nil
	}

	return errw.Errorf("%s process couldn't be killed", is.name)
}

// waitForExit reports whether the child process has exited. It returns true right away when
// the process is not running, otherwise it blocks until the exit channel is closed (true), or
// until ctx is done or timeout elapses (false).
func (is *InternalSubsystem) waitForExit(ctx context.Context, timeout time.Duration) bool {
	// Snapshot the shared state so the lock is not held while blocking.
	is.mu.Lock()
	exited, alive := is.exitChan, is.running
	is.mu.Unlock()

	if alive {
		select {
		case <-exited:
			return true
		case <-ctx.Done():
		case <-time.After(timeout):
		}
		return false
	}
	return true
}

// HealthCheck sends a USR1 signal to the subsystem process, which should cause it to log "HEALTHY" to stdout.
//
// It returns nil once a matching line is seen on the child's stdout. It returns an error when
// the subsystem is not running, when the child's stdout is not a *MatchingLogger, when the
// signal cannot be delivered, when no matching line arrives within 30 seconds, or (ctx.Err())
// when ctx is done first.
//
// startStopMu is held for the whole check so Start and Stop cannot run concurrently. mu is only
// held long enough to snapshot the process state: holding it during the wait would block the
// exit-watcher goroutine from recording that the process has exited.
func (is *InternalSubsystem) HealthCheck(ctx context.Context) (errRet error) {
	is.startStopMu.Lock()
	defer is.startStopMu.Unlock()

	// cmd cannot be replaced while startStopMu is held, so the snapshot stays valid after mu
	// is released.
	is.mu.Lock()
	running := is.running
	cmd := is.cmd
	is.mu.Unlock()

	if !running || cmd == nil || cmd.Process == nil {
		return errw.Errorf("%s not running", is.name)
	}

	is.logger.Debugf("starting healthcheck for %s", is.name)

	// Checked assertion: report a misconfigured stdout as an error instead of panicking.
	matcher, ok := cmd.Stdout.(*MatchingLogger)
	if !ok {
		return errw.Errorf("cannot healthcheck %s: stdout is not a MatchingLogger", is.name)
	}

	// Register the matcher before signalling so the response cannot be missed.
	checkChan, err := matcher.AddMatcher("healthcheck", regexp.MustCompile(`HEALTHY`), true)
	if err != nil {
		return err
	}
	defer matcher.DeleteMatcher("healthcheck")

	// If the signal cannot be delivered no response will ever arrive, so fail now rather
	// than blocking Start/Stop for the full timeout.
	if err := cmd.Process.Signal(syscall.SIGUSR1); err != nil {
		return errw.Wrapf(err, "error signalling %s for healthcheck", is.name)
	}

	select {
	case <-checkChan:
		is.logger.Debugf("healthcheck for %s is good", is.name)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(time.Second * 30):
		return errw.Errorf("timeout waiting for healthcheck on %s", is.name)
	}
}

// Update synchronizes the on-disk config file with the attributes in cfg.
// The attributes are marshalled to JSON and compared byte-for-byte with the current file.
// The returned bool reports whether the subsystem needs a restart: it is newVersion when the
// file already matches, and true in every other case (including all error returns).
func (is *InternalSubsystem) Update(ctx context.Context, cfg *pb.DeviceSubsystemConfig, newVersion bool) (bool, error) {
	wanted, err := cfg.GetAttributes().MarshalJSON()
	if err != nil {
		return true, err
	}

	onDisk, readErr := os.ReadFile(is.cfgPath)
	switch {
	case readErr == nil:
		// Config unchanged: a restart is only needed if the binary itself was updated.
		if bytes.Equal(onDisk, wanted) {
			return newVersion, nil
		}
	case !errors.Is(readErr, fs.ErrNotExist):
		// Unexpected read failure: request a restart and surface the error.
		return true, readErr
	}

	// File is missing or its contents differ: write the new config, then restart.
	//nolint:gosec
	return true, os.WriteFile(is.cfgPath, wanted, 0o644)
}
Loading

0 comments on commit 0b50eea

Please sign in to comment.