Skip to content

Commit

Permalink
RSDK-8819: Add FTDC file deletion. (#4565)
Browse files Browse the repository at this point in the history
  • Loading branch information
dgottlieb authored Nov 19, 2024
1 parent 79e7f99 commit ee52727
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 3 deletions.
128 changes: 126 additions & 2 deletions ftdc/ftdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -102,6 +107,7 @@ type FTDC struct {
readStatsWorker *utils.StoppableWorkers
datumCh chan datum
outputWorkerDone chan struct{}
stopOnce sync.Once

// Fields used to manage where serialized FTDC bytes are written.
outputWriter io.Writer
Expand All @@ -112,6 +118,7 @@ type FTDC struct {
bytesWrittenCounter countingWriter
currOutputFile *os.File
maxFileSizeBytes int64
maxNumFiles int
// ftdcDir controls where FTDC data files will be written.
ftdcDir string

Expand All @@ -132,6 +139,7 @@ func NewWithWriter(writer io.Writer, logger logging.Logger) *FTDC {
logger: logger,
outputWriter: writer,
maxFileSizeBytes: 1_000_000,
maxNumFiles: 10,
}
}

Expand Down Expand Up @@ -183,6 +191,14 @@ func (ftdc *FTDC) Remove(name string) {
// Start launches the background goroutines that read stats, serialize FTDC
// data and age out old FTDC files. It should be called at most once.
func (ftdc *FTDC) Start() {
	ftdc.readStatsWorker = utils.NewStoppableWorkerWithTicker(time.Second, ftdc.statsReader)
	utils.PanicCapturingGo(ftdc.statsWriter)

	// The `fileDeleter` goroutine mostly aligns with the "stoppable worker with ticker"
	// pattern. But it has the additional desire that if file deletion exits with a panic, all of
	// FTDC should stop.
	onDeleterPanic := func(err any) {
		ftdc.logger.Warnw("File deleter errored, stopping FTDC", "err", err)
		ftdc.StopAndJoin(context.Background())
	}
	utils.PanicCapturingGoWithCallback(ftdc.fileDeleter, onDeleterPanic)
}

func (ftdc *FTDC) statsReader(ctx context.Context) {
Expand Down Expand Up @@ -242,8 +258,12 @@ func (ftdc *FTDC) statsWriter() {
// `statsWriter` by hand, without the `statsReader` can `close(ftdc.datumCh)` followed by
// `<-ftdc.outputWorkerDone` to stop+wait for the `statsWriter`.
func (ftdc *FTDC) StopAndJoin(ctx context.Context) {
ftdc.readStatsWorker.Stop()
close(ftdc.datumCh)
ftdc.stopOnce.Do(func() {
// Only one caller should close the datum channel. And it should be the caller that called
// stop on the worker writing to the channel.
ftdc.readStatsWorker.Stop()
close(ftdc.datumCh)
})

// Closing the `statsCh` signals to the `outputWorker` to complete and exit. We use a timeout to
// limit how long we're willing to wait for the `outputWorker` to drain.
Expand Down Expand Up @@ -439,6 +459,110 @@ func (ftdc *FTDC) getWriter() (io.Writer, error) {
return ftdc.outputWriter, nil
}

// fileDeleter loops until FTDC shuts down, waking once per second to enforce
// the on-disk FTDC file budget via `checkAndDeleteOldFiles`.
func (ftdc *FTDC) fileDeleter() {
	// Run one deletion pass, logging (but not propagating) any error.
	deletePass := func() {
		if err := ftdc.checkAndDeleteOldFiles(); err != nil {
			ftdc.logger.Warnw("Error checking FTDC files", "err", err)
		}
	}

	for {
		select {
		// The fileDeleter's goroutine lifetime should match the robot/FTDC lifetime. Borrow the
		// `readStatsWorker`s context to track that.
		case <-ftdc.readStatsWorker.Context().Done():
			return
		case <-time.After(time.Second):
			deletePass()
		}
	}
}

// fileTime pairs a file with a time value. It is used by
// `checkAndDeleteOldFiles` to order FTDC files by age.
type fileTime struct {
	// name is the path of an on-disk FTDC file as seen by the directory walk.
	name string
	// time is the timestamp parsed out of the file's name; see
	// `parseTimeFromFilename`.
	time time.Time
}

// checkAndDeleteOldFiles enforces the FTDC file budget. It scans `ftdcDir` for
// files ending in ".ftdc" and, when more than `maxNumFiles` exist, removes the
// oldest ones (ordered by the timestamp embedded in each filename) until only
// `maxNumFiles` remain. Files whose names cannot be parsed are logged and left
// alone. Returns an error only if the directory walk itself fails.
func (ftdc *FTDC) checkAndDeleteOldFiles() error {
	var files []fileTime

	// Walk the `ftdcDir` and gather all of the found files into the captured `files` variable.
	err := filepath.Walk(ftdc.ftdcDir, filepath.WalkFunc(func(path string, info fs.FileInfo, walkErr error) error {
		// Check the walk error first: when it is set, `info` may be nil. Log
		// and continue under the assumption any actual problem will be caught
		// by later operations. Note the message is a single string; the prior
		// form split it across the message and variadic key/value arguments,
		// leaving zap's sugared logger with a dangling key.
		if walkErr != nil {
			ftdc.logger.Warnw("Unexpected walk error. Continuing under the assumption any actual problem will be caught later.",
				"err", walkErr)
			return nil
		}

		if !strings.HasSuffix(path, ".ftdc") {
			return nil
		}

		parsedTime, err := parseTimeFromFilename(path)
		if err != nil {
			// Include the parse error so the bad filename can be diagnosed.
			ftdc.logger.Warnw("Error parsing time from FTDC file", "filename", path, "err", err)
			return nil
		}
		files = append(files, fileTime{path, parsedTime})
		return nil
	}))
	if err != nil {
		return err
	}

	if len(files) <= ftdc.maxNumFiles {
		// We have yet to hit our file limit. Keep all of the files.
		ftdc.logger.Debugw("Inside the budget for ftdc files", "numFiles", len(files), "maxNumFiles", ftdc.maxNumFiles)
		return nil
	}

	slices.SortFunc(files, func(left, right fileTime) int {
		// Sort in descending order. Such that files indexed first are safe. This eases walking the
		// slice of files.
		return right.time.Compare(left.time)
	})

	// If we, for example, have 30 files and we want to keep the newest 10, we delete the trailing
	// 20 files.
	for _, file := range files[ftdc.maxNumFiles:] {
		ftdc.logger.Debugw("Deleting aged out FTDC file", "filename", file.name)
		if err := os.Remove(file.name); err != nil {
			ftdc.logger.Warnw("Error removing FTDC file", "filename", file.name, "err", err)
		}
	}

	return nil
}

// filenameTimeRe matches the files produced by ftdc. Filename <-> regex parity is exercised by file
// deletion testing. Filename generation uses padding such that we can rely on there being 2/4
// digits for every numeric value. The trailing dot is escaped so that only a
// literal ".ftdc" extension matches.
//
//nolint
// Example filename: countingBytesTest1228324349/viam-server-2024-11-18T20-37-01Z.ftdc
var filenameTimeRe = regexp.MustCompile(`viam-server-(\d{4})-(\d{2})-(\d{2})T(\d{2})-(\d{2})-(\d{2})Z\.ftdc`)

// parseTimeFromFilename extracts the timestamp encoded in an FTDC filename
// (see `filenameTimeRe`) and returns it as a UTC time.Time. An error is
// returned when `path` does not match the expected filename pattern.
func parseTimeFromFilename(path string) (time.Time, error) {
	allMatches := filenameTimeRe.FindAllStringSubmatch(path, -1)
	if len(allMatches) != 1 || len(allMatches[0]) != 7 {
		return time.Time{}, errors.New("filename did not match pattern")
	}

	// There's exactly one match and 7 groups. Group 0 is the whole string;
	// groups 1-6 hold the year/month/day/hour/minute/second digits.
	var parts [6]int
	for idx, digits := range allMatches[0][1:] {
		num, convErr := strconv.Atoi(digits)
		if convErr != nil {
			return time.Time{}, convErr
		}
		parts[idx] = num
	}

	return time.Date(parts[0], time.Month(parts[1]), parts[2], parts[3], parts[4], parts[5], 0, time.UTC), nil
}

type countingWriter struct {
count int64
}
Expand Down
126 changes: 126 additions & 0 deletions ftdc/ftdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/fs"
"os"
"path/filepath"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -400,3 +401,128 @@ func TestCountingBytes(t *testing.T) {
// `writeDatum`. Thus the subtraction of `1` to get the right equation.
test.That(t, timesRolledOver, test.ShouldEqual, numFTDCFiles-1)
}

// TestParseTimeFromFile asserts that a canonical FTDC filename parses into the
// expected UTC timestamp components.
func TestParseTimeFromFile(t *testing.T) {
	ts, err := parseTimeFromFilename("countingBytesTest1228324349/viam-server-2024-11-18T20-37-01Z.ftdc")
	test.That(t, err, test.ShouldBeNil)
	test.That(t, ts.Year(), test.ShouldEqual, 2024)
	test.That(t, ts.Month(), test.ShouldEqual, time.November)
	test.That(t, ts.Day(), test.ShouldEqual, 18)
	test.That(t, ts.Hour(), test.ShouldEqual, 20)
	test.That(t, ts.Minute(), test.ShouldEqual, 37)
	test.That(t, ts.Second(), test.ShouldEqual, 1)
}

// TestFileDeletion asserts that `checkAndDeleteOldFiles` removes the oldest
// FTDC files once the number on disk exceeds `maxNumFiles`, leaving exactly
// the newest `maxNumFiles` files untouched.
func TestFileDeletion(t *testing.T) {
	// This test takes ~10 seconds due to file naming limitations. This test creates FTDC files on
	// disk whose names include the timestamp with seconds resolution. In this case FTDC has to wait
	// a second before being able to create the next file.
	logger := logging.NewTestLogger(t)

	// We must not use `NewWithWriter`. Forcing a writer for FTDC data is not compatible with FTDC
	// file rotation.
	ftdc := New(logger.Sublogger("ftdc"))

	ftdcFileDir, err := os.MkdirTemp("./", "fileDeletionTest")
	test.That(t, err, test.ShouldBeNil)
	defer os.RemoveAll(ftdcFileDir)

	// Isolate all of the files we're going to create to a single, fresh directory.
	ftdc.ftdcDir = ftdcFileDir
	// Expect a log rotation after 1,000 bytes. For a changing `foo` object, this is ~60 datums.
	ftdc.maxFileSizeBytes = 1000
	ftdc.maxNumFiles = 3

	timesRolledOver := 0
	foo := &foo{}
	ftdc.Add("foo", foo)

	// These settings should result in ~8 rollovers -> ~9 total files.
	for cnt := 0; cnt < 500; cnt++ {
		foo.x = cnt
		foo.y = 2 * cnt

		datum := ftdc.constructDatum()
		datum.Time = int64(cnt)
		err := ftdc.writeDatum(datum)
		test.That(t, err, test.ShouldBeNil)

		// If writing a datum takes the bytes written to larger than configured max file size, an
		// explicit call to `getWriter` should create a new file and reset the count.
		if ftdc.bytesWrittenCounter.count >= ftdc.maxFileSizeBytes {
			// We're about to write a new ftdc file. The ftdc file names are a function of
			// "now". Given the test runs fast, the generated name will collide (names only use
			// seconds resolution). We accept this slowdown for this test.
			_, err = ftdc.getWriter()
			test.That(t, err, test.ShouldBeNil)
			test.That(t, ftdc.bytesWrittenCounter.count, test.ShouldBeLessThan, 1000)
			timesRolledOver++
		}
	}

	// We created FTDC files by hand without the background deleter goroutine running. Assert that
	// we have more than the max allowed. Otherwise the test will trivially "pass".
	origFiles := getFTDCFiles(t, ftdc.ftdcDir, logger)
	test.That(t, len(origFiles), test.ShouldBeGreaterThan, ftdc.maxNumFiles)
	slices.SortFunc(origFiles, func(left, right fs.FileInfo) int {
		// Sort in descending order. After deletion, the "leftmost" files should remain. The
		// "rightmost" should be removed.
		return right.ModTime().Compare(left.ModTime())
	})
	logger.Info("Orig files:")
	for _, f := range origFiles {
		logger.Info(" ", f.Name(), " ModTime: ", f.ModTime())
	}

	// Delete excess FTDC files. Check that we now have exactly the max number of allowed files.
	// The deletion pass must succeed; previously its error return was silently ignored, which
	// could have masked a broken directory walk.
	test.That(t, ftdc.checkAndDeleteOldFiles(), test.ShouldBeNil)
	leftoverFiles := getFTDCFiles(t, ftdc.ftdcDir, logger)
	test.That(t, len(leftoverFiles), test.ShouldEqual, ftdc.maxNumFiles)
	slices.SortFunc(leftoverFiles, func(left, right fs.FileInfo) int {
		// Sort in descending order.
		return right.ModTime().Compare(left.ModTime())
	})

	logger.Info("Leftover files:")
	for _, f := range leftoverFiles {
		logger.Info(" ", f.Name(), " ModTime: ", f.ModTime())
	}

	// We've sorted both files in descending timestamp order as per their filename. Assert that the
	// "newest original" files are still remaining.
	for idx := 0; idx < len(leftoverFiles); idx++ {
		test.That(t, leftoverFiles[idx].Name(), test.ShouldEqual, origFiles[idx].Name())
	}

	// And assert the "oldest original" files are no longer found.
	for idx := len(leftoverFiles); idx < len(origFiles); idx++ {
		// The `fs.FileInfo` returned by `os.Lstat` does not include the directory as part of its
		// file name. Reconstitute the relative path before testing.
		_, err := os.Lstat(filepath.Join(ftdc.ftdcDir, origFiles[idx].Name()))
		var pathErr *fs.PathError
		if !errors.As(err, &pathErr) {
			t.Fatalf("File should be deleted. Lstat error: %v", err)
		}
	}
}

// getFTDCFiles walks `dir` and returns the FileInfo of every file whose path
// ends in ".ftdc". Walk errors are logged and skipped; the walk's own error
// (if any) fails the test.
func getFTDCFiles(t *testing.T, dir string, logger logging.Logger) []fs.FileInfo {
	var ret []fs.FileInfo
	err := filepath.Walk(dir, filepath.WalkFunc(func(path string, info fs.FileInfo, walkErr error) error {
		// Check the walk error before anything else: when it is set, `info`
		// may be nil, and previously errors on non-".ftdc" paths were
		// silently dropped by the suffix check running first.
		if walkErr != nil {
			logger.Info("Unexpected walk error. Continuing under the assumption any actual* problem will",
				"be caught by the assertions. WalkErr:", walkErr)
			return nil
		}

		if !strings.HasSuffix(path, ".ftdc") {
			return nil
		}

		ret = append(ret, info)
		return nil
	}))
	test.That(t, err, test.ShouldBeNil)

	return ret
}
2 changes: 1 addition & 1 deletion logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewTestLogger(tb testing.TB) Logger {
func NewObservedTestLogger(tb testing.TB) (Logger, *observer.ObservedLogs) {
observerCore, observedLogs := observer.New(zap.LevelEnablerFunc(zapcore.DebugLevel.Enabled))
logger := &impl{
name: "",
name: tb.Name(),
level: NewAtomicLevelAt(DEBUG),
appenders: []Appender{
NewTestAppender(tb),
Expand Down

0 comments on commit ee52727

Please sign in to comment.