Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nicksanford committed Nov 21, 2024
1 parent e48a8ec commit 24f1702
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 10 deletions.
7 changes: 4 additions & 3 deletions data/capture_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ const (
CompletedCaptureFileExt = ".capture"
readImage = "ReadImage"
// GetImages is used for getting simultaneous images from different imagers.
GetImages = "GetImages"
nextPointCloud = "NextPointCloud"
pointCloudMap = "PointCloudMap"
GetImages = "GetImages"
nextPointCloud = "NextPointCloud"
pointCloudMap = "PointCloudMap"
captureAllFromCamera = "CaptureAllFromCamera"
// Non-exhaustive list of characters to strip from file paths, since not allowed
// on certain file systems.
filePathReservedChars = ":"
Expand Down
2 changes: 1 addition & 1 deletion data/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (dt CaptureType) ToProto() datasyncPB.DataType {
// GetDataType returns the DataType of the method.
func GetDataType(methodName string) CaptureType {
switch methodName {
case nextPointCloud, readImage, pointCloudMap, GetImages:
case nextPointCloud, readImage, pointCloudMap, GetImages, captureAllFromCamera:
return CaptureTypeBinary
default:
return CaptureTypeTabular
Expand Down
171 changes: 165 additions & 6 deletions services/datamanager/builtin/sync/upload_data_capture_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.viam.com/rdk/logging"
rprotoutils "go.viam.com/rdk/protoutils"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/services/vision"
"go.viam.com/rdk/utils"
)

Expand Down Expand Up @@ -82,6 +83,66 @@ func TestUploadDataCaptureFile(t *testing.T) {
{Payload: largeBinaryPayload, MimeType: data.MimeTypeImageJpeg},
{Payload: largeBinaryPayload, MimeType: data.MimeTypeImagePng},
})
conf := 0.888
smallVisionCaptureAllFromCamera := data.NewBinaryCaptureResult(ts, []data.Binary{
{
Payload: []byte("I'm a small binary jpeg result"),
MimeType: data.MimeTypeImageJpeg,
Annotations: data.Annotations{
BoundingBoxes: []data.BoundingBox{
{
Label: "a",
Confidence: &conf,
XMinNormalized: 1,
XMaxNormalized: 2,
YMinNormalized: 3,
YMaxNormalized: 4,
},
{
Label: "b",
XMinNormalized: 5,
XMaxNormalized: 6,
YMinNormalized: 7,
YMaxNormalized: 8,
},
},
Classifications: []data.Classification{
{Label: "a", Confidence: &conf},
{Label: "b"},
},
},
},
})

largeVisionCaptureAllFromCamera := data.NewBinaryCaptureResult(ts, []data.Binary{
{
Payload: largeBinaryPayload,
MimeType: data.MimeTypeImagePng,
Annotations: data.Annotations{
BoundingBoxes: []data.BoundingBox{
{
Label: "a",
Confidence: &conf,
XMinNormalized: 1,
XMaxNormalized: 2,
YMinNormalized: 3,
YMaxNormalized: 4,
},
{
Label: "b",
XMinNormalized: 5,
XMaxNormalized: 6,
YMinNormalized: 7,
YMaxNormalized: 8,
},
},
Classifications: []data.Classification{
{Label: "a", Confidence: &conf},
{Label: "b"},
},
},
},
})

reqs0 := make(chan *v1.DataCaptureUploadRequest, 1)
reqs1 := make(chan *v1.DataCaptureUploadRequest, 1)
Expand All @@ -93,6 +154,12 @@ func TestUploadDataCaptureFile(t *testing.T) {
make(chan *v1.StreamingDataCaptureUploadRequest, 100),
make(chan *v1.StreamingDataCaptureUploadRequest, 100),
}
reqs5 := make(chan *v1.DataCaptureUploadRequest, 2)
largeVisionCaptureAllFromCameraIdx := atomic.Int64{}
largeVisionCaptureAllFromCameraReqs := []chan *v1.StreamingDataCaptureUploadRequest{
make(chan *v1.StreamingDataCaptureUploadRequest, 100),
}

tcs := []testCase{
{
testName: "sensor readings",
Expand Down Expand Up @@ -392,12 +459,102 @@ func TestUploadDataCaptureFile(t *testing.T) {
},
steamingReqs: largeGetImagesReqs,
},
// {
// testName: "small vision.CaptureAllFromCamera",
// },
// {
// testName: "large vision.CaptureAllFromCamera",
// },
{
testName: "small vision.CaptureAllFromCamera",
captureResults: smallVisionCaptureAllFromCamera,
captureType: data.CaptureTypeBinary,
client: MockDataSyncServiceClient{
T: t,
DataCaptureUploadFunc: func(
ctx context.Context,
in *v1.DataCaptureUploadRequest,
opts ...grpc.CallOption,
) (*v1.DataCaptureUploadResponse, error) {
t.Log("called")
select {
case <-testCtx.Done():
t.Error("timeout")
t.FailNow()
case reqs5 <- in:
}
return &v1.DataCaptureUploadResponse{}, nil
},
},
api: vision.API,
name: "vision-1",
method: "CaptureAllFromCamera",
tags: []string{"tag1", "tag2"},
expectedUploads: []upload{
{
md: &v1.UploadMetadata{
ComponentName: "vision-1",
ComponentType: vision.API.String(),
FileExtension: ".jpeg",
MethodName: "CaptureAllFromCamera",
PartId: partID,
Tags: []string{"tag1", "tag2"},
Type: v1.DataType_DATA_TYPE_BINARY_SENSOR,
},
sd: smallVisionCaptureAllFromCamera.ToProto(),
},
},
unaryReqs: reqs5,
},
{
testName: "large vision.CaptureAllFromCamera",
captureResults: largeVisionCaptureAllFromCamera,
captureType: data.CaptureTypeBinary,
client: MockDataSyncServiceClient{
T: t,
StreamingDataCaptureUploadFunc: func(
ctx context.Context,
_ ...grpc.CallOption,
) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) {
mockStreamingClient := &ClientStreamingMock[
*v1.StreamingDataCaptureUploadRequest,
*v1.StreamingDataCaptureUploadResponse,
]{
T: t,
SendFunc: func(in *v1.StreamingDataCaptureUploadRequest) error {
idx := largeVisionCaptureAllFromCameraIdx.Load()
t.Logf("writing to index: %d", idx)
ch := largeVisionCaptureAllFromCameraReqs[idx]
select {
case <-testCtx.Done():
t.Error("timeout")
t.FailNow()
case ch <- in:
}
return nil
},
CloseAndRecvFunc: func() (*v1.StreamingDataCaptureUploadResponse, error) {
close(largeVisionCaptureAllFromCameraReqs[largeVisionCaptureAllFromCameraIdx.Add(1)-1])
return &v1.StreamingDataCaptureUploadResponse{}, nil
},
}
return mockStreamingClient, nil
},
},
api: vision.API,
name: "vision-1",
method: "CaptureAllFromCamera",
tags: []string{"tag1", "tag2"},
expectedUploads: []upload{
{
md: &v1.UploadMetadata{
ComponentName: "vision-1",
ComponentType: vision.API.String(),
FileExtension: ".png",
MethodName: "CaptureAllFromCamera",
PartId: partID,
Tags: []string{"tag1", "tag2"},
Type: v1.DataType_DATA_TYPE_BINARY_SENSOR,
},
sd: largeVisionCaptureAllFromCamera.ToProto(),
},
},
steamingReqs: largeVisionCaptureAllFromCameraReqs,
},
}

tempDir := t.TempDir()
Expand Down Expand Up @@ -432,12 +589,14 @@ func TestUploadDataCaptureFile(t *testing.T) {
test.That(t, bytesUploaded, test.ShouldEqual, stat.Size())
if tc.unaryReqs != nil {
for i := 0; i < len(tc.expectedUploads); i++ {
t.Logf("unaryReqs: i: %d", i)
tc.expectedUploads[i].md.MethodParameters = methodParams
select {
case <-testCtx.Done():
t.Error("timeout")
t.FailNow()
case req := <-tc.unaryReqs:
t.Logf("got req\n")
test.That(t, len(tc.expectedUploads[i].sd), test.ShouldEqual, 1)
test.That(t, req.Metadata.FileExtension, test.ShouldResemble, tc.expectedUploads[i].md.FileExtension)
test.That(t, req.Metadata, test.ShouldResemble, tc.expectedUploads[i].md)
Expand Down

0 comments on commit 24f1702

Please sign in to comment.