diff --git a/app/data_client.go b/app/data_client.go index 8578eefc9e2..40206537003 100644 --- a/app/data_client.go +++ b/app/data_client.go @@ -3,11 +3,16 @@ package app import ( "context" + "errors" + "fmt" + "os" + "path/filepath" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" pb "go.viam.com/api/app/data/v1" + syncPb "go.viam.com/api/app/datasync/v1" "go.viam.com/utils/rpc" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" @@ -18,9 +23,14 @@ import ( // DataClient implements the DataServiceClient interface. type DataClient struct { - client pb.DataServiceClient + client pb.DataServiceClient + dataSyncClient syncPb.DataSyncServiceClient } +const ( + UploadChunkSize = 64 * 1024 // UploadChunkSize is 64 KB +) + // Order specifies the order in which data is returned. type Order int32 @@ -177,13 +187,111 @@ type DatabaseConnReturn struct { HasDatabaseUser bool } +// DataSyncClient structs + +// SensorMetadata contains the time the sensor data was requested and was received. +type SensorMetadata struct { + TimeRequested time.Time + TimeReceived time.Time + MimeType MimeType + Annotations Annotations +} + +// SensorData contains the contents and metadata for tabular data. +type SensorData struct { + Metadata SensorMetadata + SDStruct map[string]interface{} + SDBinary []byte +} + +// DataType specifies the type of data uploaded. +type DataType int32 + +// DataType constants define the possible DataType options. +const ( + DataTypeUnspecified DataType = iota + DataTypeBinarySensor + DataTypeTabularSensor + DataTypeFile +) + +// MimeType specifies the format of a file being uploaded. +type MimeType int32 + +// MimeType constants define the possible MimeType options. +const ( + MimeTypeUnspecified MimeType = iota + MimeTypeJPEG + MimeTypePNG + MimeTypePCD +) + +// UploadMetadata contains the metadata for binary (image + file) data. +type UploadMetadata struct { + PartID string + ComponentType string + ComponentName string + MethodName string + Type DataType + FileName string + MethodParameters map[string]interface{} + FileExtension string + Tags []string +} + +// FileData contains the contents of binary (image + file) data. +type FileData struct { + Data []byte +} + +// BinaryOptions represents optional parameters for the BinaryDataCaptureUpload method. +type BinaryOptions struct { + Type DataType + FileName string + MethodParameters map[string]interface{} + Tags []string + DataRequestTimes [2]time.Time +} + +// TabularOptions represents optional parameters for the TabularDataCaptureUpload method. +type TabularOptions struct { + Type DataType + FileName string + MethodParameters map[string]interface{} + FileExtension string + Tags []string +} + +// StreamingOptions represents optional parameters for the StreamingDataCaptureUpload method. +type StreamingOptions struct { + ComponentType string + ComponentName string + MethodName string + Type DataType + FileName string + MethodParameters map[string]interface{} + Tags []string + DataRequestTimes [2]time.Time +} + +// FileUploadOptions represents optional parameters for the FileUploadFromPath & FileUploadFromBytes methods. +type FileUploadOptions struct { + ComponentType string + ComponentName string + MethodName string + FileName string + MethodParameters map[string]interface{} + FileExtension string + Tags []string +} + // NewDataClient constructs a new DataClient using the connection passed in by the viamClient. -func NewDataClient( - conn rpc.ClientConn, -) *DataClient { +func NewDataClient(conn rpc.ClientConn) *DataClient { d := pb.NewDataServiceClient(conn) + s := syncPb.NewDataSyncServiceClient(conn) return &DataClient{ - client: d, + client: d, + dataSyncClient: s, } } @@ -243,27 +351,6 @@ func captureMetadataFromProto(proto *pb.CaptureMetadata) CaptureMetadata { } } -func captureMetadataToProto(metadata CaptureMetadata) *pb.CaptureMetadata { - methodParams, err := protoutils.ConvertMapToProtoAny(metadata.MethodParameters) - if err != nil { - return nil - } - return &pb.CaptureMetadata{ - OrganizationId: metadata.OrganizationID, - LocationId: metadata.LocationID, - RobotName: metadata.RobotName, - RobotId: metadata.RobotID, - PartName: metadata.PartName, - PartId: metadata.PartID, - ComponentType: metadata.ComponentType, - ComponentName: metadata.ComponentName, - MethodName: metadata.MethodName, - MethodParameters: methodParams, - Tags: metadata.Tags, - MimeType: metadata.MimeType, - } -} - func binaryDataFromProto(proto *pb.BinaryData) BinaryData { return BinaryData{ Binary: proto.Binary, @@ -430,12 +517,12 @@ func (d *DataClient) TabularDataByFilter( } // TabularData contains tabular data and associated metadata dataArray := []TabularData{} - var metadata *pb.CaptureMetadata for _, data := range resp.Data { - if len(resp.Metadata) > 0 && int(data.MetadataIndex) < len(resp.Metadata) { + var metadata *pb.CaptureMetadata + switch { + case len(resp.Metadata) > 0 && int(data.MetadataIndex) < len(resp.Metadata): metadata = resp.Metadata[data.MetadataIndex] - } else { - // Use an empty CaptureMetadata as a fallback + default: metadata = &pb.CaptureMetadata{} } dataArray = append(dataArray, tabularDataFromProto(data, metadata)) @@ -761,3 +848,374 @@ func (d *DataClient) RemoveBinaryDataFromDatasetByIDs( }) return err } + +func uploadMetadataToProto(metadata UploadMetadata) *syncPb.UploadMetadata { + methodParams, err := protoutils.ConvertMapToProtoAny(metadata.MethodParameters) + if err != nil { + return nil + } + return &syncPb.UploadMetadata{ + PartId: metadata.PartID, + ComponentType: metadata.ComponentType, + ComponentName: metadata.ComponentName, + MethodName: metadata.MethodName, + Type: syncPb.DataType(metadata.Type), + FileName: metadata.FileName, + MethodParameters: methodParams, + FileExtension: metadata.FileExtension, + Tags: metadata.Tags, + } +} + +func annotationsToProto(annotations Annotations) *pb.Annotations { + var protoBboxes []*pb.BoundingBox + for _, bbox := range annotations.Bboxes { + protoBboxes = append(protoBboxes, &pb.BoundingBox{ + Id: bbox.ID, + Label: bbox.Label, + XMinNormalized: bbox.XMinNormalized, + YMinNormalized: bbox.YMinNormalized, + XMaxNormalized: bbox.XMaxNormalized, + YMaxNormalized: bbox.YMaxNormalized, + }) + } + return &pb.Annotations{ + Bboxes: protoBboxes, + } +} + +func sensorMetadataToProto(metadata SensorMetadata) *syncPb.SensorMetadata { + return &syncPb.SensorMetadata{ + TimeRequested: timestamppb.New(metadata.TimeRequested), + TimeReceived: timestamppb.New(metadata.TimeReceived), + MimeType: syncPb.MimeType(metadata.MimeType), + Annotations: annotationsToProto(metadata.Annotations), + } +} + +func sensorDataToProto(sensorData SensorData) *syncPb.SensorData { + protoSensorData := &syncPb.SensorData{ + Metadata: sensorMetadataToProto(sensorData.Metadata), + } + switch { + case len(sensorData.SDBinary) > 0: + protoSensorData.Data = &syncPb.SensorData_Binary{ + Binary: sensorData.SDBinary, + } + case sensorData.SDStruct != nil: + pbStruct, err := structpb.NewStruct(sensorData.SDStruct) + if err != nil { + return nil + } + protoSensorData.Data = &syncPb.SensorData_Struct{ + Struct: pbStruct, + } + default: + return nil + } + return protoSensorData +} + +func sensorContentsToProto(sensorContents []SensorData) []*syncPb.SensorData { + var protoSensorContents []*syncPb.SensorData + for _, item := range sensorContents { + protoSensorContents = append(protoSensorContents, sensorDataToProto(item)) + } + return protoSensorContents +} + +func formatFileExtension(fileExt string) string { + if fileExt == "" { + return fileExt + } + if fileExt[0] == '.' { + return fileExt + } + return "." + fileExt +} + +// BinaryDataCaptureUpload uploads the contents and metadata for binary data. +func (d *DataClient) BinaryDataCaptureUpload( + ctx context.Context, + binaryData []byte, + partID string, + componentType string, + componentName string, + methodName string, + fileExtension string, + options *BinaryOptions, +) (string, error) { + var sensorMetadata SensorMetadata + if len(options.DataRequestTimes) == 2 { + sensorMetadata = SensorMetadata{ + TimeRequested: options.DataRequestTimes[0], + TimeReceived: options.DataRequestTimes[1], + } + } + sensorData := SensorData{ + Metadata: sensorMetadata, + SDStruct: nil, + SDBinary: binaryData, + } + metadata := UploadMetadata{ + PartID: partID, + ComponentType: componentType, + ComponentName: componentName, + MethodName: methodName, + Type: DataTypeBinarySensor, + FileName: options.FileName, + MethodParameters: options.MethodParameters, + FileExtension: formatFileExtension(fileExtension), + Tags: options.Tags, + } + response, err := d.DataCaptureUpload(ctx, metadata, []SensorData{sensorData}) + if err != nil { + return "", err + } + return response, nil +} + +// TabularDataCaptureUpload uploads the contents and metadata for tabular data. +func (d *DataClient) tabularDataCaptureUpload( + ctx context.Context, + tabularData []map[string]interface{}, + partID string, + componentType string, + componentName string, + methodName string, + dataRequestTimes [][2]time.Time, + options *TabularOptions, +) (string, error) { + if len(dataRequestTimes) != len(tabularData) { + return "", errors.New("dataRequestTimes and tabularData lengths must be equal") + } + var sensorContents []SensorData + for i, tabData := range tabularData { + sensorMetadata := SensorMetadata{} + dates := dataRequestTimes[i] + if len(dates) == 2 { + sensorMetadata.TimeRequested = dates[0] + sensorMetadata.TimeReceived = dates[1] + } + sensorData := SensorData{ + Metadata: sensorMetadata, + SDStruct: tabData, + SDBinary: nil, + } + sensorContents = append(sensorContents, sensorData) + } + metadata := UploadMetadata{ + PartID: partID, + ComponentType: componentType, + ComponentName: componentName, + MethodName: methodName, + Type: DataTypeTabularSensor, + FileName: options.FileName, + MethodParameters: options.MethodParameters, + FileExtension: formatFileExtension(options.FileExtension), + Tags: options.Tags, + } + response, err := d.DataCaptureUpload(ctx, metadata, sensorContents) + if err != nil { + return "", err + } + return response, nil +} + +// DataCaptureUpload uploads the metadata and contents for either tabular or binary data, +// and returns the file ID associated with the uploaded data and metadata. +func (d *DataClient) DataCaptureUpload(ctx context.Context, metadata UploadMetadata, sensorContents []SensorData) (string, error) { + resp, err := d.dataSyncClient.DataCaptureUpload(ctx, &syncPb.DataCaptureUploadRequest{ + Metadata: uploadMetadataToProto(metadata), + SensorContents: sensorContentsToProto(sensorContents), + }) + if err != nil { + return "", err + } + return resp.FileId, nil +} + +// StreamingDataCaptureUpload uploads metadata and streaming binary data in chunks. +func (d *DataClient) StreamingDataCaptureUpload( + ctx context.Context, + data []byte, + partID string, + fileExt string, + options *StreamingOptions, +) (string, error) { + uploadMetadata := UploadMetadata{ + PartID: partID, + ComponentType: options.ComponentType, + ComponentName: options.ComponentName, + MethodName: options.MethodName, + Type: DataTypeBinarySensor, + FileName: options.FileName, + MethodParameters: options.MethodParameters, + FileExtension: fileExt, + Tags: options.Tags, + } + uploadMetadataPb := uploadMetadataToProto(uploadMetadata) + var sensorMetadata SensorMetadata + if len(options.DataRequestTimes) == 2 { + sensorMetadata = SensorMetadata{ + TimeRequested: options.DataRequestTimes[0], + TimeReceived: options.DataRequestTimes[1], + } + } + sensorMetadataPb := sensorMetadataToProto(sensorMetadata) + metadata := &syncPb.DataCaptureUploadMetadata{ + UploadMetadata: uploadMetadataPb, + SensorMetadata: sensorMetadataPb, + } + // establish a streaming connection. + stream, err := d.dataSyncClient.StreamingDataCaptureUpload(ctx) + if err != nil { + return "", err + } + // send the metadata as the first packet. + metaReq := &syncPb.StreamingDataCaptureUploadRequest{ + UploadPacket: &syncPb.StreamingDataCaptureUploadRequest_Metadata{ + Metadata: metadata, + }, + } + if err := stream.Send(metaReq); err != nil { + return "", err + } + + // send the binary data in chunks. + for start := 0; start < len(data); start += UploadChunkSize { + end := start + UploadChunkSize + if end > len(data) { + end = len(data) + } + dataReq := &syncPb.StreamingDataCaptureUploadRequest{ + UploadPacket: &syncPb.StreamingDataCaptureUploadRequest_Data{ + Data: data[start:end], + }, + } + if err := stream.Send(dataReq); err != nil { + return "", err + } + } + // close the stream and get the response. + resp, err := stream.CloseAndRecv() + if err != nil { + return "", err + } + return resp.FileId, nil +} + +// FileUploadFromBytes uploads the contents and metadata for binary data such as encoded images or other data represented by bytes. +func (d *DataClient) FileUploadFromBytes( + ctx context.Context, + partID string, + data []byte, + opts *FileUploadOptions, +) (string, error) { + methodParams, err := protoutils.ConvertMapToProtoAny(opts.MethodParameters) + if err != nil { + return "", err + } + metadata := &syncPb.UploadMetadata{ + PartId: partID, + ComponentType: opts.ComponentType, + ComponentName: opts.ComponentName, + MethodName: opts.MethodName, + Type: syncPb.DataType_DATA_TYPE_FILE, + MethodParameters: methodParams, + Tags: opts.Tags, + } + + if opts.FileName == "" { + metadata.FileName = time.Now().String() + } else { + metadata.FileName = opts.FileName + metadata.FileExtension = opts.FileExtension + } + return d.fileUploadStreamResp(metadata, data) +} + +// FileUploadFromPath uploads the contents and metadata for binary data created from a filepath. +func (d *DataClient) FileUploadFromPath( + ctx context.Context, + partID string, + filePath string, + opts *FileUploadOptions, +) (string, error) { + methodParams, err := protoutils.ConvertMapToProtoAny(opts.MethodParameters) + if err != nil { + return "", err + } + metadata := &syncPb.UploadMetadata{ + PartId: partID, + ComponentType: opts.ComponentType, + ComponentName: opts.ComponentName, + MethodName: opts.MethodName, + Type: syncPb.DataType_DATA_TYPE_FILE, + MethodParameters: methodParams, + Tags: opts.Tags, + } + if opts.FileName == "" { + if filePath != "" { + metadata.FileName = filepath.Base(filePath) + metadata.FileExtension = filepath.Ext(filePath) + } else { + metadata.FileName = time.Now().String() + } + } else { + metadata.FileName = opts.FileName + metadata.FileExtension = opts.FileExtension + } + var data []byte + // Prepare file data from filepath + if filePath != "" { + //nolint:gosec + fileData, err := os.ReadFile(filePath) + if err != nil { + return "", err + } + data = fileData + } + return d.fileUploadStreamResp(metadata, data) +} + +func (d *DataClient) fileUploadStreamResp(metadata *syncPb.UploadMetadata, data []byte) (string, error) { + // establish a streaming connection. + stream, err := d.dataSyncClient.FileUpload(context.Background()) + if err != nil { + return "", err + } + // send the metadata as the first packet. + metaReq := &syncPb.FileUploadRequest{ + UploadPacket: &syncPb.FileUploadRequest_Metadata{ + Metadata: metadata, + }, + } + if err := stream.Send(metaReq); err != nil { + return "", fmt.Errorf("failed to send metadata: %w", err) + } + // send file contents in chunks + for start := 0; start < len(data); start += UploadChunkSize { + end := start + UploadChunkSize + if end > len(data) { + end = len(data) + } + dataReq := &syncPb.FileUploadRequest{ + UploadPacket: &syncPb.FileUploadRequest_FileContents{ + FileContents: &syncPb.FileData{ + Data: data[start:end], + }, + }, + } + if err := stream.Send(dataReq); err != nil { + return "", err + } + } + // close stream and get response + resp, err := stream.CloseAndRecv() + if err != nil { + return "", err + } + + return resp.FileId, nil +} diff --git a/app/data_client_test.go b/app/data_client_test.go index 5a3a22468a0..9096635392c 100644 --- a/app/data_client_test.go +++ b/app/data_client_test.go @@ -2,16 +2,19 @@ package app import ( "context" + "os" "testing" "time" "go.mongodb.org/mongo-driver/bson" pb "go.viam.com/api/app/data/v1" + syncPb "go.viam.com/api/app/datasync/v1" "go.viam.com/test" utils "go.viam.com/utils/protoutils" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" + "go.viam.com/rdk/protoutils" "go.viam.com/rdk/testutils/inject" ) @@ -31,15 +34,18 @@ const ( bboxLabel = "bbox_label" tag = "tag" fileName = "file_name" - fileExt = "file_ext.ext" + fileExt = ".ext" datasetID = "dataset_id" binaryMetaID = "binary_id" mongodbURI = "mongo_uri" hostName = "host_name" last = "last" + fileID = "file_id" ) var ( + binaryDataType = DataTypeBinarySensor + tabularDataType = DataTypeTabularSensor locationIDs = []string{locationID} orgIDs = []string{organizationID} mimeTypes = []string{mimeType} @@ -48,6 +54,7 @@ var ( tags = []string{tag} startTime = time.Now().UTC().Round(time.Millisecond) endTime = time.Now().UTC().Round(time.Millisecond) + dataRequestTimes = [2]time.Time{startTime, endTime} count = uint64(5) limit = uint64(5) countOnly = true @@ -55,6 +62,27 @@ var ( data = map[string]interface{}{ "key": "value", } + tabularMetadata = CaptureMetadata{ + OrganizationID: organizationID, + LocationID: locationID, + RobotName: robotName, + RobotID: robotID, + PartName: partName, + PartID: partID, + ComponentType: componentType, + ComponentName: componentName, + MethodName: method, + MethodParameters: methodParameters, + Tags: tags, + MimeType: mimeType, + } + tabularData = TabularData{ + Data: data, + MetadataIndex: 0, + Metadata: tabularMetadata, + TimeRequested: startTime, + TimeReceived: endTime, + } binaryID = BinaryID{ FileID: "file1", OrganizationID: organizationID, @@ -106,23 +134,6 @@ var ( } ) -func annotationsToProto(annotations Annotations) *pb.Annotations { - var protoBboxes []*pb.BoundingBox - for _, bbox := range annotations.Bboxes { - protoBboxes = append(protoBboxes, &pb.BoundingBox{ - Id: bbox.ID, - Label: bbox.Label, - XMinNormalized: bbox.XMinNormalized, - YMinNormalized: bbox.YMinNormalized, - XMaxNormalized: bbox.XMaxNormalized, - YMaxNormalized: bbox.YMaxNormalized, - }) - } - return &pb.Annotations{ - Bboxes: protoBboxes, - } -} - func binaryDataToProto(binaryData BinaryData) *pb.BinaryData { return &pb.BinaryData{ Binary: binaryData.Binary, @@ -130,6 +141,27 @@ func binaryDataToProto(binaryData BinaryData) *pb.BinaryData { } } +func captureMetadataToProto(metadata CaptureMetadata) *pb.CaptureMetadata { + methodParams, err := protoutils.ConvertMapToProtoAny(metadata.MethodParameters) + if err != nil { + return nil + } + return &pb.CaptureMetadata{ + OrganizationId: metadata.OrganizationID, + LocationId: metadata.LocationID, + RobotName: metadata.RobotName, + RobotId: metadata.RobotID, + PartName: metadata.PartName, + PartId: metadata.PartID, + ComponentType: metadata.ComponentType, + ComponentName: metadata.ComponentName, + MethodName: metadata.MethodName, + MethodParameters: methodParams, + Tags: metadata.Tags, + MimeType: metadata.MimeType, + } +} + func binaryMetadataToProto(binaryMetadata BinaryMetadata) *pb.BinaryMetadata { return &pb.BinaryMetadata{ Id: binaryMetadata.ID, @@ -157,6 +189,10 @@ func createGrpcClient() *inject.DataServiceClient { return &inject.DataServiceClient{} } +func createGrpcDataSyncClient() *inject.DataSyncServiceClient { + return &inject.DataSyncServiceClient{} +} + func TestDataClient(t *testing.T) { grpcClient := createGrpcClient() client := DataClient{client: grpcClient} @@ -187,21 +223,6 @@ func TestDataClient(t *testing.T) { DatasetID: datasetID, } - tabularMetadata := CaptureMetadata{ - OrganizationID: organizationID, - LocationID: locationID, - RobotName: robotName, - RobotID: robotID, - PartName: partName, - PartID: partID, - ComponentType: componentType, - ComponentName: componentName, - MethodName: method, - MethodParameters: methodParameters, - Tags: tags, - MimeType: mimeType, - } - binaryMetadata := BinaryMetadata{ ID: binaryMetaID, CaptureMetadata: tabularMetadata, @@ -227,13 +248,6 @@ func TestDataClient(t *testing.T) { } t.Run("TabularDataByFilter", func(t *testing.T) { - tabularData := TabularData{ - Data: data, - MetadataIndex: 0, - Metadata: tabularMetadata, - TimeRequested: startTime, - TimeReceived: endTime, - } dataStruct, _ := utils.StructToStructPb(data) tabularDataPb := &pb.TabularData{ Data: dataStruct, @@ -579,3 +593,261 @@ func TestDataClient(t *testing.T) { client.RemoveBinaryDataFromDatasetByIDs(context.Background(), binaryIDs, datasetID) }) } + +func TestDataSyncClient(t *testing.T) { + grpcClient := createGrpcDataSyncClient() + client := DataClient{dataSyncClient: grpcClient} + + uploadMetadata := UploadMetadata{ + PartID: partID, + ComponentType: componentType, + ComponentName: componentName, + MethodName: method, + Type: DataTypeBinarySensor, + FileName: fileName, + MethodParameters: methodParameters, + FileExtension: fileExt, + Tags: tags, + } + + t.Run("BinaryDataCaptureUpload", func(t *testing.T) { + uploadMetadata.Type = DataTypeBinarySensor + options := BinaryOptions{ + Type: binaryDataType, + FileName: fileName, + MethodParameters: methodParameters, + Tags: tags, + DataRequestTimes: dataRequestTimes, + } + grpcClient.DataCaptureUploadFunc = func(ctx context.Context, in *syncPb.DataCaptureUploadRequest, + opts ...grpc.CallOption, + ) (*syncPb.DataCaptureUploadResponse, error) { + methodParams, _ := protoutils.ConvertMapToProtoAny(methodParameters) + + test.That(t, in.Metadata.PartId, test.ShouldEqual, partID) + test.That(t, in.Metadata.ComponentType, test.ShouldEqual, componentType) + test.That(t, in.Metadata.ComponentName, test.ShouldEqual, componentName) + test.That(t, in.Metadata.MethodName, test.ShouldEqual, method) + test.That(t, in.Metadata.Type, test.ShouldEqual, binaryDataType) + test.That(t, in.Metadata.FileName, test.ShouldEqual, fileName) + test.That(t, in.Metadata.MethodParameters, test.ShouldResemble, methodParams) + test.That(t, in.Metadata.FileExtension, test.ShouldEqual, fileExt) + test.That(t, in.Metadata.Tags, test.ShouldResemble, tags) + + test.That(t, in.SensorContents[0].Metadata.TimeRequested, test.ShouldResemble, timestamppb.New(startTime)) + test.That(t, in.SensorContents[0].Metadata.TimeReceived, test.ShouldResemble, timestamppb.New(endTime)) + dataField, ok := in.SensorContents[0].Data.(*syncPb.SensorData_Binary) + test.That(t, ok, test.ShouldBeTrue) + test.That(t, dataField.Binary, test.ShouldResemble, binaryDataByte) + return &syncPb.DataCaptureUploadResponse{ + FileId: fileID, + }, nil + } + resp, _ := client.BinaryDataCaptureUpload(context.Background(), + binaryDataByte, partID, componentType, componentName, + method, fileExt, &options) + test.That(t, resp, test.ShouldResemble, fileID) + }) + + t.Run("TabularDataCaptureUpload", func(t *testing.T) { + uploadMetadata.Type = DataTypeTabularSensor + dataStruct, _ := utils.StructToStructPb(data) + tabularDataPb := &pb.TabularData{ + Data: dataStruct, + MetadataIndex: 0, + TimeRequested: timestamppb.New(startTime), + TimeReceived: timestamppb.New(endTime), + } + options := TabularOptions{ + Type: binaryDataType, + FileName: fileName, + MethodParameters: methodParameters, + FileExtension: fileExt, + Tags: tags, + } + grpcClient.DataCaptureUploadFunc = func(ctx context.Context, in *syncPb.DataCaptureUploadRequest, + opts ...grpc.CallOption, + ) (*syncPb.DataCaptureUploadResponse, error) { + methodParams, _ := protoutils.ConvertMapToProtoAny(methodParameters) + + test.That(t, in.Metadata.PartId, test.ShouldEqual, partID) + test.That(t, in.Metadata.ComponentType, test.ShouldEqual, componentType) + test.That(t, in.Metadata.ComponentName, test.ShouldEqual, componentName) + test.That(t, in.Metadata.MethodName, test.ShouldEqual, method) + test.That(t, in.Metadata.Type, test.ShouldEqual, tabularDataType) + test.That(t, in.Metadata.FileName, test.ShouldEqual, fileName) + test.That(t, in.Metadata.MethodParameters, test.ShouldResemble, methodParams) + test.That(t, in.Metadata.FileExtension, test.ShouldEqual, fileExt) + test.That(t, in.Metadata.Tags, test.ShouldResemble, tags) + + test.That(t, in.SensorContents[0].Metadata.TimeRequested, test.ShouldResemble, timestamppb.New(startTime)) + test.That(t, in.SensorContents[0].Metadata.TimeReceived, test.ShouldResemble, timestamppb.New(endTime)) + dataField, ok := in.SensorContents[0].Data.(*syncPb.SensorData_Struct) + test.That(t, ok, test.ShouldBeTrue) + test.That(t, dataField.Struct, test.ShouldResemble, tabularDataPb.Data) + return &syncPb.DataCaptureUploadResponse{ + FileId: fileID, + }, nil + } + tabularData := []map[string]interface{}{data} + dataRequestTimes := [][2]time.Time{ + {startTime, endTime}, + } + resp, _ := client.tabularDataCaptureUpload(context.Background(), + tabularData, partID, componentType, componentName, method, + dataRequestTimes, &options) + test.That(t, resp, test.ShouldResemble, fileID) + }) + + t.Run("StreamingDataCaptureUpload", func(t *testing.T) { + options := StreamingOptions{ + ComponentType: componentType, + ComponentName: componentName, + MethodName: method, + Type: binaryDataType, + FileName: fileName, + MethodParameters: methodParameters, + Tags: tags, + DataRequestTimes: dataRequestTimes, + } + // Mock implementation of the streaming client. + mockStream := &inject.DataSyncServiceStreamingDataCaptureUploadClient{ + SendFunc: func(req *syncPb.StreamingDataCaptureUploadRequest) error { + switch packet := req.UploadPacket.(type) { + case *syncPb.StreamingDataCaptureUploadRequest_Metadata: + meta := packet.Metadata + test.That(t, meta.UploadMetadata.PartId, test.ShouldEqual, partID) + test.That(t, meta.UploadMetadata.FileExtension, test.ShouldEqual, fileExt) + test.That(t, meta.UploadMetadata.ComponentType, test.ShouldEqual, componentType) + test.That(t, meta.UploadMetadata.ComponentName, test.ShouldEqual, componentName) + test.That(t, meta.UploadMetadata.MethodName, test.ShouldEqual, method) + test.That(t, meta.UploadMetadata.Tags, test.ShouldResemble, tags) + test.That(t, meta.SensorMetadata.TimeRequested, test.ShouldResemble, timestamppb.New(startTime)) + test.That(t, meta.SensorMetadata.TimeReceived, test.ShouldResemble, timestamppb.New(endTime)) + case *syncPb.StreamingDataCaptureUploadRequest_Data: + test.That(t, packet.Data, test.ShouldResemble, binaryDataByte) + default: + t.Errorf("unexpected packet type: %T", packet) + } + return nil + }, + CloseAndRecvFunc: func() (*syncPb.StreamingDataCaptureUploadResponse, error) { + return &syncPb.StreamingDataCaptureUploadResponse{ + FileId: fileID, + }, nil + }, + } + grpcClient.StreamingDataCaptureUploadFunc = func(ctx context.Context, + opts ...grpc.CallOption, + ) (syncPb.DataSyncService_StreamingDataCaptureUploadClient, error) { + return mockStream, nil + } + resp, err := client.StreamingDataCaptureUpload(context.Background(), binaryDataByte, partID, fileExt, &options) + test.That(t, err, test.ShouldBeNil) + test.That(t, resp, test.ShouldEqual, fileID) + }) + t.Run("FileUploadFromBytes", func(t *testing.T) { + options := FileUploadOptions{ + ComponentType: componentType, + ComponentName: componentName, + MethodName: method, + FileName: fileName, + MethodParameters: methodParameters, + FileExtension: fileExt, + Tags: tags, + } + // Mock implementation of the streaming client. + //nolint:dupl + mockStream := &inject.DataSyncServiceFileUploadClient{ + SendFunc: func(req *syncPb.FileUploadRequest) error { + switch packet := req.UploadPacket.(type) { + case *syncPb.FileUploadRequest_Metadata: + methodParams, _ := protoutils.ConvertMapToProtoAny(methodParameters) + meta := packet.Metadata + test.That(t, meta.PartId, test.ShouldEqual, partID) + test.That(t, meta.ComponentType, test.ShouldEqual, componentType) + test.That(t, meta.ComponentName, test.ShouldEqual, componentName) + test.That(t, meta.MethodName, test.ShouldEqual, method) + test.That(t, meta.Type, test.ShouldEqual, DataTypeFile) + test.That(t, meta.FileName, test.ShouldEqual, fileName) + test.That(t, meta.MethodParameters, test.ShouldResemble, methodParams) + test.That(t, meta.FileExtension, test.ShouldEqual, fileExt) + test.That(t, meta.Tags, test.ShouldResemble, tags) + case *syncPb.FileUploadRequest_FileContents: + test.That(t, packet.FileContents.Data, test.ShouldResemble, binaryDataByte) + default: + t.Errorf("unexpected packet type: %T", packet) + } + return nil + }, + CloseAndRecvFunc: func() (*syncPb.FileUploadResponse, error) { + return &syncPb.FileUploadResponse{ + FileId: fileID, + }, nil + }, + } + grpcClient.FileUploadFunc = func(ctx context.Context, + opts ...grpc.CallOption, + ) (syncPb.DataSyncService_FileUploadClient, error) { + return mockStream, nil + } + resp, err := client.FileUploadFromBytes(context.Background(), partID, binaryDataByte, &options) + test.That(t, err, test.ShouldBeNil) + test.That(t, resp, test.ShouldEqual, fileID) + }) + + t.Run("FileUploadFromPath", func(t *testing.T) { + options := FileUploadOptions{ + ComponentType: componentType, + ComponentName: componentName, + MethodName: method, + FileName: fileName, + MethodParameters: methodParameters, + FileExtension: fileExt, + Tags: tags, + } + // Create a temporary file for testing + tempContent := []byte("test file content") + tempFile, err := os.CreateTemp("", "test-upload-*.txt") + test.That(t, err, test.ShouldBeNil) + defer os.Remove(tempFile.Name()) + // Mock implementation of the streaming client. + //nolint:dupl + mockStream := &inject.DataSyncServiceFileUploadClient{ + SendFunc: func(req *syncPb.FileUploadRequest) error { + switch packet := req.UploadPacket.(type) { + case *syncPb.FileUploadRequest_Metadata: + methodParams, _ := protoutils.ConvertMapToProtoAny(methodParameters) + meta := packet.Metadata + test.That(t, meta.PartId, test.ShouldEqual, partID) + test.That(t, meta.ComponentType, test.ShouldEqual, componentType) + test.That(t, meta.ComponentName, test.ShouldEqual, componentName) + test.That(t, meta.MethodName, test.ShouldEqual, method) + test.That(t, meta.Type, test.ShouldEqual, DataTypeFile) + test.That(t, meta.FileName, test.ShouldEqual, fileName) + test.That(t, meta.MethodParameters, test.ShouldResemble, methodParams) + test.That(t, meta.FileExtension, test.ShouldEqual, fileExt) + test.That(t, meta.Tags, test.ShouldResemble, tags) + case *syncPb.FileUploadRequest_FileContents: + test.That(t, packet.FileContents.Data, test.ShouldResemble, tempContent) + default: + t.Errorf("unexpected packet type: %T", packet) + } + return nil + }, + CloseAndRecvFunc: func() (*syncPb.FileUploadResponse, error) { + return &syncPb.FileUploadResponse{ + FileId: fileID, + }, nil + }, + } + grpcClient.FileUploadFunc = func(ctx context.Context, + opts ...grpc.CallOption, + ) (syncPb.DataSyncService_FileUploadClient, error) { + return mockStream, nil + } + resp, err := client.FileUploadFromPath(context.Background(), partID, tempFile.Name(), &options) + test.That(t, err, test.ShouldBeNil) + test.That(t, resp, test.ShouldEqual, fileID) + }) +} diff --git a/go.mod b/go.mod index 9949c4ef90c..75e2f385ebb 100644 --- a/go.mod +++ b/go.mod @@ -78,7 +78,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - go.viam.com/api v0.1.357 + go.viam.com/api v0.1.366 go.viam.com/test v1.2.4 go.viam.com/utils v0.1.116 goji.io v2.0.2+incompatible @@ -193,7 +193,7 @@ require ( github.com/dnephin/pflag v1.0.7 // indirect github.com/docker/cli v25.0.4+incompatible // indirect github.com/docker/distribution v2.8.3+incompatible // indirect - github.com/docker/docker v25.0.4+incompatible // indirect + github.com/docker/docker v25.0.6+incompatible // indirect github.com/docker/docker-credential-helpers v0.8.1 // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/go.sum b/go.sum index e28191b5e46..774b3790a87 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/docker/cli v25.0.4+incompatible h1:DatRkJ+nrFoYL2HZUzjM5Z5sAmcA5XGp+A github.com/docker/cli v25.0.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v25.0.4+incompatible h1:XITZTrq+52tZyZxUOtFIahUf3aH367FLxJzt9vZeAF8= -github.com/docker/docker v25.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v25.0.6+incompatible h1:5cPwbwriIcsua2REJe8HqQV+6WlWc1byg2QSXzBxBGg= +github.com/docker/docker v25.0.6+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker-credential-helpers v0.8.1 h1:j/eKUktUltBtMzKqmfLB0PAgqYyMHOp5vfsD1807oKo= github.com/docker/docker-credential-helpers v0.8.1/go.mod h1:P3ci7E3lwkZg6XiHdRKft1KckHiO9a2rNtyFbZ/ry9M= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= @@ -1531,8 +1531,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.357 h1:L9LBYbaH0imv/B+mVxqtSgClIl4flzjLV6LclfnD9Nc= -go.viam.com/api v0.1.357/go.mod h1:5lpVRxMsKFCaahqsnJfPGwJ9baoQ6PIKQu3lxvy6Wtw= +go.viam.com/api v0.1.366 h1:lUen0W04hwdFL95GoQkYaweZO5ySG40BnUl7HHVZE3o= +go.viam.com/api v0.1.366/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.116 h1:hoCj3SsV8LZAOEP75TjMeX57axhravS8rNUYmhpTWtM= diff --git a/testutils/inject/datasync_service_client.go b/testutils/inject/datasync_service_client.go new file mode 100644 index 00000000000..7fccd84fb2d --- /dev/null +++ b/testutils/inject/datasync_service_client.go @@ -0,0 +1,97 @@ +package inject + +import ( + "context" + + datapb "go.viam.com/api/app/datasync/v1" + "google.golang.org/grpc" +) + +// DataSyncServiceClient represents a fake instance of a data sync service client. +type DataSyncServiceClient struct { + datapb.DataSyncServiceClient + DataCaptureUploadFunc func(ctx context.Context, in *datapb.DataCaptureUploadRequest, + opts ...grpc.CallOption) (*datapb.DataCaptureUploadResponse, error) + FileUploadFunc func(ctx context.Context, + opts ...grpc.CallOption) (datapb.DataSyncService_FileUploadClient, error) + StreamingDataCaptureUploadFunc func(ctx context.Context, + opts ...grpc.CallOption) (datapb.DataSyncService_StreamingDataCaptureUploadClient, error) +} + +// DataCaptureUpload uploads the contents and metadata for tabular data. +func (client *DataSyncServiceClient) DataCaptureUpload(ctx context.Context, in *datapb.DataCaptureUploadRequest, + opts ...grpc.CallOption, +) (*datapb.DataCaptureUploadResponse, error) { + if client.DataCaptureUploadFunc == nil { + return client.DataSyncServiceClient.DataCaptureUpload(ctx, in, opts...) + } + return client.DataCaptureUploadFunc(ctx, in, opts...) +} + +// FileUpload uploads the contents and metadata for binary (image + file) data, +// where the first packet must be the UploadMetadata. +func (client *DataSyncServiceClient) FileUpload(ctx context.Context, + opts ...grpc.CallOption, +) (datapb.DataSyncService_FileUploadClient, error) { + if client.FileUploadFunc == nil { + return client.DataSyncServiceClient.FileUpload(ctx, opts...) + } + return client.FileUploadFunc(ctx, opts...) +} + +// StreamingDataCaptureUpload uploads the contents and metadata for streaming binary data. +func (client *DataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context, + opts ...grpc.CallOption, +) (datapb.DataSyncService_StreamingDataCaptureUploadClient, error) { + if client.StreamingDataCaptureUploadFunc == nil { + return client.DataSyncServiceClient.StreamingDataCaptureUpload(ctx, opts...) + } + return client.StreamingDataCaptureUploadFunc(ctx, opts...) +} + +// DataSyncServiceStreamingDataCaptureUploadClient represents a fake instance of +// a StreamingDataCaptureUpload client. +type DataSyncServiceStreamingDataCaptureUploadClient struct { + datapb.DataSyncService_StreamingDataCaptureUploadClient + SendFunc func(*datapb.StreamingDataCaptureUploadRequest) error + CloseAndRecvFunc func() (*datapb.StreamingDataCaptureUploadResponse, error) +} + +// Send sends a StreamingDataCaptureUploadRequest using the mock or actual client. +func (client *DataSyncServiceStreamingDataCaptureUploadClient) Send(req *datapb.StreamingDataCaptureUploadRequest) error { + if client.SendFunc == nil { + return client.DataSyncService_StreamingDataCaptureUploadClient.Send(req) + } + return client.SendFunc(req) +} + +// CloseAndRecv closes the stream and receives a StreamingDataCaptureUploadResponse using the mock or actual client. +func (client *DataSyncServiceStreamingDataCaptureUploadClient) CloseAndRecv() (*datapb.StreamingDataCaptureUploadResponse, error) { + if client.CloseAndRecvFunc == nil { + return client.DataSyncService_StreamingDataCaptureUploadClient.CloseAndRecv() + } + return client.CloseAndRecvFunc() +} + +// DataSyncServiceFileUploadClient represents a fake instance of a FileUpload client. +type DataSyncServiceFileUploadClient struct { + datapb.DataSyncService_FileUploadClient + SendFunc func(*datapb.FileUploadRequest) error + CloseAndRecvFunc func() (*datapb.FileUploadResponse, error) +} + +// Send sends a FileUploadRequest using the mock or actual client. +func (client *DataSyncServiceFileUploadClient) Send(req *datapb.FileUploadRequest) error { + if client.SendFunc == nil { + return client.DataSyncService_FileUploadClient.Send(req) + } + return client.SendFunc(req) +} + +// CloseAndRecv closes the stream and receives a FileUploadResponse using the mock or actual client. +func (client *DataSyncServiceFileUploadClient) CloseAndRecv() (*datapb.FileUploadResponse, error) { + if client.CloseAndRecvFunc == nil { + return client.DataSyncService_FileUploadClient.CloseAndRecv() + } + return client.CloseAndRecvFunc() +}