From 0b50eeab4fa89795b7dc9e754881649f1bb9853c Mon Sep 17 00:00:00 2001 From: James Otting Date: Fri, 8 Mar 2024 13:11:29 -0600 Subject: [PATCH] [APP-4166] [APP-4096] Add Syscfg Subsystem (#12) --- cmd/viam-agent/main.go | 2 + preinstall.sh | 47 +++-- subsystem.go | 244 +++++++++++++++++++++++ subsystems/provisioning/provisioning.go | 249 ++---------------------- subsystems/registry/registry.go | 4 +- subsystems/syscfg/syscfg.go | 37 ++++ 6 files changed, 328 insertions(+), 255 deletions(-) create mode 100644 subsystems/syscfg/syscfg.go diff --git a/cmd/viam-agent/main.go b/cmd/viam-agent/main.go index a89dd18..cc754e6 100644 --- a/cmd/viam-agent/main.go +++ b/cmd/viam-agent/main.go @@ -20,6 +20,7 @@ import ( "github.com/pkg/errors" "github.com/viamrobotics/agent" "github.com/viamrobotics/agent/subsystems/provisioning" + "github.com/viamrobotics/agent/subsystems/syscfg" "github.com/viamrobotics/agent/subsystems/viamagent" "github.com/viamrobotics/agent/subsystems/viamserver" "go.viam.com/utils" @@ -67,6 +68,7 @@ func main() { if opts.Debug { globalLogger = golog.NewDebugLogger("viam-agent") provisioning.Debug = true + syscfg.Debug = true } // need to be root to go any further than this diff --git a/preinstall.sh b/preinstall.sh index 495df09..7a46bed 100755 --- a/preinstall.sh +++ b/preinstall.sh @@ -31,20 +31,28 @@ EOF ) find_mountpoints_linux() { - if [ "$MOUNTS" == "" ]; then + if [ "$MOUNTS" = "" ]; then MOUNTS=$(findmnt -o TARGET -l | grep -v TARGET | grep -vE '^/$') fi } find_mountpoints_macos() { - if [ "$MOUNTS" == "" ]; then - vols=$(/usr/libexec/PlistBuddy -c "Print :VolumesFromDisks" /dev/stdin <<< "$(diskutil list -plist)" | grep -vE '[{}]' | awk '{$1=$1};1') + if [ "$MOUNTS" = "" ]; then + volsplist=$(mktemp) + diskutil list -plist > "$volsplist" + vols=$(/usr/libexec/PlistBuddy -c "Print :VolumesFromDisks" "$volsplist" | grep -vE '[{}]' | awk '{$1=$1};1') while read -r vol; do - newMount=$(/usr/libexec/PlistBuddy -c "Print :MountPoint" /dev/stdin <<< $(diskutil info -plist "$vol")) + volplist=$(mktemp) + diskutil info -plist "$vol" > "$volplist" + newMount=$(/usr/libexec/PlistBuddy -c "Print :MountPoint" "$volplist") if [ "$newMount" != "" ]; then MOUNTS=$(echo "$newMount\n$MOUNTS") fi - done <<< "$vols" + rm "$volplist" + done <<-EOF + $vols + EOF + rm "$volsplist" fi return 0 } @@ -69,7 +77,9 @@ check_fs() { ARCH=aarch64 echo "Found Raspberry Pi bootfs mounted at $BOOTFS" fi - done <<< "$MOUNTS" + done <<-EOF + $MOUNTS + EOF if [ "$ARCH" != "" ] && ([ "$ROOTFS" != "" ] || [ "$BOOTFS" != "" ]); then return 0 @@ -113,19 +123,19 @@ if [ "$(id -u)" -ne 0 ]; then fi if ! [ -z $1 ]; then - if [ "$1" == "--aarch64" ]; then + if [ "$1" = "--aarch64" ]; then ARCH=aarch64 TARBALL_ONLY=1 - elif [ "$1" == "--x86_64" ]; then + elif [ "$1" = "--x86_64" ]; then ARCH=x86_64 TARBALL_ONLY=1 elif [ -d "$1" ]; then MOUNTS="$1" fi else - if [ $(uname) == "Linux" ]; then + if [ "$(uname)" = "Linux" ]; then find_mountpoints_linux - elif [ $(uname) == "Darwin" ]; then + elif [ "$(uname)" = "Darwin" ]; then find_mountpoints_macos else echo "This script only supports auto-detection on Linux and MacOS." @@ -139,7 +149,8 @@ if [ "$TARBALL_ONLY" -ne 1 ] && ! check_fs ; then echo "Error: no valid image found at mountpoints (or manually provided path)" echo "If installing on a pi (sd card), please make sure it's freshly imaged with a custom hostname." echo "If the imager auto-ejected the disk, you may need to remove and reinsert it to make it visible again." - echo "Alternately, re-run this script with either '--x86_64' or '--aarch64' options to create a portable package to extract manually." + echo "Alternately, re-run this script with either '--x86_64' or '--aarch64' options to create a portable package to extract manually,"\ + "or explicitly specify the root path (/) if you want to install to the live/running system." exit 1 fi @@ -184,16 +195,20 @@ if [ "$IS_PI" -eq "1" ]; then sed 's/rm -f \/boot\/firstrun.sh/tar -xJpf \/boot\/firmware\/viam-preinstall.tar.xz -C \/\nrm -f \/boot\/firstrun.sh/' "$BOOTFS/firstrun.sh" > "$BOOTFS/firstrun.sh.new" mv "$BOOTFS/firstrun.sh.new" "$BOOTFS/firstrun.sh" fi -elif [ "$ROOTFS" != "" ] && [ "$ROOTFS" != "/" ]; then +elif [ "$ROOTFS" != "" ]; then tar -xJpf "$TARBALL" -C "$ROOTFS" else - echo "Refusing to install to live root or unknown ROOTFS ($ROOTFS)" + echo "Refusing to install to unknown/unset ROOTFS ($ROOTFS)" fi -if [ $TEMPDIR != "" ]; then - rm -rf $TEMPDIR +if [ "$TEMPDIR" != "" ]; then + rm -rf "$TEMPDIR" fi sync echo && echo -echo "Install complete! You can eject/unmount and boot the image now." +if [ "$ROOTFS" = "/" ]; then + echo "Install complete! Reboot, or manually start the service with 'systemctl start viam-agent'" +else + echo "Install complete! You can eject/unmount and boot the image now." +fi diff --git a/subsystem.go b/subsystem.go index f693833..aaf9d8a 100644 --- a/subsystem.go +++ b/subsystem.go @@ -9,9 +9,12 @@ import ( "fmt" "io/fs" "os" + "os/exec" "path" "path/filepath" + "regexp" "sync" + "syscall" "time" errw "github.com/pkg/errors" @@ -21,6 +24,8 @@ import ( const ( ShortFailTime = time.Second * 30 + StartTimeout = time.Minute + StopTimeout = time.Minute ) var ErrSubsystemDisabled = errors.New("subsystem disabled") @@ -367,3 +372,242 @@ func (s *AgentSubsystem) tryInner(ctx context.Context, cfg *pb.DeviceSubsystemCo return newVersion, nil } + +// InternalSubsystem is shared start/stop/update code between "internal" (not viam-server) subsystems. +type InternalSubsystem struct { + // only set during New + name string + cmdArgs []string + logger *zap.SugaredLogger + cfgPath string + + // protected by mutex + mu sync.Mutex + cmd *exec.Cmd + running bool + shouldRun bool + lastExit int + exitChan chan struct{} + + // for blocking start/stop/check ops while another is in progress + startStopMu sync.Mutex +} + +func NewInternalSubsystem(name string, extraArgs []string, logger *zap.SugaredLogger) (*InternalSubsystem, error) { + if name == "" { + return nil, errors.New("name cannot be empty") + } + if logger == nil { + return nil, errors.New("logger cannot be nil") + } + + cfgPath := path.Join(ViamDirs["etc"], name+".json") + + is := &InternalSubsystem{ + name: name, + cmdArgs: append([]string{"--config", cfgPath}, extraArgs...), + cfgPath: cfgPath, + logger: logger, + } + return is, nil +} + +func (is *InternalSubsystem) Start(ctx context.Context) error { + is.startStopMu.Lock() + defer is.startStopMu.Unlock() + + is.mu.Lock() + + if is.running { + is.mu.Unlock() + return nil + } + if is.shouldRun { + is.logger.Warnf("Restarting %s after unexpected exit", is.name) + } else { + is.logger.Infof("Starting %s", is.name) + is.shouldRun = true + } + + stdio := NewMatchingLogger(is.logger, false) + stderr := NewMatchingLogger(is.logger, true) + + //nolint:gosec + is.cmd = exec.Command(path.Join(ViamDirs["bin"], is.name), is.cmdArgs...) + is.cmd.Dir = ViamDirs["viam"] + is.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + is.cmd.Stdout = stdio + is.cmd.Stderr = stderr + + // watch for this line in the logs to indicate successful startup + c, err := stdio.AddMatcher("checkStartup", regexp.MustCompile(`startup complete`), false) + if err != nil { + is.mu.Unlock() + return err + } + defer stdio.DeleteMatcher("checkStartup") + + err = is.cmd.Start() + if err != nil { + is.mu.Unlock() + return errw.Wrapf(err, "error starting %s", is.name) + } + is.running = true + is.exitChan = make(chan struct{}) + + // must be unlocked before spawning goroutine + is.mu.Unlock() + go func() { + err := is.cmd.Wait() + is.mu.Lock() + defer is.mu.Unlock() + is.running = false + is.logger.Infof("%s exited", is.name) + if err != nil { + is.logger.Errorw("error while getting process status", "error", err) + } + if is.cmd.ProcessState != nil { + is.lastExit = is.cmd.ProcessState.ExitCode() + if is.lastExit != 0 { + is.logger.Errorw("non-zero exit code", "exit code", is.lastExit) + } + } + close(is.exitChan) + }() + + select { + case <-c: + is.logger.Infof("%s started", is.name) + return nil + case <-ctx.Done(): + return ctx.Err() + case <-time.After(StartTimeout): + return errw.New("startup timed out") + case <-is.exitChan: + return errw.New("startup failed") + } +} + +func (is *InternalSubsystem) Stop(ctx context.Context) error { + is.startStopMu.Lock() + defer is.startStopMu.Unlock() + + is.mu.Lock() + running := is.running + is.shouldRun = false + is.mu.Unlock() + + if !running { + return nil + } + + // interrupt early in startup + if is.cmd == nil { + return nil + } + + is.logger.Infof("Stopping %s", is.name) + + err := is.cmd.Process.Signal(syscall.SIGTERM) + if err != nil { + is.logger.Error(err) + } + + if is.waitForExit(ctx, StopTimeout/2) { + is.logger.Infof("%s successfully stopped", is.name) + return nil + } + + is.logger.Warnf("%s refused to exit, killing", is.name) + err = syscall.Kill(-is.cmd.Process.Pid, syscall.SIGKILL) + if err != nil { + is.logger.Error(err) + } + + if is.waitForExit(ctx, StopTimeout/2) { + is.logger.Infof("%s successfully killed", is.name) + return nil + } + + return errw.Errorf("%s process couldn't be killed", is.name) +} + +func (is *InternalSubsystem) waitForExit(ctx context.Context, timeout time.Duration) bool { + is.mu.Lock() + exitChan := is.exitChan + running := is.running + is.mu.Unlock() + + if !running { + return true + } + + select { + case <-exitChan: + return true + case <-ctx.Done(): + return false + case <-time.After(timeout): + return false + } +} + +// HealthCheck sends a USR1 signal to the subsystem process, which should cause it to log "HEALTHY" to stdout. +func (is *InternalSubsystem) HealthCheck(ctx context.Context) (errRet error) { + is.startStopMu.Lock() + defer is.startStopMu.Unlock() + is.mu.Lock() + defer is.mu.Unlock() + if !is.running { + return errw.Errorf("%s not running", is.name) + } + + is.logger.Debugf("starting healthcheck for %s", is.name) + + checkChan, err := is.cmd.Stdout.(*MatchingLogger).AddMatcher("healthcheck", regexp.MustCompile(`HEALTHY`), true) + if err != nil { + return err + } + defer func() { + matcher, ok := is.cmd.Stdout.(*MatchingLogger) + if ok { + matcher.DeleteMatcher("healthcheck") + } + }() + + err = is.cmd.Process.Signal(syscall.SIGUSR1) + if err != nil { + is.logger.Error(err) + } + + select { + case <-time.After(time.Second * 30): + case <-ctx.Done(): + case <-checkChan: + is.logger.Debugf("healthcheck for %s is good", is.name) + return nil + } + return errw.Errorf("timeout waiting for healthcheck on %s", is.name) +} + +func (is *InternalSubsystem) Update(ctx context.Context, cfg *pb.DeviceSubsystemConfig, newVersion bool) (bool, error) { + jsonBytes, err := cfg.GetAttributes().MarshalJSON() + if err != nil { + return true, err + } + + fileBytes, err := os.ReadFile(is.cfgPath) + // If no changes, only restart if there was a new version. + if err == nil && bytes.Equal(fileBytes, jsonBytes) { + return newVersion, nil + } + + // If an error reading the config file, restart and return the error + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return true, err + } + + // If attribute changes, restart after writing the new config file. + //nolint:gosec + return true, os.WriteFile(is.cfgPath, jsonBytes, 0o644) +} diff --git a/subsystems/provisioning/provisioning.go b/subsystems/provisioning/provisioning.go index 5f2f982..628ad63 100644 --- a/subsystems/provisioning/provisioning.go +++ b/subsystems/provisioning/provisioning.go @@ -2,19 +2,8 @@ package provisioning import ( - "bytes" "context" - "errors" - "io/fs" - "os" - "os/exec" - "path" - "regexp" - "sync" - "syscall" - "time" - errw "github.com/pkg/errors" "github.com/viamrobotics/agent" "github.com/viamrobotics/agent/subsystems" "github.com/viamrobotics/agent/subsystems/registry" @@ -27,240 +16,26 @@ func init() { } var ( - Debug = false - DefaultConfig = &pb.DeviceSubsystemConfig{} + Debug = false + DefaultConfig = &pb.DeviceSubsystemConfig{} + AppConfigFilePath = "/etc/viam.json" ) const ( - startTimeout = time.Minute - stopTimeout = time.Minute - SubsysName = "agent-provisioning" + SubsysName = "agent-provisioning" ) -var ( - ConfigFilePath = path.Join(agent.ViamDirs["etc"], SubsysName+".json") - AppConfigFilePath = "/etc/viam.json" - ProvisioningConfigFilePath = "/etc/viam-provisioning.json" -) - -type provisioning struct { - mu sync.Mutex - cmd *exec.Cmd - running bool - shouldRun bool - lastExit int - exitChan chan struct{} - - // for blocking start/stop/check ops while another is in progress - startStopMu sync.Mutex - - logger *zap.SugaredLogger -} - -func (n *provisioning) Start(ctx context.Context) error { - n.startStopMu.Lock() - defer n.startStopMu.Unlock() - - n.mu.Lock() - - if n.running { - n.mu.Unlock() - return nil - } - if n.shouldRun { - n.logger.Warnf("Restarting %s after unexpected exit", SubsysName) - } else { - n.logger.Infof("Starting %s", SubsysName) - n.shouldRun = true +func NewSubsystem(ctx context.Context, logger *zap.SugaredLogger, updateConf *pb.DeviceSubsystemConfig) (subsystems.Subsystem, error) { + extraArgs := []string{ + "--app-config", AppConfigFilePath, + "--provisioning-config", "/etc/viam-provisioning.json", } - - stdio := agent.NewMatchingLogger(n.logger, false) - stderr := agent.NewMatchingLogger(n.logger, true) - - cmdArgs := []string{"--config", ConfigFilePath, "--app-config", AppConfigFilePath, "--provisioning-config", ProvisioningConfigFilePath} if Debug { - cmdArgs = append(cmdArgs, "--debug") - } - //nolint:gosec - n.cmd = exec.Command(path.Join(agent.ViamDirs["bin"], SubsysName), cmdArgs...) - n.cmd.Dir = agent.ViamDirs["viam"] - n.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - n.cmd.Stdout = stdio - n.cmd.Stderr = stderr - - // watch for this line in the logs to indicate successful startup - c, err := stdio.AddMatcher("checkStartup", regexp.MustCompile(`agent-provisioning startup complete`), false) - if err != nil { - n.mu.Unlock() - return err - } - defer stdio.DeleteMatcher("checkStartup") - - err = n.cmd.Start() - if err != nil { - n.mu.Unlock() - return errw.Wrapf(err, "error starting %s", SubsysName) - } - n.running = true - n.exitChan = make(chan struct{}) - - // must be unlocked before spawning goroutine - n.mu.Unlock() - go func() { - err := n.cmd.Wait() - n.mu.Lock() - defer n.mu.Unlock() - n.running = false - n.logger.Infof("%s exited", SubsysName) - if err != nil { - n.logger.Errorw("error while getting process status", "error", err) - } - if n.cmd.ProcessState != nil { - n.lastExit = n.cmd.ProcessState.ExitCode() - if n.lastExit != 0 { - n.logger.Errorw("non-zero exit code", "exit code", n.lastExit) - } - } - close(n.exitChan) - }() - - select { - case <-c: - n.logger.Infof("%s started", SubsysName) - return nil - case <-ctx.Done(): - return ctx.Err() - case <-time.After(startTimeout): - return errw.New("startup timed out") - case <-n.exitChan: - return errw.New("startup failed") - } -} - -func (n *provisioning) Stop(ctx context.Context) error { - n.startStopMu.Lock() - defer n.startStopMu.Unlock() - - n.mu.Lock() - running := n.running - n.shouldRun = false - n.mu.Unlock() - - if !running { - return nil - } - - // interrupt early in startup - if n.cmd == nil { - return nil - } - - n.logger.Infof("Stopping %s", SubsysName) - - err := n.cmd.Process.Signal(syscall.SIGTERM) - if err != nil { - n.logger.Error(err) - } - - if n.waitForExit(ctx, stopTimeout/2) { - n.logger.Infof("%s successfully stopped", SubsysName) - return nil - } - - n.logger.Warnf("%s refused to exit, killing", SubsysName) - err = syscall.Kill(-n.cmd.Process.Pid, syscall.SIGKILL) - if err != nil { - n.logger.Error(err) - } - - if n.waitForExit(ctx, stopTimeout/2) { - n.logger.Infof("%s successfully killed", SubsysName) - return nil - } - - return errw.Errorf("%s process couldn't be killed", SubsysName) -} - -func (n *provisioning) waitForExit(ctx context.Context, timeout time.Duration) bool { - n.mu.Lock() - exitChan := n.exitChan - running := n.running - n.mu.Unlock() - - if !running { - return true - } - - select { - case <-exitChan: - return true - case <-ctx.Done(): - return false - case <-time.After(timeout): - return false - } -} - -// Healthcheck sends a USR1 signal to the provisioning process, which should cause it to log "HEALTHY" to stdout. -func (n *provisioning) HealthCheck(ctx context.Context) (errRet error) { - n.startStopMu.Lock() - defer n.startStopMu.Unlock() - n.mu.Lock() - defer n.mu.Unlock() - if !n.running { - return errw.Errorf("%s not running", SubsysName) - } - - n.logger.Debugf("starting healthcheck for %s", SubsysName) - - checkChan, err := n.cmd.Stdout.(*agent.MatchingLogger).AddMatcher("healthcheck", regexp.MustCompile(`HEALTHY`), true) - if err != nil { - return err + extraArgs = append(extraArgs, "--debug") } - defer func() { - matcher, ok := n.cmd.Stdout.(*agent.MatchingLogger) - if ok { - matcher.DeleteMatcher("healthcheck") - } - }() - - err = n.cmd.Process.Signal(syscall.SIGUSR1) + is, err := agent.NewInternalSubsystem(SubsysName, extraArgs, logger) if err != nil { - n.logger.Error(err) + return nil, err } - - select { - case <-time.After(time.Second * 30): - case <-ctx.Done(): - case <-checkChan: - n.logger.Debugf("healthcheck for %s is good", SubsysName) - return nil - } - return errw.Errorf("timeout waiting for healthcheck on %s", SubsysName) -} - -func (n *provisioning) Update(ctx context.Context, cfg *pb.DeviceSubsystemConfig, newVersion bool) (bool, error) { - jsonBytes, err := cfg.GetAttributes().MarshalJSON() - if err != nil { - return true, err - } - //nolint:gosec - fileBytes, err := os.ReadFile(ConfigFilePath) - // If no changes, only restart if there was a new version. - if err == nil && bytes.Equal(fileBytes, jsonBytes) { - return newVersion, nil - } - - // If an error reading the config file, restart and return the error - if err != nil && !errors.Is(err, fs.ErrNotExist) { - return true, err - } - - // If attribute changes, restart after writing the new config file. - //nolint:gosec - return true, os.WriteFile(ConfigFilePath, jsonBytes, 0o644) -} - -func NewSubsystem(ctx context.Context, logger *zap.SugaredLogger, updateConf *pb.DeviceSubsystemConfig) (subsystems.Subsystem, error) { - return agent.NewAgentSubsystem(ctx, SubsysName, logger, &provisioning{logger: logger.Named(SubsysName)}) + return agent.NewAgentSubsystem(ctx, SubsysName, logger, is) } diff --git a/subsystems/registry/registry.go b/subsystems/registry/registry.go index 824a504..6c0e83f 100644 --- a/subsystems/registry/registry.go +++ b/subsystems/registry/registry.go @@ -18,11 +18,11 @@ var ( type CreatorFunc func(ctx context.Context, logger *zap.SugaredLogger, updateConf *pb.DeviceSubsystemConfig) (subsystems.Subsystem, error) -func Register(name string, creator CreatorFunc, cfg *pb.DeviceSubsystemConfig) { +func Register(name string, creator CreatorFunc, defaultCfg *pb.DeviceSubsystemConfig) { mu.Lock() defer mu.Unlock() creators[name] = creator - configs[name] = cfg + configs[name] = defaultCfg } func Deregister(name string) { diff --git a/subsystems/syscfg/syscfg.go b/subsystems/syscfg/syscfg.go new file mode 100644 index 0000000..71afd56 --- /dev/null +++ b/subsystems/syscfg/syscfg.go @@ -0,0 +1,37 @@ +// Package syscfg contains the system configuration agent subsystem. +package syscfg + +import ( + "context" + + "github.com/viamrobotics/agent" + "github.com/viamrobotics/agent/subsystems" + "github.com/viamrobotics/agent/subsystems/registry" + "go.uber.org/zap" + pb "go.viam.com/api/app/agent/v1" +) + +func init() { + registry.Register(SubsysName, NewSubsystem, DefaultConfig) +} + +var ( + Debug = false + DefaultConfig = &pb.DeviceSubsystemConfig{} +) + +const ( + SubsysName = "agent-syscfg" +) + +func NewSubsystem(ctx context.Context, logger *zap.SugaredLogger, updateConf *pb.DeviceSubsystemConfig) (subsystems.Subsystem, error) { + extraArgs := []string{} + if Debug { + extraArgs = []string{"--debug"} + } + is, err := agent.NewInternalSubsystem(SubsysName, extraArgs, logger) + if err != nil { + return nil, err + } + return agent.NewAgentSubsystem(ctx, SubsysName, logger, is) +}