diff --git a/cli/data.go b/cli/data.go index 1f4e078b9da..71dc464692e 100644 --- a/cli/data.go +++ b/cli/data.go @@ -602,7 +602,7 @@ func filenameForDownload(meta *datapb.BinaryMetadata) string { } // Replace reserved characters. - fileName = data.CaptureFilePathWithReplacedReservedChars(fileName) + fileName = data.FilePathWithReplacedReservedChars(fileName) return fileName } diff --git a/data/capture_buffer.go b/data/capture_buffer.go index 0989efcae57..9b8349678fc 100644 --- a/data/capture_buffer.go +++ b/data/capture_buffer.go @@ -15,9 +15,9 @@ type CaptureBufferedWriter interface { // CaptureBuffer is a persistent queue of SensorData backed by a series of *data.CaptureFile. type CaptureBuffer struct { - Directory string - MetaData *v1.DataCaptureMetadata - nextFile *CaptureFile + directory string + metaData *v1.DataCaptureMetadata + nextFile *ProgFile lock sync.Mutex maxCaptureFileSize int64 } @@ -25,8 +25,8 @@ type CaptureBuffer struct { // NewCaptureBuffer returns a new Buffer. func NewCaptureBuffer(dir string, md *v1.DataCaptureMetadata, maxCaptureFileSize int64) *CaptureBuffer { return &CaptureBuffer{ - Directory: dir, - MetaData: md, + directory: dir, + metaData: md, maxCaptureFileSize: maxCaptureFileSize, } } @@ -36,12 +36,24 @@ func NewCaptureBuffer(dir string, md *v1.DataCaptureMetadata, maxCaptureFileSize // are still being written to are indicated with the extension // InProgressFileExt. Files that have finished being written to are indicated by // FileExt. 
+func isBinary(item *v1.SensorData) bool { + if item == nil { + return false + } + switch item.Data.(type) { + case *v1.SensorData_Binary: + return true + default: + return false + } +} + func (b *CaptureBuffer) Write(item *v1.SensorData) error { b.lock.Lock() defer b.lock.Unlock() - if item.GetBinary() != nil { - binFile, err := NewCaptureFile(b.Directory, b.MetaData) + if isBinary(item) { + binFile, err := NewProgFile(b.directory, b.metaData) if err != nil { return err } @@ -55,7 +67,7 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { } if b.nextFile == nil { - nextFile, err := NewCaptureFile(b.Directory, b.MetaData) + nextFile, err := NewProgFile(b.directory, b.metaData) if err != nil { return err } @@ -63,11 +75,11 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error { // We want to special case on "CaptureAllFromCamera" because it is sensor data that contains images // and their corresponding annotations. We want each image and its annotations to be stored in a // separate file. - } else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == "CaptureAllFromCamera" { + } else if b.nextFile.Size() > b.maxCaptureFileSize || b.metaData.MethodName == "CaptureAllFromCamera" { if err := b.nextFile.Close(); err != nil { return err } - nextFile, err := NewCaptureFile(b.Directory, b.MetaData) + nextFile, err := NewProgFile(b.directory, b.metaData) if err != nil { return err } @@ -93,5 +105,5 @@ func (b *CaptureBuffer) Flush() error { // Path returns the path to the directory containing the backing data capture files. 
func (b *CaptureBuffer) Path() string { - return b.Directory + return b.directory } diff --git a/data/capture_buffer_test.go b/data/capture_buffer_test.go index a89f85b0c60..82cec5fea64 100644 --- a/data/capture_buffer_test.go +++ b/data/capture_buffer_test.go @@ -1,6 +1,7 @@ package data import ( + "crypto/sha1" "errors" "io" "os" @@ -267,7 +268,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f.Close()) }() - cf, err := ReadCaptureFile(f) + cf, err := NewCaptureFile(f) test.That(t, err, test.ShouldBeNil) test.That(t, cf.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -335,7 +336,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f2.Close()) }() - cf2, err := ReadCaptureFile(f2) + cf2, err := NewCaptureFile(f2) test.That(t, err, test.ShouldBeNil) test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -470,7 +471,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f.Close()) }() - cf, err := ReadCaptureFile(f) + cf, err := NewCaptureFile(f) test.That(t, err, test.ShouldBeNil) test.That(t, cf.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -508,7 +509,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f2.Close()) }() - cf2, err := ReadCaptureFile(f2) + cf2, err := NewCaptureFile(f2) test.That(t, err, test.ShouldBeNil) test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -563,7 +564,7 @@ func TestCaptureBufferReader(t *testing.T) { f3, err := os.Open(filepath.Join(b.Path(), newFileNames[0])) test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f3.Close()) }() - cf3, err := ReadCaptureFile(f3) + cf3, err := NewCaptureFile(f3) test.That(t, err, test.ShouldBeNil) test.That(t, 
cf3.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) sd3, err := cf3.ReadNext() @@ -576,7 +577,7 @@ func TestCaptureBufferReader(t *testing.T) { f4, err := os.Open(filepath.Join(b.Path(), newFileNames[1])) test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f4.Close()) }() - cf4, err := ReadCaptureFile(f4) + cf4, err := NewCaptureFile(f4) test.That(t, err, test.ShouldBeNil) test.That(t, cf4.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) sd4, err := cf4.ReadNext() @@ -620,7 +621,7 @@ func TestCaptureBufferReader(t *testing.T) { // Path() is the same as the first paramenter passed to NewCaptureBuffer test.That(t, b.Path(), test.ShouldResemble, tmpDir) - test.That(t, b.MetaData, test.ShouldResemble, readImageCaptureMetadata) + test.That(t, b.metaData, test.ShouldResemble, readImageCaptureMetadata) now := time.Now() timeRequested := timestamppb.New(now.UTC()) @@ -644,7 +645,7 @@ func TestCaptureBufferReader(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer func() { utils.UncheckedError(f.Close()) }() - cf2, err := ReadCaptureFile(f) + cf2, err := NewCaptureFile(f) test.That(t, err, test.ShouldBeNil) test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) @@ -657,6 +658,302 @@ func TestCaptureBufferReader(t *testing.T) { }) } +func BenchmarkChunked(b *testing.B) { + type testCase struct { + name string + data []byte + } + eightKBFilled := make([]byte, 1024*8) + for i := range eightKBFilled { + eightKBFilled[i] = uint8(i % 256) + } + + oneMbFilled := make([]byte, 1024*1000) + for i := range eightKBFilled { + oneMbFilled[i] = uint8(i % 256) + } + + eightMbFilled := make([]byte, 1024*1000*8) + for i := range eightMbFilled { + eightMbFilled[i] = uint8(i % 256) + } + + tcs := []testCase{ + {"empty data", []byte{}}, + {"small data", []byte("this is a fake image")}, + {"8kb empty", make([]byte, 1024*8)}, + {"8kb filled", eightKBFilled}, + {"1mb empty", make([]byte, 1024*1000)}, + {"1mb filled", 
oneMbFilled}, + {"8mb empty", make([]byte, 1024*1000*8)}, + {"8mb filled", eightMbFilled}, + } + + for _, tc := range tcs { + s := sha1.New() + _, err := s.Write(tc.data) + test.That(b, err, test.ShouldBeNil) + expectedHash := s.Sum(nil) + tmpDir := b.TempDir() + name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + test.That(b, err, test.ShouldBeNil) + + readImageCaptureMetadata := BuildCaptureMetadata( + name.API, + name.ShortName(), + readImage, + additionalParams, + methodParams, + []string{"my", "tags"}, + ) + + now := time.Now() + timeRequested := timestamppb.New(now.UTC()) + timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + msg := &v1.SensorData{ + Metadata: &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + Data: &v1.SensorData_Binary{ + Binary: tc.data, + }, + } + + buf := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + // Path() is the same as the first parameter passed to NewCaptureBuffer + test.That(b, buf.Path(), test.ShouldResemble, tmpDir) + test.That(b, buf.metaData, test.ShouldResemble, readImageCaptureMetadata) + + test.That(b, buf.Write(msg), test.ShouldBeNil) + test.That(b, buf.Flush(), test.ShouldBeNil) + dirEntries, err := os.ReadDir(buf.Path()) + test.That(b, err, test.ShouldBeNil) + test.That(b, len(dirEntries), test.ShouldEqual, 1) + test.That(b, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + f, err := os.Open(filepath.Join(buf.Path(), dirEntries[0].Name())) + test.That(b, err, test.ShouldBeNil) + b.Cleanup(func() { test.That(b, f.Close(), test.ShouldBeNil) }) + + b.ResetTimer() + b.Run("chunked "+tc.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + ret, err := f.Seek(0, io.SeekStart) + test.That(b, err, 
test.ShouldBeNil) + test.That(b, ret, test.ShouldEqual, 0) + cf2, err := NewCaptureFile(f) + test.That(b, err, test.ShouldBeNil) + test.That(b, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) + + var md v1.SensorMetadata + r, err := cf2.BinaryReader(&md) + test.That(b, err, test.ShouldBeNil) + test.That(b, r, test.ShouldNotBeNil) + test.That(b, &md, test.ShouldResemble, msg.GetMetadata()) + data := make([]byte, 4064) + h := sha1.New() + for { + n, err := r.Read(data) + if errors.Is(err, io.EOF) { + break + } + test.That(b, err, test.ShouldBeNil) + _, err = h.Write(data[:n]) + test.That(b, err, test.ShouldBeNil) + } + actualHash := h.Sum(nil) + test.That(b, actualHash, test.ShouldResemble, expectedHash) + } + }) + } +} + +func BenchmarkNonChunked(b *testing.B) { + type testCase struct { + name string + data []byte + } + eightKBFilled := make([]byte, 1024*8) + for i := range eightKBFilled { + eightKBFilled[i] = uint8(i % 256) + } + + oneMbFilled := make([]byte, 1024*1000) + for i := range eightKBFilled { + oneMbFilled[i] = uint8(i % 256) + } + + eightMbFilled := make([]byte, 1024*1000*8) + for i := range eightMbFilled { + eightMbFilled[i] = uint8(i % 256) + } + + tcs := []testCase{ + {"empty data", []byte{}}, + {"small data", []byte("this is a fake image")}, + {"8kb empty", make([]byte, 1024*8)}, + {"8kb filled", eightKBFilled}, + {"1mb empty", make([]byte, 1024*1000)}, + {"1mb filled", oneMbFilled}, + {"8mb empty", make([]byte, 1024*1000*8)}, + {"8mb filled", eightMbFilled}, + } + + for _, tc := range tcs { + s := sha1.New() + _, err := s.Write(tc.data) + test.That(b, err, test.ShouldBeNil) + expectedHash := s.Sum(nil) + tmpDir := b.TempDir() + name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + test.That(b, err, test.ShouldBeNil) + + 
readImageCaptureMetadata := BuildCaptureMetadata( + name.API, + name.ShortName(), + readImage, + additionalParams, + methodParams, + []string{"my", "tags"}, + ) + + now := time.Now() + timeRequested := timestamppb.New(now.UTC()) + timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + msg := &v1.SensorData{ + Metadata: &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + Data: &v1.SensorData_Binary{ + Binary: tc.data, + }, + } + + buf := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + test.That(b, buf.Path(), test.ShouldResemble, tmpDir) + test.That(b, buf.metaData, test.ShouldResemble, readImageCaptureMetadata) + + test.That(b, buf.Write(msg), test.ShouldBeNil) + test.That(b, buf.Flush(), test.ShouldBeNil) + dirEntries, err := os.ReadDir(buf.Path()) + test.That(b, err, test.ShouldBeNil) + test.That(b, len(dirEntries), test.ShouldEqual, 1) + test.That(b, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + f, err := os.Open(filepath.Join(buf.Path(), dirEntries[0].Name())) + test.That(b, err, test.ShouldBeNil) + b.Cleanup(func() { test.That(b, f.Close(), test.ShouldBeNil) }) + b.ResetTimer() + b.Run("non chunked "+tc.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + ret, err := f.Seek(0, io.SeekStart) + test.That(b, err, test.ShouldBeNil) + test.That(b, ret, test.ShouldEqual, 0) + cf2, err := NewCaptureFile(f) + test.That(b, err, test.ShouldBeNil) + test.That(b, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) + + next, err := cf2.ReadNext() + test.That(b, err, test.ShouldBeNil) + test.That(b, next.GetMetadata(), test.ShouldResemble, msg.GetMetadata()) + h := sha1.New() + _, err = h.Write(next.GetBinary()) + test.That(b, err, test.ShouldBeNil) + actualHash := h.Sum(nil) + test.That(b, actualHash, test.ShouldResemble, expectedHash) + } + }) + } +} + +func FuzzBinaryReader(f *testing.F) { + eightKBFilled := make([]byte, 1024*8) + for i := 
range eightKBFilled { + eightKBFilled[i] = uint8(i % 256) + } + + eightMbFilled := make([]byte, 1024*1000*8) + for i := range eightMbFilled { + eightMbFilled[i] = uint8(i % 256) + } + + tcs := [][]byte{ + {}, + []byte("this is a fake image"), + make([]byte, 1024*8), + eightKBFilled, + make([]byte, 1024*1000*8), + } + + for _, tc := range tcs { + f.Add(tc) + } + f.Fuzz(func(t *testing.T, binary []byte) { + tmpDir := t.TempDir() + name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam") + additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"} + methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams) + test.That(t, err, test.ShouldBeNil) + + readImageCaptureMetadata := BuildCaptureMetadata( + name.API, + name.ShortName(), + readImage, + additionalParams, + methodParams, + []string{"my", "tags"}, + ) + + now := time.Now() + timeRequested := timestamppb.New(now.UTC()) + timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC()) + msg := &v1.SensorData{ + Metadata: &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + }, + Data: &v1.SensorData_Binary{ + Binary: binary, + }, + } + + b := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024)) + + // Path() is the same as the first parameter passed to NewCaptureBuffer + test.That(t, b.Path(), test.ShouldResemble, tmpDir) + test.That(t, b.metaData, test.ShouldResemble, readImageCaptureMetadata) + + test.That(t, b.Write(msg), test.ShouldBeNil) + test.That(t, b.Flush(), test.ShouldBeNil) + dirEntries, err := os.ReadDir(b.Path()) + test.That(t, err, test.ShouldBeNil) + test.That(t, len(dirEntries), test.ShouldEqual, 1) + test.That(t, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt) + f, err := os.Open(filepath.Join(b.Path(), dirEntries[0].Name())) + test.That(t, err, test.ShouldBeNil) + defer func() { test.That(t, f.Close(), test.ShouldBeNil) }() + + cf2, err := 
NewCaptureFile(f) + test.That(t, err, test.ShouldBeNil) + test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata) + + var md v1.SensorMetadata + r, err := cf2.BinaryReader(&md) + test.That(t, err, test.ShouldBeNil) + test.That(t, r, test.ShouldNotBeNil) + test.That(t, &md, test.ShouldResemble, msg.GetMetadata()) + data, err := io.ReadAll(r) + test.That(t, err, test.ShouldBeNil) + test.That(t, data, test.ShouldResemble, msg.GetBinary()) + }) +} + //nolint func getCaptureFiles(dir string) (dcFiles, progFiles []string) { _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { diff --git a/data/capture_file.go b/data/capture_file.go index 47b118e9926..d44123eae0f 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -2,6 +2,7 @@ package data import ( "bufio" + "encoding/binary" "fmt" "io" "os" @@ -13,9 +14,9 @@ import ( "github.com/matttproud/golang_protobuf_extensions/pbutil" "github.com/pkg/errors" v1 "go.viam.com/api/app/datasync/v1" - "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" - "go.viam.com/rdk/resource" "go.viam.com/rdk/utils" ) @@ -42,20 +43,18 @@ const ( // length delimited protobuf messages, where the first message is the CaptureMetadata for the file, and ensuing // messages contain the captured data. type CaptureFile struct { - path string - lock sync.Mutex - file *os.File - writer *bufio.Writer - size int64 - metadata *v1.DataCaptureMetadata - + Metadata *v1.DataCaptureMetadata + path string + size int64 initialReadOffset int64 - readOffset int64 - writeOffset int64 + + lock sync.Mutex + file *os.File + readOffset int64 } -// ReadCaptureFile creates a File struct from a passed os.File previously constructed using NewFile. -func ReadCaptureFile(f *os.File) (*CaptureFile, error) { +// NewCaptureFile creates a File struct from a passed os.File previously constructed using NewFile. 
+func NewCaptureFile(f *os.File) (*CaptureFile, error) { if !IsDataCaptureFile(f) { return nil, errors.Errorf("%s is not a data capture file", f.Name()) } @@ -73,92 +72,215 @@ func ReadCaptureFile(f *os.File) (*CaptureFile, error) { ret := CaptureFile{ path: f.Name(), file: f, - writer: bufio.NewWriter(f), size: finfo.Size(), - metadata: md, + Metadata: md, initialReadOffset: int64(initOffset), readOffset: int64(initOffset), - writeOffset: int64(initOffset), } return &ret, nil } -// NewCaptureFile creates a new *CaptureFile with the specified md in the specified directory. -func NewCaptureFile(dir string, md *v1.DataCaptureMetadata) (*CaptureFile, error) { - fileName := CaptureFilePathWithReplacedReservedChars( - filepath.Join(dir, getFileTimestampName()) + InProgressCaptureFileExt) - //nolint:gosec - f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0o600) - if err != nil { - return nil, err - } +// ReadMetadata reads and returns the metadata in f. +func (f *CaptureFile) ReadMetadata() *v1.DataCaptureMetadata { + return f.Metadata +} - // Then write first metadata message to the file. - n, err := pbutil.WriteDelimited(f, md) - if err != nil { - return nil, err +var errInvalidVarint = errors.New("invalid varint32 encountered") + +func readTag(r io.Reader, num *protowire.Number, typ *protowire.Type) (n int, err error) { + // Per AbstractParser#parsePartialDelimitedFrom with + // CodedInputStream#readRawVarint32. + var headerBuf [binary.MaxVarintLen32]byte + var bytesRead, length int + var tagNum protowire.Number + var tagType protowire.Type + for length <= 0 { // i.e. no varint has been decoded yet. + if bytesRead >= len(headerBuf) { + return bytesRead, errInvalidVarint + } + // We have to read byte by byte here to avoid reading more bytes + // than required. Each read byte is appended to what we have + // read before. 
+ newBytesRead, err := r.Read(headerBuf[bytesRead : bytesRead+1]) + if newBytesRead == 0 { + if err != nil { + return bytesRead, err + } + // A Reader should not return (0, nil), but if it does, + // it should be treated as no-op (according to the + // Reader contract). So let's go on... + continue + } + bytesRead += newBytesRead + // Now present everything read so far to the varint decoder and + // see if a varint with a tag type can be decoded already. + tagNum, tagType, length = protowire.ConsumeTag(headerBuf[:bytesRead]) } - return &CaptureFile{ - path: f.Name(), - writer: bufio.NewWriter(f), - file: f, - size: int64(n), - initialReadOffset: int64(n), - readOffset: int64(n), - writeOffset: int64(n), - }, nil + *num = tagNum + *typ = tagType + return bytesRead, nil } -// ReadMetadata reads and returns the metadata in f. -func (f *CaptureFile) ReadMetadata() *v1.DataCaptureMetadata { - return f.metadata +// *SensorMetadata. +func readMessageLength(r io.Reader, m *uint64) (n int, err error) { + // Per AbstractParser#parsePartialDelimitedFrom with + // CodedInputStream#readRawVarint32. + var headerBuf [binary.MaxVarintLen32]byte + var bytesRead, varIntBytes int + var messageLength uint64 + for varIntBytes <= 0 { // i.e. no varint has been decoded yet. + if bytesRead >= len(headerBuf) { + return bytesRead, errInvalidVarint + } + // We have to read byte by byte here to avoid reading more bytes + // than required. Each read byte is appended to what we have + // read before. + newBytesRead, err := r.Read(headerBuf[bytesRead : bytesRead+1]) + if newBytesRead == 0 { + if err != nil { + return bytesRead, err + } + // A Reader should not return (0, nil), but if it does, + // it should be treated as no-op (according to the + // Reader contract). So let's go on... + continue + } + bytesRead += newBytesRead + // Now present everything read so far to the varint decoder and + // see if a varint can be decoded already. 
+ messageLength, varIntBytes = protowire.ConsumeVarint(headerBuf[:bytesRead]) + } + *m = messageLength + return bytesRead, nil } -// ReadNext returns the next SensorData reading. -func (f *CaptureFile) ReadNext() (*v1.SensorData, error) { +// BinaryReader reads v1.SensorMetadata from a binary capture file and returns +// an io.Reader which will read the binary payload. +// Returns an error if the file does not contain a valid binary capture file. +func (f *CaptureFile) BinaryReader(md *v1.SensorMetadata) (io.Reader, error) { f.lock.Lock() defer f.lock.Unlock() - if err := f.writer.Flush(); err != nil { + if f.Metadata.Type != v1.DataType_DATA_TYPE_BINARY_SENSOR { + return nil, errors.New("expected CaptureFile to be of type BINARY") + } + + // seek to the first 32 bit varint delimiter + if _, err := f.file.Seek(f.initialReadOffset, io.SeekStart); err != nil { + return nil, err + } + // remove delimiter (we know we will only have one for the binary image) + var topLevelMsgLen uint64 + bytesRead, err := readMessageLength(f.file, &topLevelMsgLen) + if err != nil { + return nil, err + } + actualLen := f.size - (f.initialReadOffset + int64(bytesRead)) + if int64(topLevelMsgLen) != actualLen { + return nil, fmt.Errorf("binary capture file payload described as having byte size %d, actual size: %d", topLevelMsgLen, actualLen) + } + // now we parse the *v1.SensorMetadata and the binary payload of a binary *v1.SensorData + var ( + tagNum protowire.Number + tagType protowire.Type + n int + ) + n, err = readTag(f.file, &tagNum, &tagType) + bytesRead += n + if err != nil { return nil, err } - if _, err := f.file.Seek(f.readOffset, io.SeekStart); err != nil { + if !tagNum.IsValid() { + return nil, fmt.Errorf("tagNum %d is invalid", tagNum) + } + + // TODO: Technically it isn't guaranteed this value will be 1 + // but this code currently assumes it will for simplicity + // see: https://protobuf.dev/programming-guides/encoding/#optional + if tagNum != 1 { + return nil, 
fmt.Errorf("expected tagNum to be 1 but instead it is %d", tagNum) + } + + // expected LEN type https://protobuf.dev/programming-guides/encoding/#structure + // in this case an embedded message + if tagType != protowire.BytesType { + return nil, fmt.Errorf("expected tagNum 1 to have LEN wire type, instead it has wire type: %d", tagType) + } + + var sensorMDLen uint64 + n, err = readMessageLength(f.file, &sensorMDLen) + bytesRead += n + if err != nil { return nil, err } - r := v1.SensorData{} - read, err := pbutil.ReadDelimited(f.file, &r) + sensorMDBytes := make([]byte, sensorMDLen) + n, err = io.ReadFull(f.file, sensorMDBytes) + bytesRead += n + if err != nil { + return nil, err + } + err = proto.Unmarshal(sensorMDBytes, md) if err != nil { return nil, err } - f.readOffset += int64(read) - return &r, nil -} + var ( + payloadTagNum protowire.Number + payloadTagType protowire.Type + ) + n, err = readTag(f.file, &payloadTagNum, &payloadTagType) + bytesRead += n + if err != nil { + return nil, err + } -// WriteNext writes the next SensorData reading. 
-func (f *CaptureFile) WriteNext(data *v1.SensorData) error { - f.lock.Lock() - defer f.lock.Unlock() + if !payloadTagNum.IsValid() { + return nil, fmt.Errorf("payloadTagNum %d is invalid", payloadTagNum) + } - if _, err := f.file.Seek(f.writeOffset, 0); err != nil { - return err + // should be 3 as that is the field number of v1.SensorData's binary oneof + if payloadTagNum != 3 { + return nil, fmt.Errorf("expected payloadTagNum to be 3 but was actually %d", payloadTagNum) } - n, err := pbutil.WriteDelimited(f.writer, data) + + if payloadTagType != protowire.BytesType { + return nil, fmt.Errorf("expected payloadTagType LEN wire type, instead it has wire type: %d", payloadTagType) + } + + var payloadLen uint64 + n, err = readMessageLength(f.file, &payloadLen) + bytesRead += n if err != nil { - return err + return nil, err + } + actualPayloadLen := f.size - (f.initialReadOffset + int64(bytesRead)) + if int64(payloadLen) != actualPayloadLen { + return nil, fmt.Errorf("capture file contains incomplete binary payload "+ + "or data after the binary payload, payloadLength described in capture file: "+ + "%d, actual payload length: %d, filesize: %d, bytesRead: %d", + payloadLen, actualPayloadLen, f.size, bytesRead) } - f.size += int64(n) - f.writeOffset += int64(n) - return nil + return bufio.NewReader(f.file), nil } -// Flush flushes any buffered writes to disk. -func (f *CaptureFile) Flush() error { +// ReadNext returns the next SensorData reading. +func (f *CaptureFile) ReadNext() (*v1.SensorData, error) { f.lock.Lock() defer f.lock.Unlock() - return f.writer.Flush() + + if _, err := f.file.Seek(f.readOffset, io.SeekStart); err != nil { + return nil, err + } + r := v1.SensorData{} + read, err := pbutil.ReadDelimited(f.file, &r) + if err != nil { + return nil, err + } + f.readOffset += int64(read) + + return &r, nil } // Reset resets the read pointer of f. @@ -170,8 +292,6 @@ func (f *CaptureFile) Reset() { // Size returns the size of the file. 
func (f *CaptureFile) Size() int64 { - f.lock.Lock() - defer f.lock.Unlock() return f.size } @@ -182,18 +302,6 @@ func (f *CaptureFile) GetPath() string { // Close closes the file. func (f *CaptureFile) Close() error { - f.lock.Lock() - defer f.lock.Unlock() - if err := f.writer.Flush(); err != nil { - return err - } - - // Rename file to indicate that it is done being written. - withoutExt := strings.TrimSuffix(f.file.Name(), filepath.Ext(f.file.Name())) - newName := withoutExt + CompletedCaptureFileExt - if err := os.Rename(f.file.Name(), newName); err != nil { - return err - } return f.file.Close() } @@ -207,47 +315,9 @@ func (f *CaptureFile) Delete() error { return os.Remove(f.GetPath()) } -// BuildCaptureMetadata builds a DataCaptureMetadata object and returns error if -// additionalParams fails to convert to anypb map. -func BuildCaptureMetadata( - compAPI resource.API, - compName string, - method string, - additionalParams map[string]string, - methodParams map[string]*anypb.Any, - tags []string, -) *v1.DataCaptureMetadata { - dataType := getDataType(method) - return &v1.DataCaptureMetadata{ - ComponentType: compAPI.String(), - ComponentName: compName, - MethodName: method, - Type: dataType, - MethodParameters: methodParams, - FileExtension: GetFileExt(dataType, method, additionalParams), - Tags: tags, - } -} - // IsDataCaptureFile returns whether or not f is a data capture file. func IsDataCaptureFile(f *os.File) bool { - return filepath.Ext(f.Name()) == CompletedCaptureFileExt || filepath.Ext(f.Name()) == InProgressCaptureFileExt -} - -// Create a filename based on the current time. -func getFileTimestampName() string { - // RFC3339Nano is a standard time format e.g. 2006-01-02T15:04:05Z07:00. - return time.Now().Format(time.RFC3339Nano) -} - -// TODO DATA-246: Implement this in some more robust, programmatic way. 
-func getDataType(methodName string) v1.DataType { - switch methodName { - case nextPointCloud, readImage, pointCloudMap, GetImages: - return v1.DataType_DATA_TYPE_BINARY_SENSOR - default: - return v1.DataType_DATA_TYPE_TABULAR_SENSOR - } + return filepath.Ext(f.Name()) == CompletedCaptureFileExt } // GetFileExt gets the file extension for a capture file. @@ -283,6 +353,28 @@ func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]s return defaultFileExt } +// FilePathWithReplacedReservedChars returns the filepath with substitutions +// for reserved characters. +func FilePathWithReplacedReservedChars(filepath string) string { + return strings.ReplaceAll(filepath, filePathReservedChars, "_") +} + +// Create a filename based on the current time. +func getFileTimestampName() string { + // RFC3339Nano is a standard time format e.g. 2006-01-02T15:04:05Z07:00. + return time.Now().Format(time.RFC3339Nano) +} + +// TODO DATA-246: Implement this in some more robust, programmatic way. +func getDataType(methodName string) v1.DataType { + switch methodName { + case nextPointCloud, readImage, pointCloudMap, GetImages: + return v1.DataType_DATA_TYPE_BINARY_SENSOR + default: + return v1.DataType_DATA_TYPE_TABULAR_SENSOR + } +} + // SensorDataFromCaptureFilePath returns all readings in the file at filePath. // NOTE: (Nick S) At time of writing this is only used in tests. func SensorDataFromCaptureFilePath(filePath string) ([]*v1.SensorData, error) { @@ -291,7 +383,7 @@ func SensorDataFromCaptureFilePath(filePath string) ([]*v1.SensorData, error) { if err != nil { return nil, err } - dcFile, err := ReadCaptureFile(f) + dcFile, err := NewCaptureFile(f) if err != nil { return nil, err } @@ -317,9 +409,3 @@ func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) { } return ret, nil } - -// CaptureFilePathWithReplacedReservedChars returns the filepath with substitutions -// for reserved characters. 
-func CaptureFilePathWithReplacedReservedChars(filepath string) string { - return strings.ReplaceAll(filepath, filePathReservedChars, "_") -} diff --git a/data/capture_file_test.go b/data/capture_file_test.go index 3de5756a88c..6f704f590a2 100644 --- a/data/capture_file_test.go +++ b/data/capture_file_test.go @@ -155,7 +155,7 @@ func TestReadCorruptedFile(t *testing.T) { md := &v1.DataCaptureMetadata{ Type: v1.DataType_DATA_TYPE_TABULAR_SENSOR, } - f, err := NewCaptureFile(dir, md) + f, err := NewProgFile(dir, md) test.That(t, err, test.ShouldBeNil) numReadings := 100 for i := 0; i < numReadings; i++ { @@ -167,10 +167,10 @@ func TestReadCorruptedFile(t *testing.T) { } _, err = f.writer.Write([]byte("invalid data")) test.That(t, err, test.ShouldBeNil) - test.That(t, f.writer.Flush(), test.ShouldBeNil) + test.That(t, f.Close(), test.ShouldBeNil) // Should still be able to successfully read all the successfully written data. - sd, err := SensorDataFromCaptureFilePath(f.GetPath()) + sd, err := SensorDataFromCaptureFilePath(captureFilePath(f.path)) test.That(t, err, test.ShouldBeNil) test.That(t, len(sd), test.ShouldEqual, numReadings) } diff --git a/data/collector.go b/data/collector.go index 948d74768b1..3259e97aeb5 100644 --- a/data/collector.go +++ b/data/collector.go @@ -189,14 +189,15 @@ func (c *collector) getAndPushNextReading() { return } + md := &v1.SensorMetadata{ + TimeRequested: timeRequested, + TimeReceived: timeReceived, + } var msg v1.SensorData switch v := reading.(type) { case []byte: msg = v1.SensorData{ - Metadata: &v1.SensorMetadata{ - TimeRequested: timeRequested, - TimeReceived: timeReceived, - }, + Metadata: md, Data: &v1.SensorData_Binary{ Binary: v, }, @@ -204,7 +205,6 @@ func (c *collector) getAndPushNextReading() { default: // If it's not bytes, it's a struct. 
var pbReading *structpb.Struct - var err error if reflect.TypeOf(reading) == reflect.TypeOf(pb.GetReadingsResponse{}) { // We special-case the GetReadingsResponse because it already contains @@ -216,18 +216,16 @@ func (c *collector) getAndPushNextReading() { ) pbReading = &structpb.Struct{Fields: topLevelMap} } else { - pbReading, err = protoutils.StructToStructPbIgnoreOmitEmpty(reading) + tmp, err := protoutils.StructToStructPbIgnoreOmitEmpty(reading) if err != nil { c.captureErrors <- errors.Wrap(err, "error while converting reading to structpb.Struct") return } + pbReading = tmp } msg = v1.SensorData{ - Metadata: &v1.SensorMetadata{ - TimeRequested: timeRequested, - TimeReceived: timeReceived, - }, + Metadata: md, Data: &v1.SensorData_Struct{ Struct: pbReading, }, diff --git a/data/prog_file.go b/data/prog_file.go new file mode 100644 index 00000000000..5924173a467 --- /dev/null +++ b/data/prog_file.go @@ -0,0 +1,117 @@ +package data + +import ( + "bufio" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/matttproud/golang_protobuf_extensions/pbutil" + v1 "go.viam.com/api/app/datasync/v1" + "google.golang.org/protobuf/types/known/anypb" + + "go.viam.com/rdk/resource" +) + +// BuildCaptureMetadata builds a DataCaptureMetadata object. +func BuildCaptureMetadata( + compAPI resource.API, + compName string, + method string, + additionalParams map[string]string, + methodParams map[string]*anypb.Any, + tags []string, +) *v1.DataCaptureMetadata { + dataType := getDataType(method) + return &v1.DataCaptureMetadata{ + ComponentType: compAPI.String(), + ComponentName: compName, + MethodName: method, + Type: dataType, + MethodParameters: methodParams, + FileExtension: GetFileExt(dataType, method, additionalParams), + Tags: tags, + } +} + +// TODO Data-343: Reorganize this into a more standard interface/package, and add tests. + +// ProgFile is the data structure containing data captured by collectors. 
It is backed by a file on disk containing +// length delimited protobuf messages, where the first message is the CaptureMetadata for the file, and ensuing +// messages contain the captured data. +type ProgFile struct { + path string + lock sync.Mutex + file *os.File + writer *bufio.Writer + size int64 + writeOffset int64 +} + +// NewProgFile creates a new *ProgFile with the specified md in the specified directory. +func NewProgFile(dir string, md *v1.DataCaptureMetadata) (*ProgFile, error) { + fileName := FilePathWithReplacedReservedChars( + filepath.Join(dir, getFileTimestampName()) + InProgressCaptureFileExt) + //nolint:gosec + f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0o600) + if err != nil { + return nil, err + } + + // Then write first metadata message to the file. + n, err := pbutil.WriteDelimited(f, md) + if err != nil { + return nil, err + } + return &ProgFile{ + path: f.Name(), + writer: bufio.NewWriter(f), + file: f, + size: int64(n), + writeOffset: int64(n), + }, nil +} + +// WriteNext writes the next SensorData reading. +func (f *ProgFile) WriteNext(data *v1.SensorData) error { + f.lock.Lock() + defer f.lock.Unlock() + + if _, err := f.file.Seek(f.writeOffset, 0); err != nil { + return err + } + n, err := pbutil.WriteDelimited(f.writer, data) + if err != nil { + return err + } + f.size += int64(n) + f.writeOffset += int64(n) + return nil +} + +// Size returns the size of the file. +func (f *ProgFile) Size() int64 { + f.lock.Lock() + defer f.lock.Unlock() + return f.size +} + +// Close closes the file. +func (f *ProgFile) Close() error { + f.lock.Lock() + defer f.lock.Unlock() + if err := f.writer.Flush(); err != nil { + return err + } + + // Rename file to indicate that it is done being written. 
+ if err := os.Rename(f.file.Name(), captureFilePath(f.file.Name())); err != nil { + return err + } + return f.file.Close() +} + +func captureFilePath(name string) string { + return strings.TrimSuffix(name, filepath.Ext(name)) + CompletedCaptureFileExt +} diff --git a/services/datamanager/builtin/builtin_sync_test.go b/services/datamanager/builtin/builtin_sync_test.go index 844b8772095..4b33cf466f3 100644 --- a/services/datamanager/builtin/builtin_sync_test.go +++ b/services/datamanager/builtin/builtin_sync_test.go @@ -1055,7 +1055,7 @@ func getCapturedData(filePaths []string) (int, []*v1.SensorData, error) { if err != nil { return 0, nil, err } - dcFile, err := data.ReadCaptureFile(osFile) + dcFile, err := data.NewCaptureFile(osFile) if err != nil { return 0, nil, err } diff --git a/services/datamanager/builtin/capture/capture.go b/services/datamanager/builtin/capture/capture.go index ebb5038a54e..c2c1abf613b 100644 --- a/services/datamanager/builtin/capture/capture.go +++ b/services/datamanager/builtin/capture/capture.go @@ -19,12 +19,7 @@ import ( "go.viam.com/rdk/services/datamanager" ) -// TODO: re-determine if queue size is optimal given we now support 10khz+ capture rates -// The Collector's queue should be big enough to ensure that .capture() is never blocked by the queue being -// written to disk. A default value of 250 was chosen because even with the fastest reasonable capture interval (1ms), -// this would leave 250ms for a (buffered) disk write before blocking, which seems sufficient for the size of -// writes this would be performing. -const defaultCaptureQueueSize = 250 +const defaultCaptureQueueSize = 0 // Default bufio.Writer buffer size in bytes. 
const defaultCaptureBufferSize = 4096 @@ -274,7 +269,7 @@ func collectorConfigDescription( } func targetDir(captureDir string, collectorConfig datamanager.DataCaptureConfig) string { - return data.CaptureFilePathWithReplacedReservedChars( + return data.FilePathWithReplacedReservedChars( filepath.Join(captureDir, collectorConfig.Name.API.String(), collectorConfig.Name.ShortName(), collectorConfig.Method)) } diff --git a/services/datamanager/builtin/config.go b/services/datamanager/builtin/config.go index 526429d9cdb..10b4dea8545 100644 --- a/services/datamanager/builtin/config.go +++ b/services/datamanager/builtin/config.go @@ -49,6 +49,7 @@ type Config struct { ScheduledSyncDisabled bool `json:"sync_disabled"` SelectiveSyncerName string `json:"selective_syncer_name"` SyncIntervalMins float64 `json:"sync_interval_mins"` + Flag bool `json:"flag"` } // Validate returns components which will be depended upon weakly due to the above matcher. @@ -119,6 +120,7 @@ func (c *Config) syncConfig(syncSensor sensor.Sensor, syncSensorEnabled bool, lo } return datasync.Config{ + Flag: c.Flag, AdditionalSyncPaths: c.AdditionalSyncPaths, Tags: c.Tags, CaptureDir: c.getCaptureDir(), diff --git a/services/datamanager/builtin/sync/config.go b/services/datamanager/builtin/sync/config.go index 833bce9622f..fd9f085c3f2 100644 --- a/services/datamanager/builtin/sync/config.go +++ b/services/datamanager/builtin/sync/config.go @@ -10,6 +10,7 @@ import ( // Config is the sync config from builtin. type Config struct { + Flag bool // AdditionalSyncPaths defines the file system paths // that should be synced in addition to the CaptureDir. 
// Generally 3rd party programs will write arbitrary diff --git a/services/datamanager/builtin/sync/sync.go b/services/datamanager/builtin/sync/sync.go index d99d7097fd2..fbf6cce0122 100644 --- a/services/datamanager/builtin/sync/sync.go +++ b/services/datamanager/builtin/sync/sync.go @@ -367,7 +367,7 @@ func (s *Sync) syncFile(config Config, filePath string) { } func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging.Logger) { - captureFile, err := data.ReadCaptureFile(f) + captureFile, err := data.NewCaptureFile(f) // if you can't read the capture file's metadata field, close & move it to the failed directory if err != nil { cause := errors.Wrap(err, "ReadCaptureFile failed") @@ -388,7 +388,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) { msg := "error uploading data capture file %s, size: %s, md: %s" errMetadata := fmt.Sprintf(msg, captureFile.GetPath(), data.FormatBytesI64(captureFile.Size()), captureFile.ReadMetadata()) - bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, logger) + bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, s.config.Flag, logger) if err != nil { return 0, errors.Wrap(err, errMetadata) } diff --git a/services/datamanager/builtin/sync/upload_data_capture_file.go b/services/datamanager/builtin/sync/upload_data_capture_file.go index c913d5f478f..f6bc28737d6 100644 --- a/services/datamanager/builtin/sync/upload_data_capture_file.go +++ b/services/datamanager/builtin/sync/upload_data_capture_file.go @@ -3,6 +3,7 @@ package sync import ( "context" "fmt" + "io" "github.com/docker/go-units" "github.com/go-viper/mapstructure/v2" @@ -25,34 +26,32 @@ var MaxUnaryFileSize = int64(units.MB) // uses StreamingDataCaptureUpload API so as to not exceed the unary response size. // Otherwise, uploads data over DataCaptureUpload API. 
// Note: the bytes size returned is the size of the input file. It only returns a non 0 value in the success case. -func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, logger logging.Logger) (uint64, error) { +func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, flag bool, logger logging.Logger) (uint64, error) { logger.Debugf("preparing to upload data capture file: %s, size: %d", f.GetPath(), f.Size()) md := f.ReadMetadata() + + // camera.GetImages is a special case. For that API we make 2 binary data upload requests + if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages { + return uint64(f.Size()), uploadGetImages(ctx, conn, md, f, logger) + } + + metaData := uploadMetadata(conn.partID, md, md.GetFileExtension()) + if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && flag { + return uint64(f.Size()), uploadChunkedBinaryData(ctx, conn.client, metaData, f, logger) + } + sensorData, err := data.SensorDataFromCaptureFile(f) if err != nil { return 0, errors.Wrap(err, "error reading sensor data") } - // Do not attempt to upload a file without any sensor readings. if len(sensorData) == 0 { logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath()) // log here as this will delete a .capture file without uploading it and without moving it to the failed directory return 0, nil } - if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && len(sensorData) > 1 { - return 0, fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath()) - } - - // camera.GetImages is a special case. 
For that API we make 2 binary data upload requests - if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages { - logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath()) - - return uint64(f.Size()), uploadGetImages(ctx, conn, md, sensorData[0], f.Size(), f.GetPath(), logger) - } - - metaData := uploadMetadata(conn.partID, md, md.GetFileExtension()) return uint64(f.Size()), uploadSensorData(ctx, conn.client, metaData, sensorData, f.Size(), f.GetPath(), logger) } @@ -73,11 +72,26 @@ func uploadGetImages( ctx context.Context, conn cloudConn, md *v1.DataCaptureMetadata, - sd *v1.SensorData, - size int64, - path string, + f *data.CaptureFile, logger logging.Logger, ) error { + logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath()) + + sensorData, err := data.SensorDataFromCaptureFile(f) + if err != nil { + return errors.Wrap(err, "error reading sensor data") + } + + if len(sensorData) == 0 { + logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath()) + // log here as this will delete a .capture file without uploading it and without moving it to the failed directory + return nil + } + + if len(sensorData) > 1 { + return fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath()) + } + sd := sensorData[0] var res pb.GetImagesResponse if err := mapstructure.Decode(sd.GetStruct().AsMap(), &res); err != nil { return errors.Wrap(err, "failed to decode camera.GetImagesResponse") @@ -100,7 +114,7 @@ func uploadGetImages( metadata := uploadMetadata(conn.partID, md, getFileExtFromImageFormat(img.GetFormat())) // TODO: This is wrong as the size describes the size of the entire GetImages response, but we are only // uploading one of the 2 images in that response here. 
- if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, size, path, logger); err != nil { + if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, f.Size(), f.GetPath(), logger); err != nil { return errors.Wrapf(err, "failed uploading GetImages image index: %d", i) } } @@ -123,6 +137,45 @@ func getImagesTimestamps(res *pb.GetImagesResponse, sensorData *v1.SensorData) ( return timeRequested, timeReceived } +func uploadChunkedBinaryData( + ctx context.Context, + client v1.DataSyncServiceClient, + uploadMD *v1.UploadMetadata, + f *data.CaptureFile, + logger logging.Logger, +) error { + // If it's a large binary file, we need to upload it in chunks. + logger.Debugf("attempting to upload large binary file using StreamingDataCaptureUpload, file: %s", f.GetPath()) + var smd v1.SensorMetadata + r, err := f.BinaryReader(&smd) + if err != nil { + return err + } + c, err := client.StreamingDataCaptureUpload(ctx) + if err != nil { + return errors.Wrap(err, "error creating StreamingDataCaptureUpload client") + } + + // First send metadata. + streamMD := &v1.StreamingDataCaptureUploadRequest_Metadata{ + Metadata: &v1.DataCaptureUploadMetadata{ + UploadMetadata: uploadMD, + SensorMetadata: &smd, + }, + } + if err := c.Send(&v1.StreamingDataCaptureUploadRequest{UploadPacket: streamMD}); err != nil { + return errors.Wrap(err, "StreamingDataCaptureUpload failed sending metadata") + } + + // Then call the function to send the rest. 
+ if err := sendChunkedStreamingDCRequests(ctx, c, r, f.GetPath(), logger); err != nil { + return errors.Wrap(err, "StreamingDataCaptureUpload failed to sync") + } + + _, err = c.CloseAndRecv() + return errors.Wrap(err, "StreamingDataCaptureUpload CloseAndRecv failed") +} + func uploadSensorData( ctx context.Context, client v1.DataSyncServiceClient, @@ -171,6 +224,52 @@ func uploadSensorData( return errors.Wrap(err, "DataCaptureUpload failed") } +func sendChunkedStreamingDCRequests( + ctx context.Context, + stream v1.DataSyncService_StreamingDataCaptureUploadClient, + r io.Reader, + path string, + logger logging.Logger, +) error { + chunk := make([]byte, UploadChunkSize) + // Loop until there is no more content to send. + chunkCount := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + n, errRead := r.Read(chunk) + if n > 0 { + // if there is data, send it + // Build request with contents. + uploadReq := &v1.StreamingDataCaptureUploadRequest{ + UploadPacket: &v1.StreamingDataCaptureUploadRequest_Data{ + Data: chunk[:n], + }, + } + + // Send request + logger.Debugf("datasync.StreamingDataCaptureUpload sending chunk %d for file: %s", chunkCount, path) + if errSend := stream.Send(uploadReq); errSend != nil { + return errSend + } + } + + // if we reached the end of the file return nil err (success) + if errors.Is(errRead, io.EOF) { + return nil + } + + // if Read hit an unexpected error, return the error + if errRead != nil { + return errRead + } + chunkCount++ + } + } +} + func sendStreamingDCRequests( ctx context.Context, stream v1.DataSyncService_StreamingDataCaptureUploadClient,