diff --git a/data/capture_file.go b/data/capture_file.go index 5469d556419..9e552794299 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -29,9 +29,10 @@ const ( CompletedCaptureFileExt = ".capture" readImage = "ReadImage" // GetImages is used for getting simultaneous images from different imagers. - GetImages = "GetImages" - nextPointCloud = "NextPointCloud" - pointCloudMap = "PointCloudMap" + GetImages = "GetImages" + nextPointCloud = "NextPointCloud" + pointCloudMap = "PointCloudMap" + captureAllFromCamera = "CaptureAllFromCamera" // Non-exhaustive list of characters to strip from file paths, since not allowed // on certain file systems. filePathReservedChars = ":" diff --git a/data/collector_types.go b/data/collector_types.go index 8b680918562..7de961e6044 100644 --- a/data/collector_types.go +++ b/data/collector_types.go @@ -192,7 +192,7 @@ func (dt CaptureType) ToProto() datasyncPB.DataType { // GetDataType returns the DataType of the method. func GetDataType(methodName string) CaptureType { switch methodName { - case nextPointCloud, readImage, pointCloudMap, GetImages: + case nextPointCloud, readImage, pointCloudMap, GetImages, captureAllFromCamera: return CaptureTypeBinary default: return CaptureTypeTabular diff --git a/services/datamanager/builtin/sync/upload_data_capture_file_test.go b/services/datamanager/builtin/sync/upload_data_capture_file_test.go index 80d07b08970..a40518de77e 100644 --- a/services/datamanager/builtin/sync/upload_data_capture_file_test.go +++ b/services/datamanager/builtin/sync/upload_data_capture_file_test.go @@ -23,6 +23,7 @@ import ( "go.viam.com/rdk/logging" rprotoutils "go.viam.com/rdk/protoutils" "go.viam.com/rdk/resource" + "go.viam.com/rdk/services/vision" "go.viam.com/rdk/utils" ) @@ -82,6 +83,66 @@ func TestUploadDataCaptureFile(t *testing.T) { {Payload: largeBinaryPayload, MimeType: data.MimeTypeImageJpeg}, {Payload: largeBinaryPayload, MimeType: data.MimeTypeImagePng}, }) + conf := 0.888 + smallVisionCaptureAllFromCamera := data.NewBinaryCaptureResult(ts, []data.Binary{ + { + Payload: []byte("I'm a small binary jpeg result"), + MimeType: data.MimeTypeImageJpeg, + Annotations: data.Annotations{ + BoundingBoxes: []data.BoundingBox{ + { + Label: "a", + Confidence: &conf, + XMinNormalized: 1, + XMaxNormalized: 2, + YMinNormalized: 3, + YMaxNormalized: 4, + }, + { + Label: "b", + XMinNormalized: 5, + XMaxNormalized: 6, + YMinNormalized: 7, + YMaxNormalized: 8, + }, + }, + Classifications: []data.Classification{ + {Label: "a", Confidence: &conf}, + {Label: "b"}, + }, + }, + }, + }) + + largeVisionCaptureAllFromCamera := data.NewBinaryCaptureResult(ts, []data.Binary{ + { + Payload: largeBinaryPayload, + MimeType: data.MimeTypeImagePng, + Annotations: data.Annotations{ + BoundingBoxes: []data.BoundingBox{ + { + Label: "a", + Confidence: &conf, + XMinNormalized: 1, + XMaxNormalized: 2, + YMinNormalized: 3, + YMaxNormalized: 4, + }, + { + Label: "b", + XMinNormalized: 5, + XMaxNormalized: 6, + YMinNormalized: 7, + YMaxNormalized: 8, + }, + }, + Classifications: []data.Classification{ + {Label: "a", Confidence: &conf}, + {Label: "b"}, + }, + }, + }, + }) reqs0 := make(chan *v1.DataCaptureUploadRequest, 1) reqs1 := make(chan *v1.DataCaptureUploadRequest, 1) @@ -93,6 +154,12 @@ func TestUploadDataCaptureFile(t *testing.T) { make(chan *v1.StreamingDataCaptureUploadRequest, 100), make(chan *v1.StreamingDataCaptureUploadRequest, 100), } + reqs5 := make(chan *v1.DataCaptureUploadRequest, 2) + largeVisionCaptureAllFromCameraIdx := atomic.Int64{} + largeVisionCaptureAllFromCameraReqs := []chan *v1.StreamingDataCaptureUploadRequest{ + make(chan *v1.StreamingDataCaptureUploadRequest, 100), + } + tcs := []testCase{ { testName: "sensor readings", @@ -392,12 +459,102 @@ func TestUploadDataCaptureFile(t *testing.T) { }, steamingReqs: largeGetImagesReqs, }, - // { - // testName: "small vision.CaptureAllFromCamera", - // }, - // { - // testName: "large vision.CaptureAllFromCamera", - // }, + { + testName: "small vision.CaptureAllFromCamera", + captureResults: smallVisionCaptureAllFromCamera, + captureType: data.CaptureTypeBinary, + client: MockDataSyncServiceClient{ + T: t, + DataCaptureUploadFunc: func( + ctx context.Context, + in *v1.DataCaptureUploadRequest, + opts ...grpc.CallOption, + ) (*v1.DataCaptureUploadResponse, error) { + t.Log("called") + select { + case <-testCtx.Done(): + t.Error("timeout") + t.FailNow() + case reqs5 <- in: + } + return &v1.DataCaptureUploadResponse{}, nil + }, + }, + api: vision.API, + name: "vision-1", + method: "CaptureAllFromCamera", + tags: []string{"tag1", "tag2"}, + expectedUploads: []upload{ + { + md: &v1.UploadMetadata{ + ComponentName: "vision-1", + ComponentType: vision.API.String(), + FileExtension: ".jpeg", + MethodName: "CaptureAllFromCamera", + PartId: partID, + Tags: []string{"tag1", "tag2"}, + Type: v1.DataType_DATA_TYPE_BINARY_SENSOR, + }, + sd: smallVisionCaptureAllFromCamera.ToProto(), + }, + }, + unaryReqs: reqs5, + }, + { + testName: "large vision.CaptureAllFromCamera", + captureResults: largeVisionCaptureAllFromCamera, + captureType: data.CaptureTypeBinary, + client: MockDataSyncServiceClient{ + T: t, + StreamingDataCaptureUploadFunc: func( + ctx context.Context, + _ ...grpc.CallOption, + ) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) { + mockStreamingClient := &ClientStreamingMock[ + *v1.StreamingDataCaptureUploadRequest, + *v1.StreamingDataCaptureUploadResponse, + ]{ + T: t, + SendFunc: func(in *v1.StreamingDataCaptureUploadRequest) error { + idx := largeVisionCaptureAllFromCameraIdx.Load() + t.Logf("writing to index: %d", idx) + ch := largeVisionCaptureAllFromCameraReqs[idx] + select { + case <-testCtx.Done(): + t.Error("timeout") + t.FailNow() + case ch <- in: + } + return nil + }, + CloseAndRecvFunc: func() (*v1.StreamingDataCaptureUploadResponse, error) { + close(largeVisionCaptureAllFromCameraReqs[largeVisionCaptureAllFromCameraIdx.Add(1)-1]) + return &v1.StreamingDataCaptureUploadResponse{}, nil + }, + } + return mockStreamingClient, nil + }, + }, + api: vision.API, + name: "vision-1", + method: "CaptureAllFromCamera", + tags: []string{"tag1", "tag2"}, + expectedUploads: []upload{ + { + md: &v1.UploadMetadata{ + ComponentName: "vision-1", + ComponentType: vision.API.String(), + FileExtension: ".png", + MethodName: "CaptureAllFromCamera", + PartId: partID, + Tags: []string{"tag1", "tag2"}, + Type: v1.DataType_DATA_TYPE_BINARY_SENSOR, + }, + sd: largeVisionCaptureAllFromCamera.ToProto(), + }, + }, + steamingReqs: largeVisionCaptureAllFromCameraReqs, + }, } tempDir := t.TempDir() @@ -432,12 +589,14 @@ func TestUploadDataCaptureFile(t *testing.T) { test.That(t, bytesUploaded, test.ShouldEqual, stat.Size()) if tc.unaryReqs != nil { for i := 0; i < len(tc.expectedUploads); i++ { + t.Logf("unaryReqs: i: %d", i) tc.expectedUploads[i].md.MethodParameters = methodParams select { case <-testCtx.Done(): t.Error("timeout") t.FailNow() case req := <-tc.unaryReqs: + t.Logf("got req\n") test.That(t, len(tc.expectedUploads[i].sd), test.ShouldEqual, 1) test.That(t, req.Metadata.FileExtension, test.ShouldResemble, tc.expectedUploads[i].md.FileExtension) test.That(t, req.Metadata, test.ShouldResemble, tc.expectedUploads[i].md)