From 36a4ccab0670b48cc8f998515e1049b5aa2c4155 Mon Sep 17 00:00:00 2001 From: Nick Sanford Date: Tue, 8 Oct 2024 15:52:36 -0400 Subject: [PATCH 1/6] DATA-3072-split-prog-and-capture-files --- data/capture_buffer.go | 21 +++- data/capture_file.go | 97 +------------------ data/capture_file_test.go | 6 +- data/collector.go | 18 ++-- .../datamanager/builtin/capture/capture.go | 7 +- 5 files changed, 29 insertions(+), 120 deletions(-) diff --git a/data/capture_buffer.go b/data/capture_buffer.go index 0989efcae57..85d27c0bfa7 100644 --- a/data/capture_buffer.go +++ b/data/capture_buffer.go @@ -17,7 +17,7 @@ type CaptureBufferedWriter interface { type CaptureBuffer struct { Directory string MetaData *v1.DataCaptureMetadata - nextFile *CaptureFile + nextFile *ProgFile lock sync.Mutex maxCaptureFileSize int64 } @@ -36,12 +36,23 @@ func NewCaptureBuffer(dir string, md *v1.DataCaptureMetadata, maxCaptureFileSize // are still being written to are indicated with the extension // InProgressFileExt. Files that have finished being written to are indicated by // FileExt. +func isBinary(item *v1.SensorData) bool { + if item == nil { + return false + } + switch item.Data.(type) { + case *v1.SensorData_Binary: + return true + default: + return false + } +} func (b *CaptureBuffer) Write(item *v1.SensorData) error { b.lock.Lock() defer b.lock.Unlock() - if item.GetBinary() != nil { - binFile, err := NewCaptureFile(b.Directory, b.MetaData) + if isBinary(item) { + binFile, err := NewProgFile(b.Directory, b.MetaData) if err != nil { return err } @@ -55,7 +66,7 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { } if b.nextFile == nil { - nextFile, err := NewCaptureFile(b.Directory, b.MetaData) + nextFile, err := NewProgFile(b.Directory, b.MetaData) if err != nil { return err } @@ -67,7 +78,7 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { if err := b.nextFile.Close(); err != nil { return err } - nextFile, err := NewCaptureFile(b.Directory, b.MetaData) + nextFile, err := NewProgFile(b.Directory, b.MetaData) if err != nil { return err } diff --git a/data/capture_file.go b/data/capture_file.go index 47b118e9926..7019f754db2 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -1,7 +1,6 @@ package data import ( - "bufio" "fmt" "io" "os" @@ -13,9 +12,7 @@ import ( "github.com/matttproud/golang_protobuf_extensions/pbutil" "github.com/pkg/errors" v1 "go.viam.com/api/app/datasync/v1" - "google.golang.org/protobuf/types/known/anypb" - "go.viam.com/rdk/resource" "go.viam.com/rdk/utils" ) @@ -45,13 +42,11 @@ type CaptureFile struct { path string lock sync.Mutex file *os.File - writer *bufio.Writer size int64 metadata *v1.DataCaptureMetadata initialReadOffset int64 readOffset int64 - writeOffset int64 } // ReadCaptureFile creates a File struct from a passed os.File previously constructed using NewFile. @@ -73,43 +68,15 @@ func ReadCaptureFile(f *os.File) (*CaptureFile, error) { ret := CaptureFile{ path: f.Name(), file: f, - writer: bufio.NewWriter(f), size: finfo.Size(), metadata: md, initialReadOffset: int64(initOffset), readOffset: int64(initOffset), - writeOffset: int64(initOffset), } return &ret, nil } -// NewCaptureFile creates a new *CaptureFile with the specified md in the specified directory. -func NewCaptureFile(dir string, md *v1.DataCaptureMetadata) (*CaptureFile, error) { - fileName := CaptureFilePathWithReplacedReservedChars( - filepath.Join(dir, getFileTimestampName()) + InProgressCaptureFileExt) - //nolint:gosec - f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0o600) - if err != nil { - return nil, err - } - - // Then write first metadata message to the file. - n, err := pbutil.WriteDelimited(f, md) - if err != nil { - return nil, err - } - return &CaptureFile{ - path: f.Name(), - writer: bufio.NewWriter(f), - file: f, - size: int64(n), - initialReadOffset: int64(n), - readOffset: int64(n), - writeOffset: int64(n), - }, nil -} - // ReadMetadata reads and returns the metadata in f. func (f *CaptureFile) ReadMetadata() *v1.DataCaptureMetadata { return f.metadata @@ -120,10 +87,6 @@ func (f *CaptureFile) ReadNext() (*v1.SensorData, error) { f.lock.Lock() defer f.lock.Unlock() - if err := f.writer.Flush(); err != nil { - return nil, err - } - if _, err := f.file.Seek(f.readOffset, io.SeekStart); err != nil { return nil, err } @@ -137,30 +100,6 @@ func (f *CaptureFile) ReadNext() (*v1.SensorData, error) { return &r, nil } -// WriteNext writes the next SensorData reading. -func (f *CaptureFile) WriteNext(data *v1.SensorData) error { - f.lock.Lock() - defer f.lock.Unlock() - - if _, err := f.file.Seek(f.writeOffset, 0); err != nil { - return err - } - n, err := pbutil.WriteDelimited(f.writer, data) - if err != nil { - return err - } - f.size += int64(n) - f.writeOffset += int64(n) - return nil -} - -// Flush flushes any buffered writes to disk. -func (f *CaptureFile) Flush() error { - f.lock.Lock() - defer f.lock.Unlock() - return f.writer.Flush() -} - // Reset resets the read pointer of f. func (f *CaptureFile) Reset() { f.lock.Lock() @@ -182,18 +121,6 @@ func (f *CaptureFile) GetPath() string { // Close closes the file. func (f *CaptureFile) Close() error { - f.lock.Lock() - defer f.lock.Unlock() - if err := f.writer.Flush(); err != nil { - return err - } - - // Rename file to indicate that it is done being written. - withoutExt := strings.TrimSuffix(f.file.Name(), filepath.Ext(f.file.Name())) - newName := withoutExt + CompletedCaptureFileExt - if err := os.Rename(f.file.Name(), newName); err != nil { - return err - } return f.file.Close() } @@ -207,31 +134,9 @@ func (f *CaptureFile) Delete() error { return os.Remove(f.GetPath()) } -// BuildCaptureMetadata builds a DataCaptureMetadata object and returns error if -// additionalParams fails to convert to anypb map. -func BuildCaptureMetadata( - compAPI resource.API, - compName string, - method string, - additionalParams map[string]string, - methodParams map[string]*anypb.Any, - tags []string, -) *v1.DataCaptureMetadata { - dataType := getDataType(method) - return &v1.DataCaptureMetadata{ - ComponentType: compAPI.String(), - ComponentName: compName, - MethodName: method, - Type: dataType, - MethodParameters: methodParams, - FileExtension: GetFileExt(dataType, method, additionalParams), - Tags: tags, - } -} - // IsDataCaptureFile returns whether or not f is a data capture file. func IsDataCaptureFile(f *os.File) bool { - return filepath.Ext(f.Name()) == CompletedCaptureFileExt || filepath.Ext(f.Name()) == InProgressCaptureFileExt + return filepath.Ext(f.Name()) == CompletedCaptureFileExt } // Create a filename based on the current time. diff --git a/data/capture_file_test.go b/data/capture_file_test.go index 3de5756a88c..bd70c3a760f 100644 --- a/data/capture_file_test.go +++ b/data/capture_file_test.go @@ -155,7 +155,7 @@ func TestReadCorruptedFile(t *testing.T) { md := &v1.DataCaptureMetadata{ Type: v1.DataType_DATA_TYPE_TABULAR_SENSOR, } - f, err := NewCaptureFile(dir, md) + f, err := NewProgFile(dir, md) test.That(t, err, test.ShouldBeNil) numReadings := 100 for i := 0; i < numReadings; i++ { @@ -167,10 +167,10 @@ func TestReadCorruptedFile(t *testing.T) { } _, err = f.writer.Write([]byte("invalid data")) test.That(t, err, test.ShouldBeNil) - test.That(t, f.writer.Flush(), test.ShouldBeNil) + test.That(t, f.Close(), test.ShouldBeNil) // Should still be able to successfully read all the successfully written data. - sd, err := SensorDataFromCaptureFilePath(f.GetPath()) + sd, err := SensorDataFromCaptureFilePath(captureFilePath(f.GetPath())) test.That(t, err, test.ShouldBeNil) test.That(t, len(sd), test.ShouldEqual, numReadings) } diff --git a/data/collector.go b/data/collector.go index 948d74768b1..3259e97aeb5 100644 --- a/data/collector.go +++ b/data/collector.go @@ -189,14 +189,15 @@ func (c *collector) getAndPushNextReading() { return } + md := &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + } var msg v1.SensorData switch v := reading.(type) { case []byte: msg = v1.SensorData{ - Metadata: &v1.SensorMetadata{ - TimeRequested: timeRequested, - TimeReceived: timeReceived, - }, + Metadata: md, Data: &v1.SensorData_Binary{ Binary: v, }, @@ -204,7 +205,6 @@ func (c *collector) getAndPushNextReading() { default: // If it's not bytes, it's a struct. var pbReading *structpb.Struct - var err error if reflect.TypeOf(reading) == reflect.TypeOf(pb.GetReadingsResponse{}) { // We special-case the GetReadingsResponse because it already contains @@ -216,18 +216,16 @@ func (c *collector) getAndPushNextReading() { ) pbReading = &structpb.Struct{Fields: topLevelMap} } else { - pbReading, err = protoutils.StructToStructPbIgnoreOmitEmpty(reading) + tmp, err := protoutils.StructToStructPbIgnoreOmitEmpty(reading) if err != nil { c.captureErrors <- errors.Wrap(err, "error while converting reading to structpb.Struct") return } + pbReading = tmp } msg = v1.SensorData{ - Metadata: &v1.SensorMetadata{ - TimeRequested: timeRequested, - TimeReceived: timeReceived, - }, + Metadata: md, Data: &v1.SensorData_Struct{ Struct: pbReading, }, diff --git a/services/datamanager/builtin/capture/capture.go b/services/datamanager/builtin/capture/capture.go index ebb5038a54e..df81cdc3a75 100644 --- a/services/datamanager/builtin/capture/capture.go +++ b/services/datamanager/builtin/capture/capture.go @@ -19,12 +19,7 @@ import ( "go.viam.com/rdk/services/datamanager" ) -// TODO: re-determine if queue size is optimal given we now support 10khz+ capture rates -// The Collector's queue should be big enough to ensure that .capture() is never blocked by the queue being -// written to disk. A default value of 250 was chosen because even with the fastest reasonable capture interval (1ms), -// this would leave 250ms for a (buffered) disk write before blocking, which seems sufficient for the size of -// writes this would be performing. -const defaultCaptureQueueSize = 250 +const defaultCaptureQueueSize = 0 // Default bufio.Writer buffer size in bytes. const defaultCaptureBufferSize = 4096 From a888d0aed8e1eb1e47d766909737c933f4bffaf0 Mon Sep 17 00:00:00 2001 From: Nick Sanford Date: Wed, 9 Oct 2024 16:53:12 -0400 Subject: [PATCH 2/6] wip --- cli/data.go | 2 +- data/capture_buffer.go | 19 +++--- data/capture_buffer_test.go | 84 +++++++++++++++++++++++--- data/capture_file.go | 94 ++++++++++++++--------------- data/capture_file_test.go | 2 +- data/prog_file.go | 117 ++++++++++++++++++++++++++++++++++++ 6 files changed, 251 insertions(+), 67 deletions(-) create mode 100644 data/prog_file.go diff --git a/cli/data.go b/cli/data.go index 1f4e078b9da..71dc464692e 100644 --- a/cli/data.go +++ b/cli/data.go @@ -602,7 +602,7 @@ func filenameForDownload(meta *datapb.BinaryMetadata) string { } // Replace reserved characters. - fileName = data.CaptureFilePathWithReplacedReservedChars(fileName) + fileName = data.FilePathWithReplacedReservedChars(fileName) return fileName } diff --git a/data/capture_buffer.go b/data/capture_buffer.go index 85d27c0bfa7..9b8349678fc 100644 --- a/data/capture_buffer.go +++ b/data/capture_buffer.go @@ -15,8 +15,8 @@ type CaptureBufferedWriter interface { // CaptureBuffer is a persistent queue of SensorData backed by a series of *data.CaptureFile. type CaptureBuffer struct { - Directory string - MetaData *v1.DataCaptureMetadata + directory string + metaData *v1.DataCaptureMetadata nextFile *ProgFile lock sync.Mutex maxCaptureFileSize int64 @@ -25,8 +25,8 @@ type CaptureBuffer struct { // NewCaptureBuffer returns a new Buffer. func NewCaptureBuffer(dir string, md *v1.DataCaptureMetadata, maxCaptureFileSize int64) *CaptureBuffer { return &CaptureBuffer{ - Directory: dir, - MetaData: md, + directory: dir, + metaData: md, maxCaptureFileSize: maxCaptureFileSize, } } @@ -47,12 +47,13 @@ func isBinary(item *v1.SensorData) bool { return false } } + func (b *CaptureBuffer) Write(item *v1.SensorData) error { b.lock.Lock() defer b.lock.Unlock() if isBinary(item) { - binFile, err := NewProgFile(b.Directory, b.MetaData) + binFile, err := NewProgFile(b.directory, b.metaData) if err != nil { return err } @@ -66,7 +67,7 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { } if b.nextFile == nil { - nextFile, err := NewProgFile(b.Directory, b.MetaData) + nextFile, err := NewProgFile(b.directory, b.metaData) if err != nil { return err } @@ -74,11 +75,11 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { // We want to special case on "CaptureAllFromCamera" because it is sensor data that contains images // and their corresponding annotations. We want each image and its annotations to be stored in a // separate file. - } else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == "CaptureAllFromCamera" { + } else if b.nextFile.Size() > b.maxCaptureFileSize || b.metaData.MethodName == "CaptureAllFromCamera" { if err := b.nextFile.Close(); err != nil { return err } - nextFile, err := NewProgFile(b.Directory, b.MetaData) + nextFile, err := NewProgFile(b.directory, b.metaData) if err != nil { return err } @@ -104,5 +105,5 @@ func (b *CaptureBuffer) Flush() error { // Path returns the path to the directory containing the backing data capture files. func (b *CaptureBuffer) Path() string { - return b.Directory + return b.directory } diff --git a/data/capture_buffer_test.go b/data/capture_buffer_test.go index a89f85b0c60..0cfdddac1a4 100644 --- a/data/capture_buffer_test.go +++ b/data/capture_buffer_test.go @@ -267,7 +267,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f.Close()) }() - cf, err := ReadCaptureFile(f) + cf, err := NewCaptureFile(f) test.That(t, err, test.ShouldBeNil) test.That(t, cf.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -335,7 +335,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f2.Close()) }() - cf2, err := ReadCaptureFile(f2) + cf2, err := NewCaptureFile(f2) test.That(t, err, test.ShouldBeNil) test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -470,7 +470,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f.Close()) }() - cf, err := ReadCaptureFile(f) + cf, err := NewCaptureFile(f) test.That(t, err, test.ShouldBeNil) test.That(t, cf.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -508,7 +508,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f2.Close()) }() - cf2, err := ReadCaptureFile(f2) + cf2, err := NewCaptureFile(f2) test.That(t, err, test.ShouldBeNil) test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -563,7 +563,7 @@ func TestCaptureBufferReader(t *testing.T) { f3, err := os.Open(filepath.Join(b.Path(), newFileNames[0])) test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f3.Close()) }() - cf3, err := ReadCaptureFile(f3) + cf3, err := NewCaptureFile(f3) test.That(t, err, test.ShouldBeNil) test.That(t, cf3.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) sd3, err := cf3.ReadNext() @@ -576,7 +576,7 @@ func TestCaptureBufferReader(t *testing.T) { f4, err := os.Open(filepath.Join(b.Path(), newFileNames[1])) test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f4.Close()) }() - cf4, err := ReadCaptureFile(f4) + cf4, err := NewCaptureFile(f4) test.That(t, err, test.ShouldBeNil) test.That(t, cf4.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) sd4, err := cf4.ReadNext() @@ -644,7 +644,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f.Close()) }() - cf2, err := ReadCaptureFile(f) + cf2, err := NewCaptureFile(f) test.That(t, err, test.ShouldBeNil) test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -657,7 +657,75 @@ func TestCaptureBufferReader(t *testing.T) { }) } -//nolint +func NickTest(t *testing.T) { + tmpDir := t.TempDir() + name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + method := readImage + additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + tags := []string{"my", "tags"} + methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + test.That(t, err, test.ShouldBeNil) + + readImageCaptureMetadata := BuildCaptureMetadata( + name.API, + name.ShortName(), + method, + additionalParams, + methodParams, + tags, + ) + + test.That(t, readImageCaptureMetadata, test.ShouldResemble, &v1.DataCaptureMetadata{ + ComponentName: "my-cam", + ComponentType: "rdk:component:camera", + MethodName: readImage, + MethodParameters: methodParams, + Tags: tags, + Type: v1.DataType_DATA_TYPE_BINARY_SENSOR, + FileExtension: ".jpeg", + }) + + b := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + // Path() is the same as the first paramenter passed to NewCaptureBuffer + test.That(t, b.Path(), test.ShouldResemble, tmpDir) + test.That(t, b.MetaData, test.ShouldResemble, readImageCaptureMetadata) + + now := time.Now() + timeRequested := timestamppb.New(now.UTC()) + timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + msg := &v1.SensorData{ + Metadata: &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + Data: &v1.SensorData_Binary{ + Binary: []byte("this is a fake image"), + }, + } + test.That(t, b.Write(msg), test.ShouldBeNil) + test.That(t, b.Flush(), test.ShouldBeNil) + dirEntries, err := os.ReadDir(b.Path()) + test.That(t, err, test.ShouldBeNil) + test.That(t, len(dirEntries), test.ShouldEqual, 1) + test.That(t, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + f, err := os.Open(filepath.Join(b.Path(), dirEntries[0].Name())) + test.That(t, err, test.ShouldBeNil) + defer func() { test.That(t, f.Close(), test.ShouldBeNil) }() + + cf2, err := NewCaptureFile(f) + test.That(t, err, test.ShouldBeNil) + test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) + + sd2, err := cf2.ReadNext() + test.That(t, err, test.ShouldBeNil) + test.That(t, sd2, test.ShouldResemble, msg) + + _, err = cf2.ReadNext() + test.That(t, err, test.ShouldBeError, io.EOF) +} + +// nolint func getCaptureFiles(dir string) (dcFiles, progFiles []string) { _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { diff --git a/data/capture_file.go b/data/capture_file.go index 7019f754db2..97a9c915343 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -39,18 +39,18 @@ const ( // length delimited protobuf messages, where the first message is the CaptureMetadata for the file, and ensuing // messages contain the captured data. type CaptureFile struct { - path string - lock sync.Mutex - file *os.File - size int64 - metadata *v1.DataCaptureMetadata - + Metadata *v1.DataCaptureMetadata + path string + size int64 initialReadOffset int64 - readOffset int64 + + lock sync.Mutex + file *os.File + readOffset int64 } -// ReadCaptureFile creates a File struct from a passed os.File previously constructed using NewFile. -func ReadCaptureFile(f *os.File) (*CaptureFile, error) { +// NewCaptureFile creates a File struct from a passed os.File previously constructed using NewFile. +func NewCaptureFile(f *os.File) (*CaptureFile, error) { if !IsDataCaptureFile(f) { return nil, errors.Errorf("%s is not a data capture file", f.Name()) } @@ -69,7 +69,7 @@ func ReadCaptureFile(f *os.File) (*CaptureFile, error) { path: f.Name(), file: f, size: finfo.Size(), - metadata: md, + Metadata: md, initialReadOffset: int64(initOffset), readOffset: int64(initOffset), } @@ -79,7 +79,7 @@ func ReadCaptureFile(f *os.File) (*CaptureFile, error) { // ReadMetadata reads and returns the metadata in f. func (f *CaptureFile) ReadMetadata() *v1.DataCaptureMetadata { - return f.metadata + return f.Metadata } // ReadNext returns the next SensorData reading. @@ -109,8 +109,6 @@ func (f *CaptureFile) Reset() { // Size returns the size of the file. func (f *CaptureFile) Size() int64 { - f.lock.Lock() - defer f.lock.Unlock() return f.size } @@ -139,22 +137,6 @@ func IsDataCaptureFile(f *os.File) bool { return filepath.Ext(f.Name()) == CompletedCaptureFileExt } -// Create a filename based on the current time. -func getFileTimestampName() string { - // RFC3339Nano is a standard time format e.g. 2006-01-02T15:04:05Z07:00. - return time.Now().Format(time.RFC3339Nano) -} - -// TODO DATA-246: Implement this in some more robust, programmatic way. -func getDataType(methodName string) v1.DataType { - switch methodName { - case nextPointCloud, readImage, pointCloudMap, GetImages: - return v1.DataType_DATA_TYPE_BINARY_SENSOR - default: - return v1.DataType_DATA_TYPE_TABULAR_SENSOR - } -} - // GetFileExt gets the file extension for a capture file. func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]string) string { defaultFileExt := "" @@ -188,22 +170,44 @@ func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]s return defaultFileExt } -// SensorDataFromCaptureFilePath returns all readings in the file at filePath. -// NOTE: (Nick S) At time of writing this is only used in tests. -func SensorDataFromCaptureFilePath(filePath string) ([]*v1.SensorData, error) { - //nolint:gosec - f, err := os.Open(filePath) - if err != nil { - return nil, err - } - dcFile, err := ReadCaptureFile(f) - if err != nil { - return nil, err - } +// FilePathWithReplacedReservedChars returns the filepath with substitutions +// for reserved characters. +func FilePathWithReplacedReservedChars(filepath string) string { + return strings.ReplaceAll(filepath, filePathReservedChars, "_") +} - return SensorDataFromCaptureFile(dcFile) +// Create a filename based on the current time. +func getFileTimestampName() string { + // RFC3339Nano is a standard time format e.g. 2006-01-02T15:04:05Z07:00. + return time.Now().Format(time.RFC3339Nano) +} + +// TODO DATA-246: Implement this in some more robust, programmatic way. +func getDataType(methodName string) v1.DataType { + switch methodName { + case nextPointCloud, readImage, pointCloudMap, GetImages: + return v1.DataType_DATA_TYPE_BINARY_SENSOR + default: + return v1.DataType_DATA_TYPE_TABULAR_SENSOR + } } +// SensorDataFromCaptureFilePath returns all readings in the file at filePath. +// NOTE: (Nick S) At time of writing this is only used in tests. +//func SensorDataFromCaptureFilePath(filePath string) ([]*v1.SensorData, error) { +// //nolint:gosec +// f, err := os.Open(filePath) +// if err != nil { +// return nil, err +// } +// dcFile, err := NewCaptureFile(f) +// if err != nil { +// return nil, err +// } + +// return SensorDataFromCaptureFile(dcFile) +//} + // SensorDataFromCaptureFile returns all readings in f. func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) { f.Reset() @@ -222,9 +226,3 @@ func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) { } return ret, nil } - -// CaptureFilePathWithReplacedReservedChars returns the filepath with substitutions -// for reserved characters. -func CaptureFilePathWithReplacedReservedChars(filepath string) string { - return strings.ReplaceAll(filepath, filePathReservedChars, "_") -} diff --git a/data/capture_file_test.go b/data/capture_file_test.go index bd70c3a760f..6f704f590a2 100644 --- a/data/capture_file_test.go +++ b/data/capture_file_test.go @@ -170,7 +170,7 @@ func TestReadCorruptedFile(t *testing.T) { test.That(t, f.Close(), test.ShouldBeNil) // Should still be able to successfully read all the successfully written data. - sd, err := SensorDataFromCaptureFilePath(captureFilePath(f.GetPath())) + sd, err := SensorDataFromCaptureFilePath(captureFilePath(f.path)) test.That(t, err, test.ShouldBeNil) test.That(t, len(sd), test.ShouldEqual, numReadings) } diff --git a/data/prog_file.go b/data/prog_file.go new file mode 100644 index 00000000000..5924173a467 --- /dev/null +++ b/data/prog_file.go @@ -0,0 +1,117 @@ +package data + +import ( + "bufio" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/matttproud/golang_protobuf_extensions/pbutil" + v1 "go.viam.com/api/app/datasync/v1" + "google.golang.org/protobuf/types/known/anypb" + + "go.viam.com/rdk/resource" +) + +// BuildCaptureMetadata builds a DataCaptureMetadata object. +func BuildCaptureMetadata( + compAPI resource.API, + compName string, + method string, + additionalParams map[string]string, + methodParams map[string]*anypb.Any, + tags []string, +) *v1.DataCaptureMetadata { + dataType := getDataType(method) + return &v1.DataCaptureMetadata{ + ComponentType: compAPI.String(), + ComponentName: compName, + MethodName: method, + Type: dataType, + MethodParameters: methodParams, + FileExtension: GetFileExt(dataType, method, additionalParams), + Tags: tags, + } +} + +// TODO Data-343: Reorganize this into a more standard interface/package, and add tests. + +// ProgFile is the data structure containing data captured by collectors. It is backed by a file on disk containing +// length delimited protobuf messages, where the first message is the CaptureMetadata for the file, and ensuing +// messages contain the captured data. +type ProgFile struct { + path string + lock sync.Mutex + file *os.File + writer *bufio.Writer + size int64 + writeOffset int64 +} + +// NewProgFile creates a new *ProgFile with the specified md in the specified directory. +func NewProgFile(dir string, md *v1.DataCaptureMetadata) (*ProgFile, error) { + fileName := FilePathWithReplacedReservedChars( + filepath.Join(dir, getFileTimestampName()) + InProgressCaptureFileExt) + //nolint:gosec + f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0o600) + if err != nil { + return nil, err + } + + // Then write first metadata message to the file. + n, err := pbutil.WriteDelimited(f, md) + if err != nil { + return nil, err + } + return &ProgFile{ + path: f.Name(), + writer: bufio.NewWriter(f), + file: f, + size: int64(n), + writeOffset: int64(n), + }, nil +} + +// WriteNext writes the next SensorData reading. +func (f *ProgFile) WriteNext(data *v1.SensorData) error { + f.lock.Lock() + defer f.lock.Unlock() + + if _, err := f.file.Seek(f.writeOffset, 0); err != nil { + return err + } + n, err := pbutil.WriteDelimited(f.writer, data) + if err != nil { + return err + } + f.size += int64(n) + f.writeOffset += int64(n) + return nil +} + +// Size returns the size of the file. +func (f *ProgFile) Size() int64 { + f.lock.Lock() + defer f.lock.Unlock() + return f.size +} + +// Close closes the file. +func (f *ProgFile) Close() error { + f.lock.Lock() + defer f.lock.Unlock() + if err := f.writer.Flush(); err != nil { + return err + } + + // Rename file to indicate that it is done being written. + if err := os.Rename(f.file.Name(), captureFilePath(f.file.Name())); err != nil { + return err + } + return f.file.Close() +} + +func captureFilePath(name string) string { + return strings.TrimSuffix(name, filepath.Ext(name)) + CompletedCaptureFileExt +} From ce4600f7b35e21f03c584a54ff0d92d9e3fc411b Mon Sep 17 00:00:00 2001 From: Nick Sanford Date: Wed, 9 Oct 2024 16:57:31 -0400 Subject: [PATCH 3/6] wip --- data/capture_buffer_test.go | 6 ++--- data/capture_file.go | 26 +++++++++---------- .../datamanager/builtin/builtin_sync_test.go | 2 +- .../datamanager/builtin/capture/capture.go | 2 +- services/datamanager/builtin/sync/sync.go | 2 +- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/data/capture_buffer_test.go b/data/capture_buffer_test.go index 0cfdddac1a4..b0c48cf66ef 100644 --- a/data/capture_buffer_test.go +++ b/data/capture_buffer_test.go @@ -620,7 +620,7 @@ func TestCaptureBufferReader(t *testing.T) { // Path() is the same as the first paramenter passed to NewCaptureBuffer test.That(t, b.Path(), test.ShouldResemble, tmpDir) - test.That(t, b.MetaData, test.ShouldResemble, readImageCaptureMetadata) + test.That(t, b.metaData, test.ShouldResemble, readImageCaptureMetadata) now := time.Now() timeRequested := timestamppb.New(now.UTC()) @@ -689,7 +689,7 @@ func NickTest(t *testing.T) { // Path() is the same as the first paramenter passed to NewCaptureBuffer test.That(t, b.Path(), test.ShouldResemble, tmpDir) - test.That(t, b.MetaData, test.ShouldResemble, readImageCaptureMetadata) + test.That(t, b.metaData, test.ShouldResemble, readImageCaptureMetadata) now := time.Now() timeRequested := timestamppb.New(now.UTC()) @@ -725,7 +725,7 @@ func NickTest(t *testing.T) { test.That(t, err, test.ShouldBeError, io.EOF) } -// nolint +//nolint func getCaptureFiles(dir string) (dcFiles, progFiles []string) { _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { diff --git a/data/capture_file.go b/data/capture_file.go index 97a9c915343..914a2f97eb5 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -194,19 +194,19 @@ func getDataType(methodName string) v1.DataType { // SensorDataFromCaptureFilePath returns all readings in the file at filePath. // NOTE: (Nick S) At time of writing this is only used in tests. -//func SensorDataFromCaptureFilePath(filePath string) ([]*v1.SensorData, error) { -// //nolint:gosec -// f, err := os.Open(filePath) -// if err != nil { -// return nil, err -// } -// dcFile, err := NewCaptureFile(f) -// if err != nil { -// return nil, err -// } - -// return SensorDataFromCaptureFile(dcFile) -//} +func SensorDataFromCaptureFilePath(filePath string) ([]*v1.SensorData, error) { + //nolint:gosec + f, err := os.Open(filePath) + if err != nil { + return nil, err + } + dcFile, err := NewCaptureFile(f) + if err != nil { + return nil, err + } + + return SensorDataFromCaptureFile(dcFile) +} // SensorDataFromCaptureFile returns all readings in f. func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) { diff --git a/services/datamanager/builtin/builtin_sync_test.go b/services/datamanager/builtin/builtin_sync_test.go index 844b8772095..4b33cf466f3 100644 --- a/services/datamanager/builtin/builtin_sync_test.go +++ b/services/datamanager/builtin/builtin_sync_test.go @@ -1055,7 +1055,7 @@ func getCapturedData(filePaths []string) (int, []*v1.SensorData, error) { if err != nil { return 0, nil, err } - dcFile, err := data.ReadCaptureFile(osFile) + dcFile, err := data.NewCaptureFile(osFile) if err != nil { return 0, nil, err } diff --git a/services/datamanager/builtin/capture/capture.go b/services/datamanager/builtin/capture/capture.go index df81cdc3a75..c2c1abf613b 100644 --- a/services/datamanager/builtin/capture/capture.go +++ b/services/datamanager/builtin/capture/capture.go @@ -269,7 +269,7 @@ func collectorConfigDescription( } func targetDir(captureDir string, collectorConfig datamanager.DataCaptureConfig) string { - return data.CaptureFilePathWithReplacedReservedChars( + return data.FilePathWithReplacedReservedChars( filepath.Join(captureDir, collectorConfig.Name.API.String(), collectorConfig.Name.ShortName(), collectorConfig.Method)) } diff --git a/services/datamanager/builtin/sync/sync.go b/services/datamanager/builtin/sync/sync.go index d99d7097fd2..80568551462 100644 --- a/services/datamanager/builtin/sync/sync.go +++ b/services/datamanager/builtin/sync/sync.go @@ -367,7 +367,7 @@ func (s *Sync) syncFile(config Config, filePath string) { } func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging.Logger) { - captureFile, err := data.ReadCaptureFile(f) + captureFile, err := data.NewCaptureFile(f) // if you can't read the capture file's metadata field, close & move it to the failed directory if err != nil { cause := errors.Wrap(err, "ReadCaptureFile failed") From 7b9f28d50631799159ab292cbe04f8d0dba942c1 Mon Sep 17 00:00:00 2001 From: Nick Sanford Date: Tue, 15 Oct 2024 17:32:03 -0400 Subject: [PATCH 4/6] wip --- data/capture_buffer_test.go | 301 ++++++++++++++++++++++++++++-------- data/capture_file.go | 177 +++++++++++++++++++++ 2 files changed, 415 insertions(+), 63 deletions(-) diff --git a/data/capture_buffer_test.go b/data/capture_buffer_test.go index b0c48cf66ef..bf08ba54e24 100644 --- a/data/capture_buffer_test.go +++ b/data/capture_buffer_test.go @@ -1,6 +1,7 @@ package data import ( + "crypto/sha1" "errors" "io" "os" @@ -657,75 +658,249 @@ func TestCaptureBufferReader(t *testing.T) { }) } -func NickTest(t *testing.T) { - tmpDir := t.TempDir() - name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") - method := readImage - additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} - tags := []string{"my", "tags"} - methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) - test.That(t, err, test.ShouldBeNil) - - readImageCaptureMetadata := BuildCaptureMetadata( - name.API, - name.ShortName(), - method, - additionalParams, - methodParams, - tags, - ) - - test.That(t, readImageCaptureMetadata, test.ShouldResemble, &v1.DataCaptureMetadata{ - ComponentName: "my-cam", - ComponentType: "rdk:component:camera", - MethodName: readImage, - MethodParameters: methodParams, - Tags: tags, - Type: v1.DataType_DATA_TYPE_BINARY_SENSOR, - FileExtension: ".jpeg", - }) +func BenchmarkBinaryReader(b *testing.B) { + type testCase struct { + name string + data []byte + } + eightKBFilled := make([]byte, 1024*8, 1024*8) + for i := range eightKBFilled { + eightKBFilled[i] = uint8(i % 256) + } - b := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + oneMbFilled := make([]byte, 1024*1000, 1024*1000) + for i := range eightKBFilled { + oneMbFilled[i] = uint8(i % 256) + } - // Path() is the same as the first paramenter passed to NewCaptureBuffer - test.That(t, b.Path(), test.ShouldResemble, tmpDir) - test.That(t, b.metaData, test.ShouldResemble, readImageCaptureMetadata) + eightMbFilled := make([]byte, 1024*1000*8, 1024*1000*8) + for i := range eightMbFilled { + eightMbFilled[i] = uint8(i % 256) + } - now := time.Now() - timeRequested := timestamppb.New(now.UTC()) - timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) - msg := &v1.SensorData{ - Metadata: &v1.SensorMetadata{ - TimeRequested: timeRequested, - TimeReceived: timeReceived, - }, - Data: &v1.SensorData_Binary{ - Binary: []byte("this is a fake image"), - }, + tcs := []testCase{ + {"empty data", []byte{}}, + {"small data", []byte("this is a fake image")}, + {"8kb empty", make([]byte, 1024*8, 1024*8)}, + {"8kb filled", eightKBFilled}, + {"1mb empty", make([]byte, 1024*1000, 1024*1000)}, + {"1mb filled", oneMbFilled}, + {"8mb empty", make([]byte, 1024*1000*8, 1024*1000*8)}, + {"8mb filled", eightMbFilled}, } - test.That(t, b.Write(msg), test.ShouldBeNil) - test.That(t, b.Flush(), test.ShouldBeNil) - dirEntries, err := os.ReadDir(b.Path()) - test.That(t, err, test.ShouldBeNil) - test.That(t, len(dirEntries), test.ShouldEqual, 1) - test.That(t, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) - f, err := os.Open(filepath.Join(b.Path(), dirEntries[0].Name())) - test.That(t, err, test.ShouldBeNil) - defer func() { test.That(t, f.Close(), test.ShouldBeNil) }() - - cf2, err := NewCaptureFile(f) - test.That(t, err, test.ShouldBeNil) - test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) - - sd2, err := cf2.ReadNext() - test.That(t, err, test.ShouldBeNil) - test.That(t, sd2, test.ShouldResemble, msg) - - _, err = cf2.ReadNext() - test.That(t, err, test.ShouldBeError, io.EOF) + + for _, tc := range tcs { + s := sha1.New() + _, err := s.Write(tc.data) + test.That(b, err, test.ShouldBeNil) + expectedHash := s.Sum(nil) + b.ResetTimer() + b.Run(tc.name+" read entire binary", func(b *testing.B) { + for i := 0; i < b.N; i++ { + tmpDir := b.TempDir() + name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + test.That(b, err, test.ShouldBeNil) + + readImageCaptureMetadata := BuildCaptureMetadata( + name.API, + name.ShortName(), + readImage, + additionalParams, + methodParams, + []string{"my", "tags"}, + ) + + now := time.Now() + timeRequested := timestamppb.New(now.UTC()) + timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + msg := &v1.SensorData{ + Metadata: &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + Data: &v1.SensorData_Binary{ + Binary: tc.data, + }, + } + + buf := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + // Path() is the same as the first paramenter passed to NewCaptureBuffer + test.That(b, buf.Path(), test.ShouldResemble, tmpDir) + test.That(b, buf.metaData, test.ShouldResemble, readImageCaptureMetadata) + + test.That(b, buf.Write(msg), test.ShouldBeNil) + test.That(b, buf.Flush(), test.ShouldBeNil) + dirEntries, err := os.ReadDir(buf.Path()) + test.That(b, err, test.ShouldBeNil) + test.That(b, len(dirEntries), test.ShouldEqual, 1) + test.That(b, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + f, err := os.Open(filepath.Join(buf.Path(), dirEntries[0].Name())) + test.That(b, err, test.ShouldBeNil) + defer func() { test.That(b, f.Close(), test.ShouldBeNil) }() + + cf2, err := NewCaptureFile(f) + test.That(b, err, test.ShouldBeNil) + test.That(b, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) + + next, err := cf2.ReadNext() + test.That(b, err, test.ShouldBeNil) + test.That(b, next.GetMetadata(), test.ShouldResemble, msg.GetMetadata()) + h := sha1.New() + _, err = h.Write(next.GetBinary()) + test.That(b, err, test.ShouldBeNil) + actualHash := h.Sum(nil) + test.That(b, actualHash, test.ShouldResemble, expectedHash) + } + }) + // b.Run(tc.name+" chunked", func(b *testing.B) { + // for i := 0; i < b.N; i++ { + // tmpDir := b.TempDir() + // name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + // additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + // methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + // test.That(b, err, test.ShouldBeNil) + + // readImageCaptureMetadata := BuildCaptureMetadata( + // name.API, + // name.ShortName(), + // readImage, + // additionalParams, + // methodParams, + // []string{"my", "tags"}, + // ) + + // now := time.Now() + // timeRequested := timestamppb.New(now.UTC()) + // timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + // msg := &v1.SensorData{ + // Metadata: &v1.SensorMetadata{ + // TimeRequested: timeRequested, + // TimeReceived: timeReceived, + // }, + // Data: &v1.SensorData_Binary{ + // Binary: tc.data, + // }, + // } + + // buf := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + // // Path() is the same as the first paramenter passed to NewCaptureBuffer + // test.That(b, buf.Path(), test.ShouldResemble, tmpDir) + // test.That(b, buf.metaData, test.ShouldResemble, readImageCaptureMetadata) + + // test.That(b, buf.Write(msg), test.ShouldBeNil) + // test.That(b, buf.Flush(), test.ShouldBeNil) + // dirEntries, err := os.ReadDir(buf.Path()) + // test.That(b, err, test.ShouldBeNil) + // test.That(b, len(dirEntries), test.ShouldEqual, 1) + // test.That(b, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + // f, err := os.Open(filepath.Join(buf.Path(), dirEntries[0].Name())) + // test.That(b, err, test.ShouldBeNil) + // defer func() { test.That(b, f.Close(), test.ShouldBeNil) }() + + // cf2, err := NewCaptureFile(f) + // test.That(b, err, test.ShouldBeNil) + // test.That(b, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) + + // var md v1.SensorMetadata + // r, err := cf2.BinaryReader(&md) + // test.That(b, err, test.ShouldBeNil) + // test.That(b, r, test.ShouldNotBeNil) + // test.That(b, &md, test.ShouldResemble, msg.GetMetadata()) + // data, err := io.ReadAll(r) + // test.That(b, err, test.ShouldBeNil) + // test.That(b, data, test.ShouldResemble, msg.GetBinary()) + // } + // }) + } +} + +func FuzzBinaryReader(f *testing.F) { + eightKBFilled := make([]byte, 1024*8, 1024*8) + for i := range eightKBFilled { + eightKBFilled[i] = uint8(i % 256) + } + + eightMbFilled := make([]byte, 1024*1000*8, 1024*1000*8) + for i := range eightMbFilled { + eightMbFilled[i] = uint8(i % 256) + } + + tcs := [][]byte{ + []byte{}, + []byte("this is a fake image"), + make([]byte, 1024*8, 1024*8), + eightKBFilled, + make([]byte, 1024*1000*8, 1024*1000*8), + } + + for _, tc := range tcs { + f.Add(tc) + } + f.Fuzz(func(t *testing.T, binary []byte) { + tmpDir := t.TempDir() + name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + test.That(t, err, test.ShouldBeNil) + + readImageCaptureMetadata := BuildCaptureMetadata( + name.API, + name.ShortName(), + readImage, + additionalParams, + methodParams, + []string{"my", "tags"}, + ) + + now := time.Now() + timeRequested := timestamppb.New(now.UTC()) + timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + msg := &v1.SensorData{ + Metadata: &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + Data: &v1.SensorData_Binary{ + Binary: binary, + }, + } + + b := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + // Path() is the same as the first paramenter passed to NewCaptureBuffer + test.That(t, b.Path(), test.ShouldResemble, tmpDir) + test.That(t, b.metaData, test.ShouldResemble, readImageCaptureMetadata) + + test.That(t, b.Write(msg), test.ShouldBeNil) + test.That(t, b.Flush(), test.ShouldBeNil) + dirEntries, err := os.ReadDir(b.Path()) + test.That(t, err, test.ShouldBeNil) + test.That(t, len(dirEntries), test.ShouldEqual, 1) + test.That(t, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + f, err := os.Open(filepath.Join(b.Path(), dirEntries[0].Name())) + test.That(t, err, test.ShouldBeNil) + defer func() { test.That(t, f.Close(), test.ShouldBeNil) }() + + cf2, err := NewCaptureFile(f) + test.That(t, err, test.ShouldBeNil) + test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) + + var md v1.SensorMetadata + r, err := cf2.BinaryReader(&md) + test.That(t, err, test.ShouldBeNil) + test.That(t, r, test.ShouldNotBeNil) + test.That(t, &md, test.ShouldResemble, msg.GetMetadata()) + data, err := io.ReadAll(r) + test.That(t, err, test.ShouldBeNil) + test.That(t, data, test.ShouldResemble, msg.GetBinary()) + }) } -//nolint +// nolint func getCaptureFiles(dir string) (dcFiles, progFiles []string) { _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { diff --git a/data/capture_file.go b/data/capture_file.go index 914a2f97eb5..b5c324d8d1f 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -1,6 +1,8 @@ package data import ( + "bufio" + "encoding/binary" "fmt" "io" "os" @@ -12,6 +14,8 @@ import ( "github.com/matttproud/golang_protobuf_extensions/pbutil" "github.com/pkg/errors" v1 "go.viam.com/api/app/datasync/v1" + "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" "go.viam.com/rdk/utils" ) @@ -82,6 +86,179 @@ func (f *CaptureFile) ReadMetadata() *v1.DataCaptureMetadata { return f.Metadata } +var errInvalidVarint = errors.New("invalid varint32 encountered") + +func ReadTag(r io.Reader, num *protowire.Number, typ *protowire.Type) (n int, err error) { + // Per AbstractParser#parsePartialDelimitedFrom with + // CodedInputStream#readRawVarint32. + var headerBuf [binary.MaxVarintLen32]byte + var bytesRead, length int + var tagNum protowire.Number + var tagType protowire.Type + for length <= 0 { // i.e. no varint has been decoded yet. + if bytesRead >= len(headerBuf) { + return bytesRead, errInvalidVarint + } + // We have to read byte by byte here to avoid reading more bytes + // than required. Each read byte is appended to what we have + // read before. + newBytesRead, err := r.Read(headerBuf[bytesRead : bytesRead+1]) + if newBytesRead == 0 { + if err != nil { + return bytesRead, err + } + // A Reader should not return (0, nil), but if it does, + // it should be treated as no-op (according to the + // Reader contract). So let's go on... + continue + } + bytesRead += newBytesRead + // Now present everything read so far to the varint decoder and + // see if a varint with a tag type can be decoded already. + tagNum, tagType, length = protowire.ConsumeTag(headerBuf[:bytesRead]) + } + *num = tagNum + *typ = tagType + return bytesRead, nil +} + +// *SensorMetadata +func ReadMessageLength(r io.Reader, m *uint64) (n int, err error) { + // Per AbstractParser#parsePartialDelimitedFrom with + // CodedInputStream#readRawVarint32. + var headerBuf [binary.MaxVarintLen32]byte + var bytesRead, varIntBytes int + var messageLength uint64 + for varIntBytes <= 0 { // i.e. no varint has been decoded yet. + if bytesRead >= len(headerBuf) { + return bytesRead, errInvalidVarint + } + // We have to read byte by byte here to avoid reading more bytes + // than required. Each read byte is appended to what we have + // read before. + newBytesRead, err := r.Read(headerBuf[bytesRead : bytesRead+1]) + if newBytesRead == 0 { + if err != nil { + return bytesRead, err + } + // A Reader should not return (0, nil), but if it does, + // it should be treated as no-op (according to the + // Reader contract). So let's go on... + continue + } + bytesRead += newBytesRead + // Now present everything read so far to the varint decoder and + // see if a varint can be decoded already. + messageLength, varIntBytes = protowire.ConsumeVarint(headerBuf[:bytesRead]) + } + *m = messageLength + return bytesRead, nil +} + +func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.Metadata.Type != v1.DataType_DATA_TYPE_BINARY_SENSOR { + return nil, errors.New("expected CaptureFile to be of type BINARY") + } + + // seek to the first 32 bit varint delimeter + if _, err := f.file.Seek(f.initialReadOffset, io.SeekStart); err != nil { + return nil, err + } + // remove delimiter (we know we will only have one for the binary image) + var topLevelMsgLen uint64 + bytesRead, err := ReadMessageLength(f.file, &topLevelMsgLen) + if err != nil { + return nil, err + } + actualLen := f.size - (f.initialReadOffset + int64(bytesRead)) + if int64(topLevelMsgLen) != actualLen { + return nil, fmt.Errorf("binary capture file payload described as having byte size %d, actual size: %d", topLevelMsgLen, actualLen) + } + // now we parse the *v1.SensorMetadata and the binary payload of a binary *v1.SensorData + var ( + tagNum protowire.Number + tagType protowire.Type + n int + ) + n, err = ReadTag(f.file, &tagNum, &tagType) + bytesRead += n + if err != nil { + return nil, err + } + + if !tagNum.IsValid() { + return nil, fmt.Errorf("tagNum %d is invalid", tagNum) + } + + // TODO: Techically it isn't guranteed this value will be 1 + // but this code currently assumes it will for simplicity + // see: https://protobuf.dev/programming-guides/encoding/#optional + if tagNum != 1 { + return nil, fmt.Errorf("expected tagNum to be 1 but instead it is %d", tagNum) + } + + // expected LEN type https://protobuf.dev/programming-guides/encoding/#structure + // in this case an embedded message + if tagType != protowire.BytesType { + return nil, fmt.Errorf("expected tagNum 1 to have LEN wire type, instead it has wire type: %d", tagType) + } + + var sensorMDLen uint64 + n, err = ReadMessageLength(f.file, &sensorMDLen) + bytesRead += n + if err != nil { + return nil, err + } + sensorMDBytes := make([]byte, sensorMDLen) + n, err = io.ReadFull(f.file, sensorMDBytes) + bytesRead += n + if err != nil { + return nil, err + } + err = proto.Unmarshal(sensorMDBytes, md) + if err != nil { + return nil, err + } + + var ( + payloadTagNum protowire.Number + payloadTagType protowire.Type + ) + n, err = ReadTag(f.file, &payloadTagNum, &payloadTagType) + bytesRead += n + if err != nil { + return nil, err + } + + if !payloadTagNum.IsValid() { + return nil, fmt.Errorf("payloadTagNum %d is invalid", payloadTagNum) + } + + // should be 3 as that is the field number of v1.SensorData's binary oneof + if payloadTagNum != 3 { + return nil, fmt.Errorf("expected payloadTagNum to be 3 but was actually %d", payloadTagNum) + } + + if payloadTagType != protowire.BytesType { + return nil, fmt.Errorf("expected payloadTagType LEN wire type, instead it has wire type: %d", payloadTagType) + } + + var payloadLen uint64 + n, err = ReadMessageLength(f.file, &payloadLen) + bytesRead += n + if err != nil { + return nil, err + } + actualPayloadLen := f.size - (f.initialReadOffset + int64(bytesRead)) + if int64(payloadLen) != actualPayloadLen { + return nil, fmt.Errorf("capture file contains incomplete binary payload or data after the binary payload, payloadLength described in capture file: %d, actual payload length: %d, filesize: %d, bytesRead: %d", payloadLen, actualPayloadLen, f.size, bytesRead) + } + return bufio.NewReader(f.file), nil +} + // ReadNext returns the next SensorData reading. func (f *CaptureFile) ReadNext() (*v1.SensorData, error) { f.lock.Lock() From 40cdc6d8cd940f44365042a4c74dadff606b36df Mon Sep 17 00:00:00 2001 From: Nick Sanford Date: Wed, 16 Oct 2024 15:17:00 -0400 Subject: [PATCH 5/6] wip --- data/capture_buffer_test.go | 276 +++++++++++++++++++++--------------- data/capture_file.go | 26 ++-- 2 files changed, 181 insertions(+), 121 deletions(-) diff --git a/data/capture_buffer_test.go b/data/capture_buffer_test.go index bf08ba54e24..82cec5fea64 100644 --- a/data/capture_buffer_test.go +++ b/data/capture_buffer_test.go @@ -658,22 +658,22 @@ func TestCaptureBufferReader(t *testing.T) { }) } -func BenchmarkBinaryReader(b *testing.B) { +func BenchmarkChunked(b *testing.B) { type testCase struct { name string data []byte } - eightKBFilled := make([]byte, 1024*8, 1024*8) + eightKBFilled := make([]byte, 1024*8) for i := range eightKBFilled { eightKBFilled[i] = uint8(i % 256) } - oneMbFilled := make([]byte, 1024*1000, 1024*1000) + oneMbFilled := make([]byte, 1024*1000) for i := range eightKBFilled { oneMbFilled[i] = uint8(i % 256) } - eightMbFilled := make([]byte, 1024*1000*8, 1024*1000*8) + eightMbFilled := make([]byte, 1024*1000*8) for i := range eightMbFilled { eightMbFilled[i] = uint8(i % 256) } @@ -681,11 +681,11 @@ func BenchmarkBinaryReader(b *testing.B) { tcs := []testCase{ {"empty data", []byte{}}, {"small data", []byte("this is a fake image")}, - {"8kb empty", make([]byte, 1024*8, 1024*8)}, + {"8kb empty", make([]byte, 1024*8)}, {"8kb filled", eightKBFilled}, - {"1mb empty", make([]byte, 1024*1000, 1024*1000)}, + {"1mb empty", make([]byte, 1024*1000)}, {"1mb filled", oneMbFilled}, - {"8mb empty", make([]byte, 1024*1000*8, 1024*1000*8)}, + {"8mb empty", make([]byte, 1024*1000*8)}, {"8mb filled", eightMbFilled}, } @@ -694,53 +694,167 @@ func BenchmarkBinaryReader(b *testing.B) { _, err := s.Write(tc.data) test.That(b, err, test.ShouldBeNil) expectedHash := s.Sum(nil) + tmpDir := b.TempDir() + name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + test.That(b, err, test.ShouldBeNil) + + readImageCaptureMetadata := BuildCaptureMetadata( + name.API, + name.ShortName(), + readImage, + additionalParams, + methodParams, + []string{"my", "tags"}, + ) + + now := time.Now() + timeRequested := timestamppb.New(now.UTC()) + timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + msg := &v1.SensorData{ + Metadata: &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + Data: &v1.SensorData_Binary{ + Binary: tc.data, + }, + } + + buf := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + // Path() is the same as the first paramenter passed to NewCaptureBuffer + test.That(b, buf.Path(), test.ShouldResemble, tmpDir) + test.That(b, buf.metaData, test.ShouldResemble, readImageCaptureMetadata) + + test.That(b, buf.Write(msg), test.ShouldBeNil) + test.That(b, buf.Flush(), test.ShouldBeNil) + dirEntries, err := os.ReadDir(buf.Path()) + test.That(b, err, test.ShouldBeNil) + test.That(b, len(dirEntries), test.ShouldEqual, 1) + test.That(b, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + f, err := os.Open(filepath.Join(buf.Path(), dirEntries[0].Name())) + test.That(b, err, test.ShouldBeNil) + b.Cleanup(func() { test.That(b, f.Close(), test.ShouldBeNil) }) + b.ResetTimer() - b.Run(tc.name+" read entire binary", func(b *testing.B) { + b.Run("chunked "+tc.name, func(b *testing.B) { for i := 0; i < b.N; i++ { - tmpDir := b.TempDir() - name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") - additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} - methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + ret, err := f.Seek(0, io.SeekStart) test.That(b, err, test.ShouldBeNil) + test.That(b, ret, test.ShouldEqual, 0) + cf2, err := NewCaptureFile(f) + test.That(b, err, test.ShouldBeNil) + test.That(b, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) - readImageCaptureMetadata := BuildCaptureMetadata( - name.API, - name.ShortName(), - readImage, - additionalParams, - methodParams, - []string{"my", "tags"}, - ) - - now := time.Now() - timeRequested := timestamppb.New(now.UTC()) - timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) - msg := &v1.SensorData{ - Metadata: &v1.SensorMetadata{ - TimeRequested: timeRequested, - TimeReceived: timeReceived, - }, - Data: &v1.SensorData_Binary{ - Binary: tc.data, - }, + var md v1.SensorMetadata + r, err := cf2.BinaryReader(&md) + test.That(b, err, test.ShouldBeNil) + test.That(b, r, test.ShouldNotBeNil) + test.That(b, &md, test.ShouldResemble, msg.GetMetadata()) + data := make([]byte, 4064) + h := sha1.New() + for { + n, err := r.Read(data) + if errors.Is(err, io.EOF) { + break + } + test.That(b, err, test.ShouldBeNil) + _, err = h.Write(data[:n]) + test.That(b, err, test.ShouldBeNil) } + actualHash := h.Sum(nil) + test.That(b, actualHash, test.ShouldResemble, expectedHash) + } + }) + } +} - buf := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) +func BenchmarkNonChunked(b *testing.B) { + type testCase struct { + name string + data []byte + } + eightKBFilled := make([]byte, 1024*8) + for i := range eightKBFilled { + eightKBFilled[i] = uint8(i % 256) + } - // Path() is the same as the first paramenter passed to NewCaptureBuffer - test.That(b, buf.Path(), test.ShouldResemble, tmpDir) - test.That(b, buf.metaData, test.ShouldResemble, readImageCaptureMetadata) + oneMbFilled := make([]byte, 1024*1000) + for i := range eightKBFilled { + oneMbFilled[i] = uint8(i % 256) + } - test.That(b, buf.Write(msg), test.ShouldBeNil) - test.That(b, buf.Flush(), test.ShouldBeNil) - dirEntries, err := os.ReadDir(buf.Path()) - test.That(b, err, test.ShouldBeNil) - test.That(b, len(dirEntries), test.ShouldEqual, 1) - test.That(b, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) - f, err := os.Open(filepath.Join(buf.Path(), dirEntries[0].Name())) - test.That(b, err, test.ShouldBeNil) - defer func() { test.That(b, f.Close(), test.ShouldBeNil) }() + eightMbFilled := make([]byte, 1024*1000*8) + for i := range eightMbFilled { + eightMbFilled[i] = uint8(i % 256) + } + + tcs := []testCase{ + {"empty data", []byte{}}, + {"small data", []byte("this is a fake image")}, + {"8kb empty", make([]byte, 1024*8)}, + {"8kb filled", eightKBFilled}, + {"1mb empty", make([]byte, 1024*1000)}, + {"1mb filled", oneMbFilled}, + {"8mb empty", make([]byte, 1024*1000*8)}, + {"8mb filled", eightMbFilled}, + } + + for _, tc := range tcs { + s := sha1.New() + _, err := s.Write(tc.data) + test.That(b, err, test.ShouldBeNil) + expectedHash := s.Sum(nil) + tmpDir := b.TempDir() + name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + test.That(b, err, test.ShouldBeNil) + + readImageCaptureMetadata := BuildCaptureMetadata( + name.API, + name.ShortName(), + readImage, + additionalParams, + methodParams, + []string{"my", "tags"}, + ) + + now := time.Now() + timeRequested := timestamppb.New(now.UTC()) + timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + msg := &v1.SensorData{ + Metadata: &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + Data: &v1.SensorData_Binary{ + Binary: tc.data, + }, + } + buf := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + test.That(b, buf.Path(), test.ShouldResemble, tmpDir) + test.That(b, buf.metaData, test.ShouldResemble, readImageCaptureMetadata) + + test.That(b, buf.Write(msg), test.ShouldBeNil) + test.That(b, buf.Flush(), test.ShouldBeNil) + dirEntries, err := os.ReadDir(buf.Path()) + test.That(b, err, test.ShouldBeNil) + test.That(b, len(dirEntries), test.ShouldEqual, 1) + test.That(b, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + f, err := os.Open(filepath.Join(buf.Path(), dirEntries[0].Name())) + test.That(b, err, test.ShouldBeNil) + b.Cleanup(func() { test.That(b, f.Close(), test.ShouldBeNil) }) + b.ResetTimer() + b.Run("non chunked "+tc.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + ret, err := f.Seek(0, io.SeekStart) + test.That(b, err, test.ShouldBeNil) + test.That(b, ret, test.ShouldEqual, 0) cf2, err := NewCaptureFile(f) test.That(b, err, test.ShouldBeNil) test.That(b, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -755,86 +869,26 @@ func BenchmarkBinaryReader(b *testing.B) { test.That(b, actualHash, test.ShouldResemble, expectedHash) } }) - // b.Run(tc.name+" chunked", func(b *testing.B) { - // for i := 0; i < b.N; i++ { - // tmpDir := b.TempDir() - // name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") - // additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} - // methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) - // test.That(b, err, test.ShouldBeNil) - - // readImageCaptureMetadata := BuildCaptureMetadata( - // name.API, - // name.ShortName(), - // readImage, - // additionalParams, - // methodParams, - // []string{"my", "tags"}, - // ) - - // now := time.Now() - // timeRequested := timestamppb.New(now.UTC()) - // timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) - // msg := &v1.SensorData{ - // Metadata: &v1.SensorMetadata{ - // TimeRequested: timeRequested, - // TimeReceived: timeReceived, - // }, - // Data: &v1.SensorData_Binary{ - // Binary: tc.data, - // }, - // } - - // buf := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) - - // // Path() is the same as the first paramenter passed to NewCaptureBuffer - // test.That(b, buf.Path(), test.ShouldResemble, tmpDir) - // test.That(b, buf.metaData, test.ShouldResemble, readImageCaptureMetadata) - - // test.That(b, buf.Write(msg), test.ShouldBeNil) - // test.That(b, buf.Flush(), test.ShouldBeNil) - // dirEntries, err := os.ReadDir(buf.Path()) - // test.That(b, err, test.ShouldBeNil) - // test.That(b, len(dirEntries), test.ShouldEqual, 1) - // test.That(b, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) - // f, err := os.Open(filepath.Join(buf.Path(), dirEntries[0].Name())) - // test.That(b, err, test.ShouldBeNil) - // defer func() { test.That(b, f.Close(), test.ShouldBeNil) }() - - // cf2, err := NewCaptureFile(f) - // test.That(b, err, test.ShouldBeNil) - // test.That(b, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) - - // var md v1.SensorMetadata - // r, err := cf2.BinaryReader(&md) - // test.That(b, err, test.ShouldBeNil) - // test.That(b, r, test.ShouldNotBeNil) - // test.That(b, &md, test.ShouldResemble, msg.GetMetadata()) - // data, err := io.ReadAll(r) - // test.That(b, err, test.ShouldBeNil) - // test.That(b, data, test.ShouldResemble, msg.GetBinary()) - // } - // }) } } func FuzzBinaryReader(f *testing.F) { - eightKBFilled := make([]byte, 1024*8, 1024*8) + eightKBFilled := make([]byte, 1024*8) for i := range eightKBFilled { eightKBFilled[i] = uint8(i % 256) } - eightMbFilled := make([]byte, 1024*1000*8, 1024*1000*8) + eightMbFilled := make([]byte, 1024*1000*8) for i := range eightMbFilled { eightMbFilled[i] = uint8(i % 256) } tcs := [][]byte{ - []byte{}, + {}, []byte("this is a fake image"), - make([]byte, 1024*8, 1024*8), + make([]byte, 1024*8), eightKBFilled, - make([]byte, 1024*1000*8, 1024*1000*8), + make([]byte, 1024*1000*8), } for _, tc := range tcs { @@ -900,7 +954,7 @@ func FuzzBinaryReader(f *testing.F) { }) } -// nolint +//nolint func getCaptureFiles(dir string) (dcFiles, progFiles []string) { _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { diff --git a/data/capture_file.go b/data/capture_file.go index b5c324d8d1f..d44123eae0f 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -88,7 +88,7 @@ func (f *CaptureFile) ReadMetadata() *v1.DataCaptureMetadata { var errInvalidVarint = errors.New("invalid varint32 encountered") -func ReadTag(r io.Reader, num *protowire.Number, typ *protowire.Type) (n int, err error) { +func readTag(r io.Reader, num *protowire.Number, typ *protowire.Type) (n int, err error) { // Per AbstractParser#parsePartialDelimitedFrom with // CodedInputStream#readRawVarint32. var headerBuf [binary.MaxVarintLen32]byte @@ -122,8 +122,8 @@ func ReadTag(r io.Reader, num *protowire.Number, typ *protowire.Type) (n int, er return bytesRead, nil } -// *SensorMetadata -func ReadMessageLength(r io.Reader, m *uint64) (n int, err error) { +// *SensorMetadata. +func readMessageLength(r io.Reader, m *uint64) (n int, err error) { // Per AbstractParser#parsePartialDelimitedFrom with // CodedInputStream#readRawVarint32. var headerBuf [binary.MaxVarintLen32]byte @@ -155,6 +155,9 @@ func ReadMessageLength(r io.Reader, m *uint64) (n int, err error) { return bytesRead, nil } +// BinaryReader reads v1.SensorMetadata from a binary capture file and returns +// an io.Reader which will read the binary payload. +// Returns an error if the file does not contain a valid binary capture file. func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { f.lock.Lock() defer f.lock.Unlock() @@ -169,7 +172,7 @@ func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { } // remove delimiter (we know we will only have one for the binary image) var topLevelMsgLen uint64 - bytesRead, err := ReadMessageLength(f.file, &topLevelMsgLen) + bytesRead, err := readMessageLength(f.file, &topLevelMsgLen) if err != nil { return nil, err } @@ -183,7 +186,7 @@ func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { tagType protowire.Type n int ) - n, err = ReadTag(f.file, &tagNum, &tagType) + n, err = readTag(f.file, &tagNum, &tagType) bytesRead += n if err != nil { return nil, err @@ -193,7 +196,7 @@ func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { return nil, fmt.Errorf("tagNum %d is invalid", tagNum) } - // TODO: Techically it isn't guranteed this value will be 1 + // TODO: Techically it isn't guaranteed this value will be 1 // but this code currently assumes it will for simplicity // see: https://protobuf.dev/programming-guides/encoding/#optional if tagNum != 1 { @@ -207,7 +210,7 @@ func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { } var sensorMDLen uint64 - n, err = ReadMessageLength(f.file, &sensorMDLen) + n, err = readMessageLength(f.file, &sensorMDLen) bytesRead += n if err != nil { return nil, err @@ -227,7 +230,7 @@ func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { payloadTagNum protowire.Number payloadTagType protowire.Type ) - n, err = ReadTag(f.file, &payloadTagNum, &payloadTagType) + n, err = readTag(f.file, &payloadTagNum, &payloadTagType) bytesRead += n if err != nil { return nil, err @@ -247,14 +250,17 @@ func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { } var payloadLen uint64 - n, err = ReadMessageLength(f.file, &payloadLen) + n, err = readMessageLength(f.file, &payloadLen) bytesRead += n if err != nil { return nil, err } actualPayloadLen := f.size - (f.initialReadOffset + int64(bytesRead)) if int64(payloadLen) != actualPayloadLen { - return nil, fmt.Errorf("capture file contains incomplete binary payload or data after the binary payload, payloadLength described in capture file: %d, actual payload length: %d, filesize: %d, bytesRead: %d", payloadLen, actualPayloadLen, f.size, bytesRead) + return nil, fmt.Errorf("capture file contains incomplete binary payload "+ + "or data after the binary payload, payloadLength described in capture file: "+ + "%d, actual payload length: %d, filesize: %d, bytesRead: %d", + payloadLen, actualPayloadLen, f.size, bytesRead) } return bufio.NewReader(f.file), nil } From 215ebb8fa1fe08cad87d712d2dd7ee68866152f5 Mon Sep 17 00:00:00 2001 From: Nick Sanford Date: Thu, 17 Oct 2024 10:50:25 -0400 Subject: [PATCH 6/6] wip --- services/datamanager/builtin/config.go | 2 + services/datamanager/builtin/sync/config.go | 1 + services/datamanager/builtin/sync/sync.go | 2 +- .../builtin/sync/upload_data_capture_file.go | 135 +++++++++++++++--- 4 files changed, 121 insertions(+), 19 deletions(-) diff --git a/services/datamanager/builtin/config.go b/services/datamanager/builtin/config.go index 526429d9cdb..10b4dea8545 100644 --- a/services/datamanager/builtin/config.go +++ b/services/datamanager/builtin/config.go @@ -49,6 +49,7 @@ type Config struct { ScheduledSyncDisabled bool `json:"sync_disabled"` SelectiveSyncerName string `json:"selective_syncer_name"` SyncIntervalMins float64 `json:"sync_interval_mins"` + Flag bool `json:"flag"` } // Validate returns components which will be depended upon weakly due to the above matcher. @@ -119,6 +120,7 @@ func (c *Config) syncConfig(syncSensor sensor.Sensor, syncSensorEnabled bool, lo } return datasync.Config{ + Flag: c.Flag, AdditionalSyncPaths: c.AdditionalSyncPaths, Tags: c.Tags, CaptureDir: c.getCaptureDir(), diff --git a/services/datamanager/builtin/sync/config.go b/services/datamanager/builtin/sync/config.go index 833bce9622f..fd9f085c3f2 100644 --- a/services/datamanager/builtin/sync/config.go +++ b/services/datamanager/builtin/sync/config.go @@ -10,6 +10,7 @@ import ( // Config is the sync config from builtin. type Config struct { + Flag bool // AdditionalSyncPaths defines the file system paths // that should be synced in addition to the CaptureDir. // Generally 3rd party programs will write arbitrary diff --git a/services/datamanager/builtin/sync/sync.go b/services/datamanager/builtin/sync/sync.go index 80568551462..fbf6cce0122 100644 --- a/services/datamanager/builtin/sync/sync.go +++ b/services/datamanager/builtin/sync/sync.go @@ -388,7 +388,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) { msg := "error uploading data capture file %s, size: %s, md: %s" errMetadata := fmt.Sprintf(msg, captureFile.GetPath(), data.FormatBytesI64(captureFile.Size()), captureFile.ReadMetadata()) - bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, logger) + bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, s.config.Flag, logger) if err != nil { return 0, errors.Wrap(err, errMetadata) } diff --git a/services/datamanager/builtin/sync/upload_data_capture_file.go b/services/datamanager/builtin/sync/upload_data_capture_file.go index c913d5f478f..f6bc28737d6 100644 --- a/services/datamanager/builtin/sync/upload_data_capture_file.go +++ b/services/datamanager/builtin/sync/upload_data_capture_file.go @@ -3,6 +3,7 @@ package sync import ( "context" "fmt" + "io" "github.com/docker/go-units" "github.com/go-viper/mapstructure/v2" @@ -25,34 +26,32 @@ var MaxUnaryFileSize = int64(units.MB) // uses StreamingDataCaptureUpload API so as to not exceed the unary response size. // Otherwise, uploads data over DataCaptureUpload API. // Note: the bytes size returned is the size of the input file. It only returns a non 0 value in the success case. -func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, logger logging.Logger) (uint64, error) { +func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, flag bool, logger logging.Logger) (uint64, error) { logger.Debugf("preparing to upload data capture file: %s, size: %d", f.GetPath(), f.Size()) md := f.ReadMetadata() + + // camera.GetImages is a special case. For that API we make 2 binary data upload requests + if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages { + return uint64(f.Size()), uploadGetImages(ctx, conn, md, f, logger) + } + + metaData := uploadMetadata(conn.partID, md, md.GetFileExtension()) + if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && flag { + return uint64(f.Size()), uploadChunkedBinaryData(ctx, conn.client, metaData, f, logger) + } + sensorData, err := data.SensorDataFromCaptureFile(f) if err != nil { return 0, errors.Wrap(err, "error reading sensor data") } - // Do not attempt to upload a file without any sensor readings. if len(sensorData) == 0 { logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath()) // log here as this will delete a .capture file without uploading it and without moving it to the failed directory return 0, nil } - if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && len(sensorData) > 1 { - return 0, fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath()) - } - - // camera.GetImages is a special case. For that API we make 2 binary data upload requests - if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages { - logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath()) - - return uint64(f.Size()), uploadGetImages(ctx, conn, md, sensorData[0], f.Size(), f.GetPath(), logger) - } - - metaData := uploadMetadata(conn.partID, md, md.GetFileExtension()) return uint64(f.Size()), uploadSensorData(ctx, conn.client, metaData, sensorData, f.Size(), f.GetPath(), logger) } @@ -73,11 +72,26 @@ func uploadGetImages( ctx context.Context, conn cloudConn, md *v1.DataCaptureMetadata, - sd *v1.SensorData, - size int64, - path string, + f *data.CaptureFile, logger logging.Logger, ) error { + logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath()) + + sensorData, err := data.SensorDataFromCaptureFile(f) + if err != nil { + return errors.Wrap(err, "error reading sensor data") + } + + if len(sensorData) == 0 { + logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath()) + // log here as this will delete a .capture file without uploading it and without moving it to the failed directory + return nil + } + + if len(sensorData) > 1 { + return fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath()) + } + sd := sensorData[0] var res pb.GetImagesResponse if err := mapstructure.Decode(sd.GetStruct().AsMap(), &res); err != nil { return errors.Wrap(err, "failed to decode camera.GetImagesResponse") @@ -100,7 +114,7 @@ func uploadGetImages( metadata := uploadMetadata(conn.partID, md, getFileExtFromImageFormat(img.GetFormat())) // TODO: This is wrong as the size describes the size of the entire GetImages response, but we are only // uploading one of the 2 images in that response here. - if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, size, path, logger); err != nil { + if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, f.Size(), f.GetPath(), logger); err != nil { return errors.Wrapf(err, "failed uploading GetImages image index: %d", i) } } @@ -123,6 +137,45 @@ func getImagesTimestamps(res *pb.GetImagesResponse, sensorData *v1.SensorData) ( return timeRequested, timeReceived } +func uploadChunkedBinaryData( + ctx context.Context, + client v1.DataSyncServiceClient, + uploadMD *v1.UploadMetadata, + f *data.CaptureFile, + logger logging.Logger, +) error { + // If it's a large binary file, we need to upload it in chunks. + logger.Debugf("attempting to upload large binary file using StreamingDataCaptureUpload, file: %s", f.GetPath()) + var smd v1.SensorMetadata + r, err := f.BinaryReader(&smd) + if err != nil { + return err + } + c, err := client.StreamingDataCaptureUpload(ctx) + if err != nil { + return errors.Wrap(err, "error creating StreamingDataCaptureUpload client") + } + + // First send metadata. + streamMD := &v1.StreamingDataCaptureUploadRequest_Metadata{ + Metadata: &v1.DataCaptureUploadMetadata{ + UploadMetadata: uploadMD, + SensorMetadata: &smd, + }, + } + if err := c.Send(&v1.StreamingDataCaptureUploadRequest{UploadPacket: streamMD}); err != nil { + return errors.Wrap(err, "StreamingDataCaptureUpload failed sending metadata") + } + + // Then call the function to send the rest. + if err := sendChunkedStreamingDCRequests(ctx, c, r, f.GetPath(), logger); err != nil { + return errors.Wrap(err, "StreamingDataCaptureUpload failed to sync") + } + + _, err = c.CloseAndRecv() + return errors.Wrap(err, "StreamingDataCaptureUpload CloseAndRecv failed") +} + func uploadSensorData( ctx context.Context, client v1.DataSyncServiceClient, @@ -171,6 +224,52 @@ func uploadSensorData( return errors.Wrap(err, "DataCaptureUpload failed") } +func sendChunkedStreamingDCRequests( + ctx context.Context, + stream v1.DataSyncService_StreamingDataCaptureUploadClient, + r io.Reader, + path string, + logger logging.Logger, +) error { + chunk := make([]byte, UploadChunkSize) + // Loop until there is no more content to send. + chunkCount := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + n, errRead := r.Read(chunk) + if n > 0 { + // if there is data, send it + // Build request with contents. + uploadReq := &v1.StreamingDataCaptureUploadRequest{ + UploadPacket: &v1.StreamingDataCaptureUploadRequest_Data{ + Data: chunk[:n], + }, + } + + // Send request + logger.Debugf("datasync.StreamingDataCaptureUpload sending chunk %d for file: %s", chunkCount, path) + if errSend := stream.Send(uploadReq); errSend != nil { + return errSend + } + } + + // if we reached the end of the file return nil err (success) + if errors.Is(errRead, io.EOF) { + return nil + } + + // if Read hit an unexpected error, return the error + if errRead != nil { + return errRead + } + chunkCount++ + } + } +} + func sendStreamingDCRequests( ctx context.Context, stream v1.DataSyncService_StreamingDataCaptureUploadClient,