Skip to content

Commit

Permalink
DATA-3338-fix-stability-of-vision-capture-all-from-camera
Browse files Browse the repository at this point in the history
  • Loading branch information
nicksanford committed Nov 4, 2024
1 parent 02e1b4a commit 1d17e45
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 94 deletions.
54 changes: 34 additions & 20 deletions data/capture_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (

// CaptureBufferedWriter is a buffered, persistent queue of SensorData.
type CaptureBufferedWriter interface {
Write(item *v1.SensorData) error
WriteBinary(items []*v1.SensorData) error
WriteTabular(items []*v1.SensorData) error
Flush() error
Path() string
}
Expand All @@ -32,38 +33,45 @@ func NewCaptureBuffer(dir string, md *v1.DataCaptureMetadata, maxCaptureFileSize
}

// Write writes item onto b. Binary sensor data is written to its own file.
// Tabular data is written to disk in maxCaptureFileSize sized files. Files that
// are still being written to are indicated with the extension
// InProgressFileExt. Files that have finished being written to are indicated by
// FileExt.
func (b *CaptureBuffer) Write(item *v1.SensorData) error {
// Files that are still being written to are indicated with the extension
// '.prog'.
// Files that have finished being written to are indicated by
// '.capture'.
func (b *CaptureBuffer) WriteBinary(items []*v1.SensorData) error {
b.lock.Lock()
defer b.lock.Unlock()

if item.GetBinary() != nil {
binFile, err := NewCaptureFile(b.Directory, b.MetaData)
if err != nil {
return err
}
binFile, err := NewCaptureFile(b.Directory, b.MetaData)
if err != nil {
return err
}
for _, item := range items {
if err := binFile.WriteNext(item); err != nil {
return err
}
if err := binFile.Close(); err != nil {
return err
}
return nil
}
if err := binFile.Close(); err != nil {
return err
}
return nil
}

// Tabular data is written to disk in maxCaptureFileSize sized files.
// Files that are still being written to are indicated with the extension
// '.prog'.
// Files that have finished being written to are indicated by
// '.capture'.
func (b *CaptureBuffer) WriteTabular(items []*v1.SensorData) error {
b.lock.Lock()
defer b.lock.Unlock()

if b.nextFile == nil {
nextFile, err := NewCaptureFile(b.Directory, b.MetaData)
if err != nil {
return err
}
b.nextFile = nextFile
// We want to special case on "CaptureAllFromCamera" because it is sensor data that contains images
// and their corresponding annotations. We want each image and its annotations to be stored in a
// separate file.
} else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == "CaptureAllFromCamera" {
} else if b.nextFile.Size() > b.maxCaptureFileSize {
if err := b.nextFile.Close(); err != nil {
return err
}
Expand All @@ -74,7 +82,13 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error {
b.nextFile = nextFile
}

return b.nextFile.WriteNext(item)
for _, item := range items {
if err := b.nextFile.WriteNext(item); err != nil {
return err
}
}

return nil
}

// Flush flushes all buffered data to disk and marks any in progress file as complete.
Expand Down
48 changes: 31 additions & 17 deletions data/capture_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,17 @@ func BuildCaptureMetadata(
additionalParams map[string]string,
methodParams map[string]*anypb.Any,
tags []string,
) *v1.DataCaptureMetadata {
dataType := getDataType(method)
) (*v1.DataCaptureMetadata, DataType) {
dataType := GetDataType(method)
return &v1.DataCaptureMetadata{
ComponentType: compAPI.String(),
ComponentName: compName,
MethodName: method,
Type: dataType,
Type: dataType.ToProto(),
MethodParameters: methodParams,
FileExtension: GetFileExt(dataType, method, additionalParams),
FileExtension: getFileExt(dataType, method, additionalParams),
Tags: tags,
}
}, dataType
}

// IsDataCaptureFile returns whether or not f is a data capture file.
Expand All @@ -240,25 +240,41 @@ func getFileTimestampName() string {
return time.Now().Format(time.RFC3339Nano)
}

// TODO DATA-246: Implement this in some more robust, programmatic way.
func getDataType(methodName string) v1.DataType {
type DataType int

const (
DataTypeUnspecified DataType = iota
DataTypeTabular
DataTypeBinary
)

func (dt DataType) ToProto() v1.DataType {
switch dt {
case DataTypeTabular:
return v1.DataType_DATA_TYPE_TABULAR_SENSOR
case DataTypeBinary:
return v1.DataType_DATA_TYPE_BINARY_SENSOR
default:
return v1.DataType_DATA_TYPE_UNSPECIFIED
}
}

func GetDataType(methodName string) DataType {
switch methodName {
case nextPointCloud, readImage, pointCloudMap, GetImages:
return v1.DataType_DATA_TYPE_BINARY_SENSOR
return DataTypeBinary
default:
return v1.DataType_DATA_TYPE_TABULAR_SENSOR
return DataTypeTabular
}
}

// GetFileExt gets the file extension for a capture file.
func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]string) string {
// getFileExt gets the file extension for a capture file.
func getFileExt(dataType DataType, methodName string, parameters map[string]string) string {
defaultFileExt := ""
switch dataType {
case v1.DataType_DATA_TYPE_TABULAR_SENSOR:
case DataTypeTabular:
return ".dat"
case v1.DataType_DATA_TYPE_FILE:
return defaultFileExt
case v1.DataType_DATA_TYPE_BINARY_SENSOR:
case DataTypeBinary:
if methodName == nextPointCloud {
return ".pcd"
}
Expand All @@ -275,8 +291,6 @@ func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]s
return defaultFileExt
}
}
case v1.DataType_DATA_TYPE_UNSPECIFIED:
return defaultFileExt
default:
return defaultFileExt
}
Expand Down
Loading

0 comments on commit 1d17e45

Please sign in to comment.