diff --git a/components/camera/camera.go b/components/camera/camera.go index 1cd42ca8a00..e010f2f5a22 100644 --- a/components/camera/camera.go +++ b/components/camera/camera.go @@ -6,6 +6,7 @@ package camera import ( "context" + "fmt" "image" "github.com/pkg/errors" @@ -15,6 +16,7 @@ import ( "go.viam.com/rdk/gostream" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" + "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" "go.viam.com/rdk/robot" ) @@ -70,15 +72,32 @@ type NamedImage struct { SourceName string } +// ImageMetadata contains useful information about returned image bytes such as its mimetype. +type ImageMetadata struct { + MimeType string +} + // A Camera is a resource that can capture frames. type Camera interface { resource.Resource VideoSource } -// A VideoSource represents anything that can capture frames. +// VideoSource represents anything that can capture frames. // For more information, see the [camera component docs]. // +// Image example: +// +// myCamera, err := camera.FromRobot(machine, "my_camera") +// +// // gets an image from the camera +// imageBytes, mimeType, err := myCamera.Image(context.Background(), utils.MimeTypeJPEG, nil) +// +// Or try to directly decode into an image.Image: +// +// myCamera, err := camera.FromRobot(machine, "my_camera") +// img, err = camera.DecodeImageFromCamera(context.Background(), utils.MimeTypeJPEG, nil, myCamera) +// // Images example: // // myCamera, err := camera.FromRobot(machine, "my_camera") @@ -111,6 +130,10 @@ type Camera interface { // // [camera component docs]: https://docs.viam.com/components/camera/ type VideoSource interface { + // Image returns a byte slice representing an image that tries to adhere to the MIME type hint. + // Image also may return a string representing the mime type hint or empty string if not. + Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) + // Images is used for getting simultaneous images from different imagers, // along with associated metadata (just timestamp for now). It's not for getting a time series of images from the same imager. Images(ctx context.Context) ([]NamedImage, resource.ResponseMetadata, error) @@ -136,6 +159,22 @@ func ReadImage(ctx context.Context, src gostream.VideoSource) (image.Image, func return gostream.ReadImage(ctx, src) } +// DecodeImageFromCamera retrieves image bytes from a camera resource and serializes it as an image.Image. +func DecodeImageFromCamera(ctx context.Context, mimeType string, extra map[string]interface{}, cam Camera) (image.Image, error) { + resBytes, resMetadata, err := cam.Image(ctx, mimeType, extra) + if err != nil { + return nil, fmt.Errorf("could not get image bytes from camera: %w", err) + } + if len(resBytes) == 0 { + return nil, errors.New("received empty bytes from camera") + } + img, err := rimage.DecodeImage(ctx, resBytes, resMetadata.MimeType) + if err != nil { + return nil, fmt.Errorf("could not decode into image.Image: %w", err) + } + return img, nil +} + // A PointCloudSource is a source that can generate pointclouds. type PointCloudSource interface { NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) diff --git a/components/camera/camera_test.go b/components/camera/camera_test.go index 1d4392c285d..43c5951548f 100644 --- a/components/camera/camera_test.go +++ b/components/camera/camera_test.go @@ -11,6 +11,7 @@ import ( "go.viam.com/rdk/components/camera" "go.viam.com/rdk/gostream" + "go.viam.com/rdk/logging" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage" @@ -168,6 +169,7 @@ func (cs *cloudSource) NextPointCloud(ctx context.Context) (pointcloud.PointClou } func TestCameraWithNoProjector(t *testing.T) { + logger := logging.NewTestLogger(t) videoSrc := &simpleSource{"rimage/board1"} noProj, err := camera.NewVideoSourceFromReader(context.Background(), videoSrc, nil, camera.DepthStream) test.That(t, err, test.ShouldBeNil) @@ -175,28 +177,38 @@ func TestCameraWithNoProjector(t *testing.T) { test.That(t, errors.Is(err, transform.ErrNoIntrinsics), test.ShouldBeTrue) // make a camera with a NextPointCloudFunction - videoSrc2 := &cloudSource{Named: camera.Named("foo").AsNamed(), simpleSource: videoSrc} - noProj2, err := camera.NewVideoSourceFromReader(context.Background(), videoSrc2, nil, camera.DepthStream) + cloudSrc2 := &cloudSource{Named: camera.Named("foo").AsNamed(), simpleSource: videoSrc} + videoSrc2, err := camera.NewVideoSourceFromReader(context.Background(), cloudSrc2, nil, camera.DepthStream) + noProj2 := camera.FromVideoSource(resource.NewName(camera.API, "bar"), videoSrc2, logger) test.That(t, err, test.ShouldBeNil) pc, err := noProj2.NextPointCloud(context.Background()) test.That(t, err, test.ShouldBeNil) _, got := pc.At(0, 0, 0) test.That(t, got, test.ShouldBeTrue) - img, _, err := camera.ReadImage( - gostream.WithMIMETypeHint(context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)), - noProj2) + // TODO(hexbabe): remove below test when Stream is refactored + t.Run("ReadImage depth map without projector", func(t *testing.T) { + img, _, err := camera.ReadImage( + gostream.WithMIMETypeHint(context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)), + noProj2) + test.That(t, err, test.ShouldBeNil) + depthImg := img.(*rimage.DepthMap) + test.That(t, err, test.ShouldBeNil) + test.That(t, depthImg.Bounds().Dx(), test.ShouldEqual, 1280) + test.That(t, depthImg.Bounds().Dy(), test.ShouldEqual, 720) + }) + + img, err := camera.DecodeImageFromCamera(context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG), nil, noProj2) test.That(t, err, test.ShouldBeNil) - depthImg := img.(*rimage.DepthMap) - test.That(t, err, test.ShouldBeNil) - test.That(t, depthImg.Bounds().Dx(), test.ShouldEqual, 1280) - test.That(t, depthImg.Bounds().Dy(), test.ShouldEqual, 720) + test.That(t, img.Bounds().Dx(), test.ShouldEqual, 1280) + test.That(t, img.Bounds().Dy(), test.ShouldEqual, 720) test.That(t, noProj2.Close(context.Background()), test.ShouldBeNil) } func TestCameraWithProjector(t *testing.T) { + logger := logging.NewTestLogger(t) videoSrc := &simpleSource{"rimage/board1"} params1 := &transform.PinholeCameraIntrinsics{ // not the real camera parameters -- fake for test Width: 1280, @@ -219,32 +231,50 @@ func TestCameraWithProjector(t *testing.T) { test.That(t, src.Close(context.Background()), test.ShouldBeNil) // camera with a point cloud function - videoSrc2 := &cloudSource{Named: camera.Named("foo").AsNamed(), simpleSource: videoSrc} + cloudSrc2 := &cloudSource{Named: camera.Named("foo").AsNamed(), simpleSource: videoSrc} props, err := src.Properties(context.Background()) test.That(t, err, test.ShouldBeNil) - cam2, err := camera.NewVideoSourceFromReader( + videoSrc2, err := camera.NewVideoSourceFromReader( context.Background(), - videoSrc2, + cloudSrc2, &transform.PinholeCameraModel{PinholeCameraIntrinsics: props.IntrinsicParams}, camera.DepthStream, ) + cam2 := camera.FromVideoSource(resource.NewName(camera.API, "bar"), videoSrc2, logger) test.That(t, err, test.ShouldBeNil) - pc, err = cam2.NextPointCloud(context.Background()) + pc, err = videoSrc2.NextPointCloud(context.Background()) test.That(t, err, test.ShouldBeNil) _, got := pc.At(0, 0, 0) test.That(t, got, test.ShouldBeTrue) - img, _, err := camera.ReadImage( - gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypePNG), - cam2) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("ReadImage depth map with projector", func(t *testing.T) { + img, _, err := camera.ReadImage( + gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypePNG), + cam2) + test.That(t, err, test.ShouldBeNil) + + depthImg := img.(*rimage.DepthMap) + test.That(t, err, test.ShouldBeNil) + test.That(t, depthImg.Bounds().Dx(), test.ShouldEqual, 1280) + test.That(t, depthImg.Bounds().Dy(), test.ShouldEqual, 720) + // cam2 should implement a default GetImages, that just returns the one image + images, _, err := cam2.Images(context.Background()) + test.That(t, err, test.ShouldBeNil) + test.That(t, len(images), test.ShouldEqual, 1) + test.That(t, images[0].Image, test.ShouldHaveSameTypeAs, &rimage.DepthMap{}) + test.That(t, images[0].Image.Bounds().Dx(), test.ShouldEqual, 1280) + test.That(t, images[0].Image.Bounds().Dy(), test.ShouldEqual, 720) + }) + + img, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypePNG, nil, cam2) test.That(t, err, test.ShouldBeNil) - depthImg := img.(*rimage.DepthMap) test.That(t, err, test.ShouldBeNil) - test.That(t, depthImg.Bounds().Dx(), test.ShouldEqual, 1280) - test.That(t, depthImg.Bounds().Dy(), test.ShouldEqual, 720) + test.That(t, img.Bounds().Dx(), test.ShouldEqual, 1280) + test.That(t, img.Bounds().Dy(), test.ShouldEqual, 720) // cam2 should implement a default GetImages, that just returns the one image - images, _, err := cam2.Images(context.Background()) + images, _, err := videoSrc2.Images(context.Background()) test.That(t, err, test.ShouldBeNil) test.That(t, len(images), test.ShouldEqual, 1) test.That(t, images[0].Image, test.ShouldHaveSameTypeAs, &rimage.DepthMap{}) diff --git a/components/camera/client.go b/components/camera/client.go index 852f6548577..e717e9868d3 100644 --- a/components/camera/client.go +++ b/components/camera/client.go @@ -21,8 +21,6 @@ import ( goprotoutils "go.viam.com/utils/protoutils" "go.viam.com/utils/rpc" "golang.org/x/exp/slices" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/structpb" "go.viam.com/rdk/components/camera/rtppassthrough" "go.viam.com/rdk/data" @@ -100,60 +98,6 @@ func NewClientFromConn( }, nil } -func getExtra(ctx context.Context) (*structpb.Struct, error) { - ext := &structpb.Struct{} - if extra, ok := FromContext(ctx); ok { - var err error - if ext, err = goprotoutils.StructToStructPb(extra); err != nil { - return nil, err - } - } - - dataExt, err := data.GetExtraFromContext(ctx) - if err != nil { - return nil, err - } - - proto.Merge(ext, dataExt) - return ext, nil -} - -// RSDK-8663: This method signature is depended on by the `camera.serviceServer` optimization that -// avoids using an image stream just to get a single image. -func (c *client) Read(ctx context.Context) (image.Image, func(), error) { - ctx, span := trace.StartSpan(ctx, "camera::client::Read") - defer span.End() - mimeType := gostream.MIMETypeHint(ctx, "") - expectedType, _ := utils.CheckLazyMIMEType(mimeType) - - ext, err := getExtra(ctx) - if err != nil { - return nil, nil, err - } - - resp, err := c.client.GetImage(ctx, &pb.GetImageRequest{ - Name: c.name, - MimeType: expectedType, - Extra: ext, - }) - if err != nil { - return nil, nil, err - } - - if expectedType != "" && resp.MimeType != expectedType { - c.logger.CDebugw(ctx, "got different MIME type than what was asked for", "sent", expectedType, "received", resp.MimeType) - } else { - resp.MimeType = mimeType - } - - resp.MimeType = utils.WithLazyMIMEType(resp.MimeType) - img, err := rimage.DecodeImage(ctx, resp.Image, resp.MimeType) - if err != nil { - return nil, nil, err - } - return img, func() {}, nil -} - func (c *client) Stream( ctx context.Context, errHandlers ...gostream.ErrorHandler, @@ -184,7 +128,8 @@ func (c *client) Stream( // with those from the new "generation". healthyClientCh := c.maybeResetHealthyClientCh() - ctxWithMIME := gostream.WithMIMETypeHint(context.Background(), gostream.MIMETypeHint(ctx, "")) + mimeTypeFromCtx := gostream.MIMETypeHint(ctx, "") + ctxWithMIME := gostream.WithMIMETypeHint(context.Background(), mimeTypeFromCtx) streamCtx, stream, frameCh := gostream.NewMediaStreamForChannel[image.Image](ctxWithMIME) c.activeBackgroundWorkers.Add(1) @@ -201,7 +146,7 @@ func (c *client) Stream( return } - frame, release, err := c.Read(streamCtx) + img, err := DecodeImageFromCamera(streamCtx, mimeTypeFromCtx, nil, c) if err != nil { for _, handler := range errHandlers { handler(streamCtx, err) @@ -217,8 +162,8 @@ func (c *client) Stream( } return case frameCh <- gostream.MediaReleasePairWithError[image.Image]{ - Media: frame, - Release: release, + Media: img, + Release: func() {}, Err: err, }: } @@ -228,6 +173,36 @@ func (c *client) Stream( return stream, nil } +func (c *client) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) { + ctx, span := trace.StartSpan(ctx, "camera::client::Image") + defer span.End() + expectedType, _ := utils.CheckLazyMIMEType(mimeType) + + convertedExtra, err := goprotoutils.StructToStructPb(extra) + if err != nil { + return nil, ImageMetadata{}, err + } + resp, err := c.client.GetImage(ctx, &pb.GetImageRequest{ + Name: c.name, + MimeType: expectedType, + Extra: convertedExtra, + }) + if err != nil { + return nil, ImageMetadata{}, err + } + if len(resp.Image) == 0 { + return nil, ImageMetadata{}, errors.New("received empty bytes from client GetImage") + } + + if expectedType != "" && resp.MimeType != expectedType { + c.logger.CDebugw(ctx, "got different MIME type than what was asked for", "sent", expectedType, "received", resp.MimeType) + } else { + resp.MimeType = mimeType + } + + return resp.Image, ImageMetadata{MimeType: utils.WithLazyMIMEType(resp.MimeType)}, nil +} + func (c *client) Images(ctx context.Context) ([]NamedImage, resource.ResponseMetadata, error) { ctx, span := trace.StartSpan(ctx, "camera::client::Images") defer span.End() diff --git a/components/camera/client_test.go b/components/camera/client_test.go index 0e905f44b57..d67ea2bf2ae 100644 --- a/components/camera/client_test.go +++ b/components/camera/client_test.go @@ -7,7 +7,6 @@ import ( "image/color" "image/png" "net" - "sync" "testing" "time" @@ -72,8 +71,6 @@ func TestClient(t *testing.T) { } projA = intrinsics - var imageReleased bool - var imageReleasedMu sync.Mutex // color camera injectCamera.NextPointCloudFunc = func(ctx context.Context) (pointcloud.PointCloud, error) { return pcA, nil @@ -99,13 +96,13 @@ func TestClient(t *testing.T) { ts := time.UnixMilli(12345) return images, resource.ResponseMetadata{CapturedAt: ts}, nil } - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - imageReleasedMu.Lock() - imageReleased = true - imageReleasedMu.Unlock() - return imgPng, func() {}, nil - })), nil + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + if val, ok := extra["empty"].(bool); ok && val { + return []byte{}, camera.ImageMetadata{}, nil + } + resBytes, err := rimage.EncodeImage(ctx, imgPng, mimeType) + test.That(t, err, test.ShouldBeNil) + return resBytes, camera.ImageMetadata{MimeType: mimeType}, nil } // depth camera injectCameraDepth := &inject.Camera{} @@ -127,13 +124,14 @@ func TestClient(t *testing.T) { injectCameraDepth.ProjectorFunc = func(ctx context.Context) (transform.Projector, error) { return projA, nil } - injectCameraDepth.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - imageReleasedMu.Lock() - imageReleased = true - imageReleasedMu.Unlock() - return depthImg, func() {}, nil - })), nil + injectCameraDepth.ImageFunc = func( + ctx context.Context, + mimeType string, + extra map[string]interface{}, + ) ([]byte, camera.ImageMetadata, error) { + resBytes, err := rimage.EncodeImage(ctx, depthImg, mimeType) + test.That(t, err, test.ShouldBeNil) + return resBytes, camera.ImageMetadata{MimeType: mimeType}, nil } // bad camera injectCamera2 := &inject.Camera{} @@ -146,8 +144,8 @@ func TestClient(t *testing.T) { injectCamera2.ProjectorFunc = func(ctx context.Context) (transform.Projector, error) { return nil, errCameraProjectorFailed } - injectCamera2.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return nil, errStreamFailed + injectCamera2.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + return nil, camera.ImageMetadata{}, errGetImageFailed } resources := map[resource.Name]camera.Camera{ @@ -180,15 +178,25 @@ func TestClient(t *testing.T) { test.That(t, err, test.ShouldBeNil) camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) test.That(t, err, test.ShouldBeNil) - ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypeRawRGBA) - frame, _, err := camera.ReadImage(ctx, camera1Client) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("ReadImage from camera client 1", func(t *testing.T) { + ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypeRawRGBA) + stream, err := camera1Client.Stream(ctx) + test.That(t, err, test.ShouldBeNil) + frame, _, err := stream.Next(ctx) + test.That(t, err, test.ShouldBeNil) + compVal, _, err := rimage.CompareImages(img, frame) + test.That(t, err, test.ShouldBeNil) + test.That(t, compVal, test.ShouldEqual, 0) // exact copy, no color conversion + }) + frame, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeRawRGBA, nil, camera1Client) test.That(t, err, test.ShouldBeNil) compVal, _, err := rimage.CompareImages(img, frame) test.That(t, err, test.ShouldBeNil) test.That(t, compVal, test.ShouldEqual, 0) // exact copy, no color conversion - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() + _, err = camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeRawRGBA, map[string]interface{}{"empty": true}, camera1Client) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "received empty bytes from Image method") pcB, err := camera1Client.NextPointCloud(context.Background()) test.That(t, err, test.ShouldBeNil) @@ -230,16 +238,23 @@ func TestClient(t *testing.T) { client, err := resourceAPI.RPCClient(context.Background(), conn, "", camera.Named(depthCameraName), logger) test.That(t, err, test.ShouldBeNil) - ctx := gostream.WithMIMETypeHint( - context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)) - frame, _, err := camera.ReadImage(ctx, client) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("ReadImage from camera depth", func(t *testing.T) { + ctx := gostream.WithMIMETypeHint( + context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)) + frame, _, err := camera.ReadImage(ctx, client) + test.That(t, err, test.ShouldBeNil) + dm, err := rimage.ConvertImageToDepthMap(context.Background(), frame) + test.That(t, err, test.ShouldBeNil) + test.That(t, dm, test.ShouldResemble, depthImg) + }) + + ctx := context.Background() + frame, err := camera.DecodeImageFromCamera(ctx, rutils.WithLazyMIMEType(rutils.MimeTypePNG), nil, client) test.That(t, err, test.ShouldBeNil) dm, err := rimage.ConvertImageToDepthMap(context.Background(), frame) test.That(t, err, test.ShouldBeNil) test.That(t, dm, test.ShouldResemble, depthImg) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, client.Close(context.Background()), test.ShouldBeNil) test.That(t, conn.Close(), test.ShouldBeNil) @@ -251,9 +266,9 @@ func TestClient(t *testing.T) { client2, err := resourceAPI.RPCClient(context.Background(), conn, "", camera.Named(failCameraName), logger) test.That(t, err, test.ShouldBeNil) - _, _, err = camera.ReadImage(context.Background(), client2) + _, _, err = client2.Image(context.Background(), "", nil) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) _, err = client2.NextPointCloud(context.Background()) test.That(t, err, test.ShouldNotBeNil) @@ -272,63 +287,58 @@ func TestClient(t *testing.T) { camClient, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) test.That(t, err, test.ShouldBeNil) - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - extra, ok := camera.FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { test.That(t, extra, test.ShouldBeEmpty) - return nil, errStreamFailed + return nil, camera.ImageMetadata{}, errGetImageFailed } ctx := context.Background() - _, _, err = camera.ReadImage(ctx, camClient) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("ReadImage test failure", func(t *testing.T) { + _, _, err = camera.ReadImage(ctx, camClient) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) + }) + _, _, err = camClient.Image(ctx, "", nil) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - extra, ok := camera.FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { test.That(t, len(extra), test.ShouldEqual, 1) test.That(t, extra["hello"], test.ShouldEqual, "world") - return nil, errStreamFailed + return nil, camera.ImageMetadata{}, errGetImageFailed } - // one kvp created with camera.Extra - ext := camera.Extra{"hello": "world"} - ctx = camera.NewContext(ctx, ext) - _, _, err = camera.ReadImage(ctx, camClient) + // one kvp created with map[string]interface{} + ext := map[string]interface{}{"hello": "world"} + _, _, err = camClient.Image(ctx, "", ext) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - extra, ok := camera.FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { test.That(t, len(extra), test.ShouldEqual, 1) test.That(t, extra[data.FromDMString], test.ShouldBeTrue) - return nil, errStreamFailed + return nil, camera.ImageMetadata{}, errGetImageFailed } - // one kvp created with data.FromDMContextKey - ctx = context.WithValue(context.Background(), data.FromDMContextKey{}, true) - _, _, err = camera.ReadImage(ctx, camClient) + _, _, err = camClient.Image(context.Background(), "", map[string]interface{}{data.FromDMString: true}) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - extra, ok := camera.FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { test.That(t, len(extra), test.ShouldEqual, 2) test.That(t, extra["hello"], test.ShouldEqual, "world") test.That(t, extra[data.FromDMString], test.ShouldBeTrue) - return nil, errStreamFailed + return nil, camera.ImageMetadata{}, errGetImageFailed } // merge values from data and camera - ext = camera.Extra{"hello": "world"} - ctx = camera.NewContext(ctx, ext) - _, _, err = camera.ReadImage(ctx, camClient) + ext = map[string]interface{}{"hello": "world", data.FromDMString: true} + ctx = context.Background() + _, _, err = camClient.Image(ctx, "", ext) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) test.That(t, conn.Close(), test.ShouldBeNil) }) @@ -453,16 +463,19 @@ func TestClientLazyImage(t *testing.T) { imgPng, err := png.Decode(bytes.NewReader(imgBuf.Bytes())) test.That(t, err, test.ShouldBeNil) - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - mimeType, _ := rutils.CheckLazyMIMEType(gostream.MIMETypeHint(ctx, rutils.MimeTypeRawRGBA)) - switch mimeType { - case rutils.MimeTypePNG: - return imgPng, func() {}, nil - default: - return nil, nil, errInvalidMimeType - } - })), nil + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + if mimeType == "" { + mimeType = rutils.MimeTypeRawRGBA + } + mimeType, _ = rutils.CheckLazyMIMEType(mimeType) + switch mimeType { + case rutils.MimeTypePNG: + resBytes, err := rimage.EncodeImage(ctx, imgPng, mimeType) + test.That(t, err, test.ShouldBeNil) + return resBytes, camera.ImageMetadata{MimeType: mimeType}, nil + default: + return nil, camera.ImageMetadata{}, errInvalidMimeType + } } resources := map[resource.Name]camera.Camera{ @@ -483,16 +496,36 @@ func TestClientLazyImage(t *testing.T) { camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) test.That(t, err, test.ShouldBeNil) - ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypePNG) - frame, _, err := camera.ReadImage(ctx, camera1Client) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("lazily decode from ReadImage without lazy suffix", func(t *testing.T) { + ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypePNG) + frame, _, err := camera.ReadImage(ctx, camera1Client) + test.That(t, err, test.ShouldBeNil) + // Should always lazily decode + test.That(t, frame, test.ShouldHaveSameTypeAs, &rimage.LazyEncodedImage{}) + frameLazy := frame.(*rimage.LazyEncodedImage) + test.That(t, frameLazy.RawData(), test.ShouldResemble, imgBuf.Bytes()) + }) + + ctx := context.Background() + frame, err := camera.DecodeImageFromCamera(ctx, rutils.MimeTypePNG, nil, camera1Client) test.That(t, err, test.ShouldBeNil) // Should always lazily decode test.That(t, frame, test.ShouldHaveSameTypeAs, &rimage.LazyEncodedImage{}) frameLazy := frame.(*rimage.LazyEncodedImage) test.That(t, frameLazy.RawData(), test.ShouldResemble, imgBuf.Bytes()) - ctx = gostream.WithMIMETypeHint(context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)) - frame, _, err = camera.ReadImage(ctx, camera1Client) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("lazily decode from ReadImage", func(t *testing.T) { + ctx = gostream.WithMIMETypeHint(context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)) + frame, _, err = camera.ReadImage(ctx, camera1Client) + test.That(t, err, test.ShouldBeNil) + test.That(t, frame, test.ShouldHaveSameTypeAs, &rimage.LazyEncodedImage{}) + frameLazy = frame.(*rimage.LazyEncodedImage) + }) + + ctx = context.Background() + frame, err = camera.DecodeImageFromCamera(ctx, rutils.WithLazyMIMEType(rutils.MimeTypePNG), nil, camera1Client) test.That(t, err, test.ShouldBeNil) test.That(t, frame, test.ShouldHaveSameTypeAs, &rimage.LazyEncodedImage{}) frameLazy = frame.(*rimage.LazyEncodedImage) @@ -582,10 +615,10 @@ func TestClientStreamAfterClose(t *testing.T) { injectCamera.PropertiesFunc = func(ctx context.Context) (camera.Properties, error) { return camera.Properties{}, nil } - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - return img, func() {}, nil - })), nil + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + imgBytes, err := rimage.EncodeImage(ctx, img, mimeType) + test.That(t, err, test.ShouldBeNil) + return imgBytes, camera.ImageMetadata{MimeType: mimeType}, nil } // Register CameraService API in our gRPC server. diff --git a/components/camera/collectors.go b/components/camera/collectors.go index 37663e7335f..e2df6868189 100644 --- a/components/camera/collectors.go +++ b/components/camera/collectors.go @@ -12,7 +12,6 @@ import ( "go.viam.com/rdk/data" "go.viam.com/rdk/pointcloud" - "go.viam.com/rdk/rimage" "go.viam.com/rdk/utils" ) @@ -88,13 +87,18 @@ func newReadImageCollector(resource interface{}, params data.CollectorParams) (d } } + mimeStr := new(wrapperspb.StringValue) + if err := mimeType.UnmarshalTo(mimeStr); err != nil { + return nil, err + } + cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) { _, span := trace.StartSpan(ctx, "camera::data::collector::CaptureFunc::ReadImage") defer span.End() ctx = context.WithValue(ctx, data.FromDMContextKey{}, true) - img, release, err := ReadImage(ctx, camera) + img, _, err := camera.Image(ctx, mimeStr.Value, nil) if err != nil { // A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore // is used in the datamanager to exclude readings from being captured and stored. @@ -104,22 +108,8 @@ func newReadImageCollector(resource interface{}, params data.CollectorParams) (d return nil, data.FailedToReadErr(params.ComponentName, readImage.String(), err) } - defer func() { - if release != nil { - release() - } - }() - - mimeStr := new(wrapperspb.StringValue) - if err := mimeType.UnmarshalTo(mimeStr); err != nil { - return nil, err - } - outBytes, err := rimage.EncodeImage(ctx, img, mimeStr.Value) - if err != nil { - return nil, err - } - return outBytes, nil + return img, nil }) return data.NewCollector(cFunc, params) } diff --git a/components/camera/collectors_test.go b/components/camera/collectors_test.go index 60af3460585..2b4a596077d 100644 --- a/components/camera/collectors_test.go +++ b/components/camera/collectors_test.go @@ -22,7 +22,6 @@ import ( "go.viam.com/rdk/components/camera" "go.viam.com/rdk/data" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" @@ -185,10 +184,12 @@ func newCamera( pcd pointcloud.PointCloud, ) camera.Camera { v := &inject.Camera{} - v.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - return left, func() {}, nil - })), nil + v.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + viamLogoJpegBytes, err := io.ReadAll(base64.NewDecoder(base64.StdEncoding, bytes.NewReader(viamLogoJpegB64))) + if err != nil { + return nil, camera.ImageMetadata{}, err + } + return viamLogoJpegBytes, camera.ImageMetadata{MimeType: mimeType}, nil } v.NextPointCloudFunc = func(ctx context.Context) (pointcloud.PointCloud, error) { diff --git a/components/camera/extra.go b/components/camera/extra.go deleted file mode 100644 index 35dd04fbfb9..00000000000 --- a/components/camera/extra.go +++ /dev/null @@ -1,22 +0,0 @@ -package camera - -import "context" - -// Extra is the type of value stored in the Contexts. -type ( - Extra map[string]interface{} - key int -) - -var extraKey key - -// NewContext returns a new Context that carries value Extra. -func NewContext(ctx context.Context, e Extra) context.Context { - return context.WithValue(ctx, extraKey, e) -} - -// FromContext returns the Extra value stored in ctx, if any. -func FromContext(ctx context.Context) (Extra, bool) { - ext, ok := ctx.Value(extraKey).(Extra) - return ext, ok -} diff --git a/components/camera/extra_test.go b/components/camera/extra_test.go deleted file mode 100644 index be0f0a3f81a..00000000000 --- a/components/camera/extra_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package camera - -import ( - "context" - "testing" - - "go.viam.com/test" -) - -func TestExtraEmpty(t *testing.T) { - ctx := context.Background() - _, ok := FromContext(ctx) - test.That(t, ok, test.ShouldBeFalse) -} - -func TestExtraRoundtrip(t *testing.T) { - ctx := context.Background() - expected := Extra{ - "It goes one": "by one", - "even two": "by two", - } - - ctx = NewContext(ctx, expected) - actual, ok := FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) - test.That(t, actual, test.ShouldEqual, expected) -} diff --git a/components/camera/fake/camera_test.go b/components/camera/fake/camera_test.go index 510ba9f3161..06e766baef4 100644 --- a/components/camera/fake/camera_test.go +++ b/components/camera/fake/camera_test.go @@ -15,6 +15,7 @@ import ( "go.viam.com/rdk/components/camera/rtppassthrough" "go.viam.com/rdk/logging" "go.viam.com/rdk/resource" + "go.viam.com/rdk/utils" ) func TestFakeCameraParams(t *testing.T) { @@ -90,24 +91,36 @@ func TestRTPPassthrough(t *testing.T) { _, err := cfg.Validate("", camera.API.SubtypeName) test.That(t, err, test.ShouldBeNil) - camera, err := NewCamera(context.Background(), nil, cfg, logger) + cam, err := NewCamera(context.Background(), nil, cfg, logger) test.That(t, err, test.ShouldBeNil) - stream, err := camera.Stream(context.Background()) - test.That(t, err, test.ShouldBeNil) - img, _, err := stream.Next(context.Background()) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("test Stream and Next", func(t *testing.T) { + camera, err := NewCamera(context.Background(), nil, cfg, logger) + test.That(t, err, test.ShouldBeNil) + + stream, err := camera.Stream(context.Background()) + test.That(t, err, test.ShouldBeNil) + img, _, err := stream.Next(context.Background()) + test.That(t, err, test.ShouldBeNil) + // GetImage returns the world jpeg + test.That(t, img.Bounds(), test.ShouldResemble, image.Rectangle{Max: image.Point{X: 480, Y: 270}}) + test.That(t, camera, test.ShouldNotBeNil) + }) + + img, err := camera.DecodeImageFromCamera(context.Background(), utils.MimeTypeRawRGBA, nil, cam) test.That(t, err, test.ShouldBeNil) // GetImage returns the world jpeg test.That(t, img.Bounds(), test.ShouldResemble, image.Rectangle{Max: image.Point{X: 480, Y: 270}}) - test.That(t, camera, test.ShouldNotBeNil) + test.That(t, cam, test.ShouldNotBeNil) // implements rtppassthrough.Source - cam, ok := camera.(rtppassthrough.Source) + passthroughCam, ok := cam.(rtppassthrough.Source) test.That(t, ok, test.ShouldBeTrue) var called atomic.Bool pktChan := make(chan []*rtp.Packet) // SubscribeRTP succeeds - sub, err := cam.SubscribeRTP(context.Background(), 512, func(pkts []*rtp.Packet) { + sub, err := passthroughCam.SubscribeRTP(context.Background(), 512, func(pkts []*rtp.Packet) { if called.Load() { return } @@ -119,19 +132,19 @@ func TestRTPPassthrough(t *testing.T) { test.That(t, len(pkts), test.ShouldEqual, 4) // Unsubscribe fails when provided an ID for which there is no subscription - test.That(t, cam.Unsubscribe(context.Background(), uuid.New()), test.ShouldBeError, errors.New("id not found")) + test.That(t, passthroughCam.Unsubscribe(context.Background(), uuid.New()), test.ShouldBeError, errors.New("id not found")) test.That(t, sub.Terminated.Err(), test.ShouldBeNil) // Unsubscribe succeeds when provided an ID for which there is a subscription - test.That(t, cam.Unsubscribe(context.Background(), sub.ID), test.ShouldBeNil) + test.That(t, passthroughCam.Unsubscribe(context.Background(), sub.ID), test.ShouldBeNil) // Unsubscribe cancels the subscription test.That(t, sub.Terminated.Err(), test.ShouldBeError, context.Canceled) // subscriptions are cleaned up after Close is called - sub2, err := cam.SubscribeRTP(context.Background(), 512, func(pkts []*rtp.Packet) {}) + sub2, err := passthroughCam.SubscribeRTP(context.Background(), 512, func(pkts []*rtp.Packet) {}) test.That(t, err, test.ShouldBeNil) test.That(t, sub2.Terminated.Err(), test.ShouldBeNil) - test.That(t, camera.Close(context.Background()), test.ShouldBeNil) + test.That(t, cam.Close(context.Background()), test.ShouldBeNil) test.That(t, sub2.Terminated.Err(), test.ShouldBeError, context.Canceled) }) diff --git a/components/camera/fake/image_file_test.go b/components/camera/fake/image_file_test.go index ab090f90a56..c1415b5265a 100644 --- a/components/camera/fake/image_file_test.go +++ b/components/camera/fake/image_file_test.go @@ -16,6 +16,7 @@ import ( "go.viam.com/rdk/logging" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage" + "go.viam.com/rdk/utils" ) func TestPCD(t *testing.T) { @@ -26,6 +27,7 @@ func TestPCD(t *testing.T) { cam, err := newCamera(ctx, resource.Name{API: camera.API}, cfg, logger) test.That(t, err, test.ShouldBeNil) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored _, err = cam.Stream(ctx) test.That(t, err, test.ShouldBeNil) @@ -45,16 +47,24 @@ func TestPCD(t *testing.T) { cam, err = newCamera(ctx, resource.Name{API: camera.API}, cfg, logger) test.That(t, err, test.ShouldBeNil) - stream, err := cam.Stream(ctx) - test.That(t, err, test.ShouldBeNil) - readInImage, err := rimage.NewImageFromFile(artifact.MustPath("vision/objectdetection/detection_test.jpg")) test.That(t, err, test.ShouldBeNil) - strmImg, _, err := stream.Next(ctx) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("test Stream and Next", func(t *testing.T) { + stream, err := cam.Stream(ctx) + test.That(t, err, test.ShouldBeNil) + strmImg, _, err := stream.Next(ctx) + test.That(t, err, test.ShouldBeNil) + test.That(t, strmImg, test.ShouldResemble, readInImage) + test.That(t, strmImg.Bounds(), test.ShouldResemble, readInImage.Bounds()) + }) + + imgBytes, _, err := cam.Image(ctx, utils.MimeTypeJPEG, nil) + test.That(t, err, test.ShouldBeNil) + expectedBytes, err := rimage.EncodeImage(ctx, readInImage, utils.MimeTypeJPEG) test.That(t, err, test.ShouldBeNil) - test.That(t, strmImg, test.ShouldResemble, readInImage) - test.That(t, strmImg.Bounds(), test.ShouldResemble, readInImage.Bounds()) + test.That(t, imgBytes, test.ShouldResemble, expectedBytes) err = cam.Close(ctx) test.That(t, err, test.ShouldBeNil) @@ -68,19 +78,27 @@ func TestColor(t *testing.T) { cam, err := newCamera(ctx, resource.Name{API: camera.API}, cfg, logger) test.That(t, err, test.ShouldBeNil) - stream, err := cam.Stream(ctx) - test.That(t, err, test.ShouldBeNil) - _, err = cam.NextPointCloud(ctx) test.That(t, err, test.ShouldNotBeNil) readInImage, err := rimage.NewImageFromFile(artifact.MustPath("vision/objectdetection/detection_test.jpg")) test.That(t, err, test.ShouldBeNil) - strmImg, _, err := stream.Next(ctx) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("test Stream and Next", func(t *testing.T) { + stream, err := cam.Stream(ctx) + test.That(t, err, test.ShouldBeNil) + strmImg, _, err := stream.Next(ctx) + test.That(t, err, test.ShouldBeNil) + test.That(t, strmImg, test.ShouldResemble, readInImage) + test.That(t, strmImg.Bounds(), test.ShouldResemble, readInImage.Bounds()) + }) + + imgBytes, _, err := cam.Image(ctx, utils.MimeTypeJPEG, nil) test.That(t, err, test.ShouldBeNil) - test.That(t, strmImg, test.ShouldResemble, readInImage) - test.That(t, strmImg.Bounds(), test.ShouldResemble, readInImage.Bounds()) + expectedBytes, err := rimage.EncodeImage(ctx, readInImage, utils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + test.That(t, imgBytes, test.ShouldResemble, expectedBytes) err = cam.Close(ctx) test.That(t, err, test.ShouldBeNil) @@ -108,18 +126,29 @@ func TestColorOddResolution(t *testing.T) { cam, err := newCamera(ctx, resource.Name{API: camera.API}, cfg, logger) test.That(t, err, test.ShouldBeNil) - stream, err := cam.Stream(ctx) - test.That(t, err, test.ShouldBeNil) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("test Stream and Next", func(t *testing.T) { + stream, err := cam.Stream(ctx) + test.That(t, err, test.ShouldBeNil) - readInImage, err := rimage.NewImageFromFile(imgFilePath) - test.That(t, err, test.ShouldBeNil) + readInImage, err := rimage.NewImageFromFile(imgFilePath) + test.That(t, err, test.ShouldBeNil) - strmImg, _, err := stream.Next(ctx) + strmImg, _, err := stream.Next(ctx) + test.That(t, err, test.ShouldBeNil) + + expectedBounds := image.Rect(0, 0, readInImage.Bounds().Dx()-1, readInImage.Bounds().Dy()-1) + test.That(t, strmImg, test.ShouldResemble, readInImage.SubImage(expectedBounds)) + }) + + strmImg, err := camera.DecodeImageFromCamera(ctx, utils.MimeTypeRawRGBA, nil, cam) test.That(t, err, test.ShouldBeNil) - expectedBounds := image.Rect(0, 0, readInImage.Bounds().Dx()-1, readInImage.Bounds().Dy()-1) - test.That(t, strmImg, test.ShouldResemble, readInImage.SubImage(expectedBounds)) + expectedBounds := image.Rect(0, 0, img.Bounds().Dx()-1, img.Bounds().Dy()-1) test.That(t, strmImg.Bounds(), test.ShouldResemble, expectedBounds) + val, _, err := rimage.CompareImages(strmImg, img.SubImage(expectedBounds)) + test.That(t, err, test.ShouldBeNil) + test.That(t, val, test.ShouldEqual, 0) err = cam.Close(ctx) test.That(t, err, test.ShouldBeNil) diff --git a/components/camera/ffmpeg/ffmpeg_test.go b/components/camera/ffmpeg/ffmpeg_test.go index f3b36771ea3..f8f9fb5f626 100644 --- a/components/camera/ffmpeg/ffmpeg_test.go +++ b/components/camera/ffmpeg/ffmpeg_test.go @@ -9,6 +9,7 @@ import ( "go.viam.com/utils/artifact" "go.viam.com/rdk/logging" + "go.viam.com/rdk/utils" ) func TestFFMPEGCamera(t *testing.T) { @@ -17,11 +18,14 @@ func TestFFMPEGCamera(t *testing.T) { path := artifact.MustPath("components/camera/ffmpeg/testsrc.mpg") cam, err := NewFFMPEGCamera(ctx, &Config{VideoPath: path}, logger) test.That(t, err, test.ShouldBeNil) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored stream, err := cam.Stream(ctx) test.That(t, err, test.ShouldBeNil) for i := 0; i < 5; i++ { _, _, err := stream.Next(ctx) test.That(t, err, test.ShouldBeNil) + _, _, err = cam.Image(ctx, utils.MimeTypeJPEG, nil) + test.That(t, err, test.ShouldBeNil) } test.That(t, stream.Close(context.Background()), test.ShouldBeNil) test.That(t, cam.Close(context.Background()), test.ShouldBeNil) diff --git a/components/camera/replaypcd/replaypcd.go b/components/camera/replaypcd/replaypcd.go index 9740d99e5a7..fb5e0f0257e 100644 --- a/components/camera/replaypcd/replaypcd.go +++ b/components/camera/replaypcd/replaypcd.go @@ -346,6 +346,10 @@ func (replay *pcdCamera) Stream(ctx context.Context, errHandlers ...gostream.Err return stream, errors.New("Stream is unimplemented") } +func (replay *pcdCamera) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + return nil, camera.ImageMetadata{}, errors.New("Image is unimplemented") +} + // Close stops replay camera, closes the channels and its connections to the cloud. func (replay *pcdCamera) Close(ctx context.Context) error { replay.mu.Lock() diff --git a/components/camera/server.go b/components/camera/server.go index 0f8fb8b2a19..a38e68e9f5a 100644 --- a/components/camera/server.go +++ b/components/camera/server.go @@ -3,6 +3,7 @@ package camera import ( "bytes" "context" + "fmt" "image" "github.com/pkg/errors" @@ -11,7 +12,6 @@ import ( pb "go.viam.com/api/component/camera/v1" "google.golang.org/genproto/googleapis/api/httpbody" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/protoutils" @@ -36,12 +36,6 @@ func NewRPCServiceServer(coll resource.APIResourceCollection[Camera]) interface{ return &serviceServer{coll: coll, logger: logger, imgTypes: imgTypes} } -// ReadImager is an interface that cameras can implement when they allow for returning a single -// image object. -type ReadImager interface { - Read(ctx context.Context) (image.Image, func(), error) -} - // GetImage returns an image from a camera of the underlying robot. If a specific MIME type // is requested and is not available, an error is returned. func (s *serviceServer) GetImage( @@ -75,45 +69,17 @@ func (s *serviceServer) GetImage( req.MimeType = utils.MimeTypeJPEG } } - req.MimeType = utils.WithLazyMIMEType(req.MimeType) - ext := req.Extra.AsMap() - ctx = NewContext(ctx, ext) - - var img image.Image - var release func() - switch castedCam := cam.(type) { - case ReadImager: - // RSDK-8663: If available, call a method that reads exactly one image. The default - // `ReadImage` implementation will otherwise create a gostream `Stream`, call `Next` and - // `Close` the stream. However, between `Next` and `Close`, the stream may have pulled a - // second image from the underlying camera. This is particularly noticeable on camera - // clients. Where a second `GetImage` request can be processed/returned over the - // network. Just to be discarded. - img, release, err = castedCam.Read(ctx) - default: - img, release, err = ReadImage(gostream.WithMIMETypeHint(ctx, req.MimeType), cam) - } + resBytes, resMetadata, err := cam.Image(ctx, req.MimeType, req.Extra.AsMap()) if err != nil { return nil, err } - defer func() { - if release != nil { - release() - } - }() - - actualMIME, _ := utils.CheckLazyMIMEType(req.MimeType) - resp := pb.GetImageResponse{ - MimeType: actualMIME, - } - outBytes, err := rimage.EncodeImage(ctx, img, req.MimeType) - if err != nil { - return nil, err + if len(resBytes) == 0 { + return nil, fmt.Errorf("received empty bytes from Image method of %s", req.Name) } - resp.Image = outBytes - return &resp, nil + actualMIME, _ := utils.CheckLazyMIMEType(resMetadata.MimeType) + return &pb.GetImageResponse{MimeType: actualMIME, Image: resBytes}, nil } // GetImages returns a list of images and metadata from a camera of the underlying robot. diff --git a/components/camera/server_test.go b/components/camera/server_test.go index db5c366e917..55832a12c97 100644 --- a/components/camera/server_test.go +++ b/components/camera/server_test.go @@ -7,7 +7,6 @@ import ( "image" "image/jpeg" "image/png" - "sync" "testing" "time" @@ -17,7 +16,6 @@ import ( "go.viam.com/rdk/components/camera" "go.viam.com/rdk/data" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage" @@ -31,7 +29,7 @@ var ( errGeneratePointCloudFailed = errors.New("can't generate next point cloud") errPropertiesFailed = errors.New("can't get camera properties") errCameraProjectorFailed = errors.New("can't get camera properties") - errStreamFailed = errors.New("can't generate stream") + errGetImageFailed = errors.New("can't get image") errCameraUnimplemented = errors.New("not found") ) @@ -83,8 +81,6 @@ func TestServer(t *testing.T) { err = pcA.Set(pointcloud.NewVector(5, 5, 5), nil) test.That(t, err, test.ShouldBeNil) - var imageReleased bool - var imageReleasedMu sync.Mutex injectCamera.NextPointCloudFunc = func(ctx context.Context) (pointcloud.PointCloud, error) { return pcA, nil } @@ -112,25 +108,33 @@ func TestServer(t *testing.T) { return projA, nil } wooMIME := "image/woohoo" - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - imageReleasedMu.Lock() - imageReleased = true - imageReleasedMu.Unlock() - mimeType, _ := utils.CheckLazyMIMEType(gostream.MIMETypeHint(ctx, utils.MimeTypeRawRGBA)) - switch mimeType { - case "", utils.MimeTypeRawRGBA: - return img, func() {}, nil - case utils.MimeTypePNG: - return imgPng, func() {}, nil - case utils.MimeTypeJPEG: - return imgJpeg, func() {}, nil - case "image/woohoo": - return rimage.NewLazyEncodedImage([]byte{1, 2, 3}, mimeType), func() {}, nil - default: - return nil, nil, errInvalidMimeType - } - })), nil + emptyMIME := "image/empty" + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + if mimeType == "" { + mimeType = utils.MimeTypeRawRGBA + } + mimeType, _ = utils.CheckLazyMIMEType(mimeType) + + var respImg image.Image + switch mimeType { + case "", utils.MimeTypeRawRGBA: + respImg = img + case utils.MimeTypePNG: + respImg = imgPng + case utils.MimeTypeJPEG: + respImg = imgJpeg + case wooMIME: + respImg = rimage.NewLazyEncodedImage([]byte{1, 2, 3}, mimeType) + case emptyMIME: + return []byte{}, camera.ImageMetadata{}, nil + default: + return nil, camera.ImageMetadata{}, errInvalidMimeType + } + resBytes, err := rimage.EncodeImage(ctx, respImg, mimeType) + if err != nil { + return nil, camera.ImageMetadata{}, err + } + return resBytes, camera.ImageMetadata{MimeType: mimeType}, nil } // depth camera depthImage := rimage.NewEmptyDepthMap(10, 20) @@ -155,13 +159,16 @@ func TestServer(t *testing.T) { injectCameraDepth.ProjectorFunc = func(ctx context.Context) (transform.Projector, error) { return projA, nil } - injectCameraDepth.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - imageReleasedMu.Lock() - imageReleased = true - imageReleasedMu.Unlock() - return depthImage, func() {}, nil - })), nil + injectCameraDepth.ImageFunc = func( + ctx context.Context, + mimeType string, + extra map[string]interface{}, + ) ([]byte, camera.ImageMetadata, error) { + resBytes, err := rimage.EncodeImage(ctx, depthImage, mimeType) + if err != nil { + return nil, camera.ImageMetadata{}, err + } + return resBytes, camera.ImageMetadata{MimeType: mimeType}, nil } // bad camera injectCamera2.NextPointCloudFunc = func(ctx context.Context) (pointcloud.PointCloud, error) { @@ -173,8 +180,8 @@ func TestServer(t *testing.T) { injectCamera2.ProjectorFunc = func(ctx context.Context) (transform.Projector, error) { return nil, errCameraProjectorFailed } - injectCamera2.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return nil, errStreamFailed + injectCamera2.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + return nil, camera.ImageMetadata{}, errGetImageFailed } // does a depth camera transfer its depth image properly @@ -190,9 +197,6 @@ func TestServer(t *testing.T) { &pb.GetImageRequest{Name: testCameraName, MimeType: utils.MimeTypeRawRGBA}, ) test.That(t, err, test.ShouldBeNil) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, resp.MimeType, test.ShouldEqual, utils.MimeTypeRawRGBA) test.That(t, resp.Image[rimage.RawRGBAHeaderLength:], test.ShouldResemble, img.Pix) @@ -202,9 +206,6 @@ func TestServer(t *testing.T) { &pb.GetImageRequest{Name: testCameraName, MimeType: ""}, ) test.That(t, err, test.ShouldBeNil) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, resp.MimeType, test.ShouldEqual, utils.MimeTypeJPEG) test.That(t, resp.Image, test.ShouldNotBeNil) @@ -214,29 +215,17 @@ func TestServer(t *testing.T) { &pb.GetImageRequest{Name: depthCameraName, MimeType: ""}, ) test.That(t, err, test.ShouldBeNil) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, resp.MimeType, test.ShouldEqual, utils.MimeTypeRawDepth) test.That(t, resp.Image, test.ShouldNotBeNil) - imageReleasedMu.Lock() - imageReleased = false - imageReleasedMu.Unlock() resp, err = cameraServer.GetImage(context.Background(), &pb.GetImageRequest{ Name: testCameraName, MimeType: "image/png", }) test.That(t, err, test.ShouldBeNil) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, resp.MimeType, test.ShouldEqual, utils.MimeTypePNG) test.That(t, resp.Image, test.ShouldResemble, imgBuf.Bytes()) - imageReleasedMu.Lock() - imageReleased = false - imageReleasedMu.Unlock() _, err = cameraServer.GetImage(context.Background(), &pb.GetImageRequest{ Name: testCameraName, MimeType: "image/who", @@ -245,17 +234,11 @@ func TestServer(t *testing.T) { test.That(t, err.Error(), test.ShouldContainSubstring, errInvalidMimeType.Error()) // depth camera - imageReleasedMu.Lock() - imageReleased = false - imageReleasedMu.Unlock() resp, err = cameraServer.GetImage( context.Background(), &pb.GetImageRequest{Name: depthCameraName, MimeType: utils.MimeTypePNG}, ) test.That(t, err, test.ShouldBeNil) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, resp.MimeType, test.ShouldEqual, utils.MimeTypePNG) test.That(t, resp.Image, test.ShouldNotBeNil) decodedDepth, err := rimage.DecodeImage( @@ -268,23 +251,26 @@ func TestServer(t *testing.T) { test.That(t, err, test.ShouldBeNil) test.That(t, dm, test.ShouldResemble, depthImage) - imageReleasedMu.Lock() - imageReleased = false - imageReleasedMu.Unlock() resp, err = cameraServer.GetImage(context.Background(), &pb.GetImageRequest{ Name: depthCameraName, MimeType: "image/png", }) test.That(t, err, test.ShouldBeNil) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, resp.MimeType, test.ShouldEqual, utils.MimeTypePNG) test.That(t, resp.Image, test.ShouldResemble, depthBuf.Bytes()) // bad camera _, err = cameraServer.GetImage(context.Background(), &pb.GetImageRequest{Name: failCameraName, MimeType: utils.MimeTypeRawRGBA}) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) + }) + + t.Run("GetImage response image bytes empty", func(t *testing.T) { + _, err := cameraServer.GetImage(context.Background(), &pb.GetImageRequest{ + Name: testCameraName, + MimeType: emptyMIME, + }) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "received empty bytes from Image method") }) t.Run("GetImage with lazy", func(t *testing.T) { @@ -331,43 +317,27 @@ func TestServer(t *testing.T) { Name: testCameraName, }) test.That(t, err, test.ShouldBeNil) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, resp.ContentType, test.ShouldEqual, "image/jpeg") test.That(t, resp.Data, test.ShouldResemble, imgBufJpeg.Bytes()) - imageReleasedMu.Lock() - imageReleased = false - imageReleasedMu.Unlock() resp, err = cameraServer.RenderFrame(context.Background(), &pb.RenderFrameRequest{ Name: testCameraName, MimeType: "image/png", }) test.That(t, err, test.ShouldBeNil) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() test.That(t, resp.ContentType, test.ShouldEqual, "image/png") test.That(t, resp.Data, test.ShouldResemble, imgBuf.Bytes()) - imageReleasedMu.Lock() - imageReleased = false - imageReleasedMu.Unlock() - _, err = cameraServer.RenderFrame(context.Background(), &pb.RenderFrameRequest{ Name: testCameraName, MimeType: "image/who", }) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, errInvalidMimeType.Error()) - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() _, err = cameraServer.RenderFrame(context.Background(), &pb.RenderFrameRequest{Name: failCameraName}) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) }) t.Run("GetPointCloud", func(t *testing.T) { @@ -435,11 +405,9 @@ func TestServer(t *testing.T) { }) t.Run("GetImage with extra", func(t *testing.T) { - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - extra, ok := camera.FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { test.That(t, extra, test.ShouldBeEmpty) - return nil, errStreamFailed + return nil, camera.ImageMetadata{}, errGetImageFailed } _, err := cameraServer.GetImage(context.Background(), &pb.GetImageRequest{ @@ -447,17 +415,15 @@ func TestServer(t *testing.T) { }) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - extra, ok := camera.FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { test.That(t, len(extra), test.ShouldEqual, 1) test.That(t, extra["hello"], test.ShouldEqual, "world") - return nil, errStreamFailed + return nil, camera.ImageMetadata{}, errGetImageFailed } - ext, err := goprotoutils.StructToStructPb(camera.Extra{"hello": "world"}) + ext, err := goprotoutils.StructToStructPb(map[string]interface{}{"hello": "world"}) test.That(t, err, test.ShouldBeNil) _, err = cameraServer.GetImage(context.Background(), &pb.GetImageRequest{ @@ -466,15 +432,13 @@ func TestServer(t *testing.T) { }) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - extra, ok := camera.FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { test.That(t, len(extra), test.ShouldEqual, 1) test.That(t, extra[data.FromDMString], test.ShouldBeTrue) - return nil, errStreamFailed + return nil, camera.ImageMetadata{}, errGetImageFailed } // one kvp created with data.FromDMContextKey @@ -487,15 +451,13 @@ func TestServer(t *testing.T) { }) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - extra, ok := camera.FromContext(ctx) - test.That(t, ok, test.ShouldBeTrue) + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { test.That(t, len(extra), test.ShouldEqual, 2) test.That(t, extra["hello"], test.ShouldEqual, "world") test.That(t, extra[data.FromDMString], test.ShouldBeTrue) - return nil, errStreamFailed + return nil, camera.ImageMetadata{}, errGetImageFailed } // use values from data and camera @@ -513,20 +475,6 @@ func TestServer(t *testing.T) { }) test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error()) + test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) }) } - -//nolint -func TestCameraClientIsReadImager(t *testing.T) { - // RSDK-8663: Enforce that a camera client always satisfies the optimized `ReadImager` - // interface. - cameraClient, err := camera.NewClientFromConn(nil, nil, "", resource.Name{}, nil) - test.That(t, err, test.ShouldBeNil) - - if _, isReadImager := cameraClient.(camera.ReadImager); isReadImager { - // Success - } else { - t.Fatalf("Camera client is expected to be a `ReadImager`. Client type: %T", cameraClient) - } -} diff --git a/components/camera/transformpipeline/classifier.go b/components/camera/transformpipeline/classifier.go index 5db1f7c43e8..1a13cec9e59 100644 --- a/components/camera/transformpipeline/classifier.go +++ b/components/camera/transformpipeline/classifier.go @@ -9,7 +9,6 @@ import ( "go.opencensus.io/trace" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage/transform" "go.viam.com/rdk/robot" @@ -28,7 +27,7 @@ type classifierConfig struct { // classifierSource takes an image from the camera, and overlays labels from the classifier. type classifierSource struct { - stream gostream.VideoStream + src camera.VideoSource classifierName string maxClassifications uint32 labelFilter classification.Postprocessor @@ -38,8 +37,8 @@ type classifierSource struct { func newClassificationsTransform( ctx context.Context, - source gostream.VideoSource, r robot.Robot, am utils.AttributeMap, -) (gostream.VideoSource, camera.ImageType, error) { + source camera.VideoSource, r robot.Robot, am utils.AttributeMap, +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*classifierConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -66,7 +65,7 @@ func newClassificationsTransform( maxClassifications = conf.MaxClassifications } classifier := &classifierSource{ - gostream.NewEmbeddedVideoStream(source), + source, conf.ClassifierName, maxClassifications, labelFilter, @@ -92,7 +91,7 @@ func (cs *classifierSource) Read(ctx context.Context) (image.Image, func(), erro return nil, nil, errors.Wrap(err, "source_classifier can't find vision service") } // get image from source camera - img, release, err := cs.stream.Next(ctx) + img, release, err := camera.ReadImage(ctx, cs.src) if err != nil { return nil, nil, errors.Wrap(err, "could not get next source image") } @@ -111,5 +110,5 @@ func (cs *classifierSource) Read(ctx context.Context) (image.Image, func(), erro } func (cs *classifierSource) Close(ctx context.Context) error { - return cs.stream.Close(ctx) + return cs.src.Close(ctx) } diff --git a/components/camera/transformpipeline/composed.go b/components/camera/transformpipeline/composed.go index 333d9682c3e..9b9811132f0 100644 --- a/components/camera/transformpipeline/composed.go +++ b/components/camera/transformpipeline/composed.go @@ -8,7 +8,6 @@ import ( "go.opencensus.io/trace" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage" @@ -19,13 +18,13 @@ import ( // depthToPretty takes a depth image and turns into a colorful image, with blue being // farther away, and red being closest. Actual depth information is lost in the transform. type depthToPretty struct { - originalStream gostream.VideoStream - cameraModel *transform.PinholeCameraModel + src camera.VideoSource + cameraModel *transform.PinholeCameraModel } -func propsFromVideoSource(ctx context.Context, source gostream.VideoSource) (camera.Properties, error) { +func propsFromVideoSource(ctx context.Context, source camera.VideoSource) (camera.Properties, error) { var camProps camera.Properties - //nolint:staticcheck + if cameraSrc, ok := source.(camera.Camera); ok { props, err := cameraSrc.Properties(ctx) if err != nil { @@ -38,9 +37,9 @@ func propsFromVideoSource(ctx context.Context, source gostream.VideoSource) (cam func newDepthToPrettyTransform( ctx context.Context, - source gostream.VideoSource, + source camera.VideoSource, stream camera.ImageType, -) (gostream.VideoSource, camera.ImageType, error) { +) (camera.VideoSource, camera.ImageType, error) { if stream != camera.DepthStream { return nil, camera.UnspecifiedStream, errors.Errorf("source has stream type %s, depth_to_pretty only supports depth stream inputs", stream) @@ -55,10 +54,9 @@ func newDepthToPrettyTransform( if props.DistortionParams != nil { cameraModel.Distortion = props.DistortionParams } - depthStream := gostream.NewEmbeddedVideoStream(source) reader := &depthToPretty{ - originalStream: depthStream, - cameraModel: &cameraModel, + src: source, + cameraModel: &cameraModel, } src, err := camera.NewVideoSourceFromReader(ctx, reader, &cameraModel, camera.ColorStream) if err != nil { @@ -70,7 +68,7 @@ func newDepthToPrettyTransform( func (dtp *depthToPretty) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::depthToPretty::Read") defer span.End() - i, release, err := dtp.originalStream.Next(ctx) + i, release, err := camera.ReadImage(ctx, dtp.src) if err != nil { return nil, nil, err } @@ -82,7 +80,7 @@ func (dtp *depthToPretty) Read(ctx context.Context) (image.Image, func(), error) } func (dtp *depthToPretty) Close(ctx context.Context) error { - return dtp.originalStream.Close(ctx) + return dtp.src.Close(ctx) } func (dtp *depthToPretty) PointCloud(ctx context.Context) (pointcloud.PointCloud, error) { @@ -91,7 +89,7 @@ func (dtp *depthToPretty) PointCloud(ctx context.Context) (pointcloud.PointCloud if dtp.cameraModel == nil || dtp.cameraModel.PinholeCameraIntrinsics == nil { return nil, errors.Wrapf(transform.ErrNoIntrinsics, "depthToPretty transform cannot project to pointcloud") } - i, release, err := dtp.originalStream.Next(ctx) + i, release, err := camera.ReadImage(ctx, dtp.src) if err != nil { return nil, err } @@ -111,16 +109,16 @@ type overlayConfig struct { // overlaySource overlays the depth and color 2D images in order to debug the alignment of the two images. type overlaySource struct { - src gostream.VideoSource + src camera.VideoSource cameraModel *transform.PinholeCameraModel } func newOverlayTransform( ctx context.Context, - src gostream.VideoSource, + src camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (gostream.VideoSource, camera.ImageType, error) { +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*overlayConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/depth_edges.go b/components/camera/transformpipeline/depth_edges.go index e5c0f558126..841c2809cb3 100644 --- a/components/camera/transformpipeline/depth_edges.go +++ b/components/camera/transformpipeline/depth_edges.go @@ -8,7 +8,6 @@ import ( "go.opencensus.io/trace" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" @@ -23,13 +22,13 @@ type depthEdgesConfig struct { // depthEdgesSource applies a Canny Edge Detector to the depth map. type depthEdgesSource struct { - stream gostream.VideoStream + src camera.VideoSource detector *rimage.CannyEdgeDetector blurRadius float64 } -func newDepthEdgesTransform(ctx context.Context, source gostream.VideoSource, am utils.AttributeMap, -) (gostream.VideoSource, camera.ImageType, error) { +func newDepthEdgesTransform(ctx context.Context, source camera.VideoSource, am utils.AttributeMap, +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*depthEdgesConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -45,7 +44,7 @@ func newDepthEdgesTransform(ctx context.Context, source gostream.VideoSource, am cameraModel.Distortion = props.DistortionParams } canny := rimage.NewCannyDericheEdgeDetectorWithParameters(conf.HiThresh, conf.LoThresh, true) - videoSrc := &depthEdgesSource{gostream.NewEmbeddedVideoStream(source), canny, 3.0} + videoSrc := &depthEdgesSource{source, canny, 3.0} src, err := camera.NewVideoSourceFromReader(ctx, videoSrc, &cameraModel, camera.DepthStream) if err != nil { return nil, camera.UnspecifiedStream, err @@ -57,7 +56,7 @@ func newDepthEdgesTransform(ctx context.Context, source gostream.VideoSource, am func (os *depthEdgesSource) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::depthEdges::Read") defer span.End() - i, closer, err := os.stream.Next(ctx) + i, closer, err := camera.ReadImage(ctx, os.src) if err != nil { return nil, nil, err } @@ -73,5 +72,5 @@ func (os *depthEdgesSource) Read(ctx context.Context) (image.Image, func(), erro } func (os *depthEdgesSource) Close(ctx context.Context) error { - return os.stream.Close(ctx) + return os.src.Close(ctx) } diff --git a/components/camera/transformpipeline/depth_edges_test.go b/components/camera/transformpipeline/depth_edges_test.go index fdb193a4c7d..e6ec7feebeb 100644 --- a/components/camera/transformpipeline/depth_edges_test.go +++ b/components/camera/transformpipeline/depth_edges_test.go @@ -7,13 +7,11 @@ import ( "image" "testing" - "github.com/pion/mediadevices/pkg/prop" "go.viam.com/test" "go.viam.com/utils/artifact" "go.viam.com/rdk/components/camera" "go.viam.com/rdk/components/camera/fake" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/depthadapter" @@ -25,13 +23,14 @@ func TestDepthSource(t *testing.T) { img, err := rimage.NewDepthMapFromFile( context.Background(), artifact.MustPath("rimage/board1_gray_small.png")) test.That(t, err, test.ShouldBeNil) - source := &fake.StaticSource{DepthImg: img} + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am := utils.AttributeMap{ "high_threshold": 0.85, "low_threshold": 0.40, "blur_radius": 3.0, } - ds, stream, err := newDepthEdgesTransform(context.Background(), gostream.NewVideoSource(source, prop.Video{}), am) + ds, stream, err := newDepthEdgesTransform(context.Background(), source, am) test.That(t, err, test.ShouldBeNil) test.That(t, stream, test.ShouldEqual, camera.DepthStream) _, _, err = camera.ReadImage(context.Background(), ds) @@ -57,13 +56,14 @@ func (h *depthSourceTestHelper) Process( pCtx.GotDebugImage(dm.ToPrettyPicture(0, rimage.MaxDepth), "aligned-depth") // create edge map - source := &fake.StaticSource{DepthImg: dm} + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: dm}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am := utils.AttributeMap{ "high_threshold": 0.85, "low_threshold": 0.40, "blur_radius": 3.0, } - ds, stream, err := newDepthEdgesTransform(context.Background(), gostream.NewVideoSource(source, prop.Video{}), am) + ds, stream, err := newDepthEdgesTransform(context.Background(), source, am) test.That(t, err, test.ShouldBeNil) test.That(t, stream, test.ShouldEqual, camera.DepthStream) edges, _, err := camera.ReadImage(context.Background(), ds) @@ -78,8 +78,9 @@ func (h *depthSourceTestHelper) Process( pCtx.GotDebugPointCloud(fixedPointCloud, "aligned-pointcloud") // preprocess depth map - source = &fake.StaticSource{DepthImg: dm} - rs, stream, err := newDepthPreprocessTransform(context.Background(), gostream.NewVideoSource(source, prop.Video{})) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: dm}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) + rs, stream, err := newDepthPreprocessTransform(context.Background(), source) test.That(t, err, test.ShouldBeNil) test.That(t, stream, test.ShouldEqual, camera.DepthStream) @@ -94,8 +95,14 @@ func (h *depthSourceTestHelper) Process( test.That(t, preprocessedPointCloud.MetaData().HasColor, test.ShouldBeFalse) pCtx.GotDebugPointCloud(preprocessedPointCloud, "preprocessed-aligned-pointcloud") - source = &fake.StaticSource{DepthImg: preprocessed} - ds, stream, err = newDepthEdgesTransform(context.Background(), gostream.NewVideoSource(source, prop.Video{}), am) + source, err = camera.NewVideoSourceFromReader( + context.Background(), + &fake.StaticSource{DepthImg: preprocessed}, + nil, + camera.UnspecifiedStream, + ) + test.That(t, err, test.ShouldBeNil) + ds, stream, err = newDepthEdgesTransform(context.Background(), source, am) test.That(t, err, test.ShouldBeNil) test.That(t, stream, test.ShouldEqual, camera.DepthStream) processedEdges, _, err := camera.ReadImage(context.Background(), ds) diff --git a/components/camera/transformpipeline/detector.go b/components/camera/transformpipeline/detector.go index 241103353d0..0728da400ea 100644 --- a/components/camera/transformpipeline/detector.go +++ b/components/camera/transformpipeline/detector.go @@ -9,7 +9,6 @@ import ( "go.opencensus.io/trace" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage/transform" "go.viam.com/rdk/robot" @@ -27,7 +26,7 @@ type detectorConfig struct { // detectorSource takes an image from the camera, and overlays the detections from the detector. type detectorSource struct { - stream gostream.VideoStream + src camera.VideoSource detectorName string labelFilter objectdetection.Postprocessor // must build from ValidLabels confFilter objectdetection.Postprocessor @@ -36,10 +35,10 @@ type detectorSource struct { func newDetectionsTransform( ctx context.Context, - source gostream.VideoSource, + source camera.VideoSource, r robot.Robot, am utils.AttributeMap, -) (gostream.VideoSource, camera.ImageType, error) { +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*detectorConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -62,7 +61,7 @@ func newDetectionsTransform( } labelFilter := objectdetection.NewLabelFilter(validLabels) detector := &detectorSource{ - gostream.NewEmbeddedVideoStream(source), + source, conf.DetectorName, labelFilter, confFilter, @@ -85,7 +84,7 @@ func (ds *detectorSource) Read(ctx context.Context) (image.Image, func(), error) return nil, nil, fmt.Errorf("source_detector cant find vision service: %w", err) } // get image from source camera - img, release, err := ds.stream.Next(ctx) + img, release, err := camera.ReadImage(ctx, ds.src) if err != nil { return nil, nil, fmt.Errorf("could not get next source image: %w", err) } @@ -105,5 +104,5 @@ func (ds *detectorSource) Read(ctx context.Context) (image.Image, func(), error) } func (ds *detectorSource) Close(ctx context.Context) error { - return ds.stream.Close(ctx) + return ds.src.Close(ctx) } diff --git a/components/camera/transformpipeline/mods.go b/components/camera/transformpipeline/mods.go index 776d51856fa..0331f300399 100644 --- a/components/camera/transformpipeline/mods.go +++ b/components/camera/transformpipeline/mods.go @@ -11,7 +11,6 @@ import ( "golang.org/x/image/draw" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" @@ -25,14 +24,14 @@ type rotateConfig struct { // rotateSource is the source to be rotated and the kind of image type. type rotateSource struct { - originalStream gostream.VideoStream - stream camera.ImageType - angle float64 + src camera.VideoSource + stream camera.ImageType + angle float64 } // newRotateTransform creates a new rotation transform. -func newRotateTransform(ctx context.Context, source gostream.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (gostream.VideoSource, camera.ImageType, error) { +func newRotateTransform(ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*rotateConfig](am) if err != nil { return nil, camera.UnspecifiedStream, errors.Wrap(err, "cannot parse rotate attribute map") @@ -52,7 +51,7 @@ func newRotateTransform(ctx context.Context, source gostream.VideoSource, stream if props.DistortionParams != nil { cameraModel.Distortion = props.DistortionParams } - reader := &rotateSource{gostream.NewEmbeddedVideoStream(source), stream, conf.Angle} + reader := &rotateSource{source, stream, conf.Angle} src, err := camera.NewVideoSourceFromReader(ctx, reader, &cameraModel, stream) if err != nil { return nil, camera.UnspecifiedStream, err @@ -64,7 +63,7 @@ func newRotateTransform(ctx context.Context, source gostream.VideoSource, stream func (rs *rotateSource) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::rotate::Read") defer span.End() - orig, release, err := rs.originalStream.Next(ctx) + orig, release, err := camera.ReadImage(ctx, rs.src) if err != nil { return nil, nil, err } @@ -86,7 +85,7 @@ func (rs *rotateSource) Read(ctx context.Context) (image.Image, func(), error) { // Close closes the original stream. func (rs *rotateSource) Close(ctx context.Context) error { - return rs.originalStream.Close(ctx) + return rs.src.Close(ctx) } // resizeConfig are the attributes for a resize transform. @@ -96,16 +95,16 @@ type resizeConfig struct { } type resizeSource struct { - originalStream gostream.VideoStream - stream camera.ImageType - height int - width int + src camera.VideoSource + stream camera.ImageType + height int + width int } // newResizeTransform creates a new resize transform. func newResizeTransform( - ctx context.Context, source gostream.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (gostream.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*resizeConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -117,7 +116,7 @@ func newResizeTransform( return nil, camera.UnspecifiedStream, errors.New("new height for resize transform cannot be 0") } - reader := &resizeSource{gostream.NewEmbeddedVideoStream(source), stream, conf.Height, conf.Width} + reader := &resizeSource{source, stream, conf.Height, conf.Width} src, err := camera.NewVideoSourceFromReader(ctx, reader, nil, stream) if err != nil { return nil, camera.UnspecifiedStream, err @@ -129,7 +128,7 @@ func newResizeTransform( func (rs *resizeSource) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::resize::Read") defer span.End() - orig, release, err := rs.originalStream.Next(ctx) + orig, release, err := camera.ReadImage(ctx, rs.src) if err != nil { return nil, nil, err } @@ -153,7 +152,7 @@ func (rs *resizeSource) Read(ctx context.Context) (image.Image, func(), error) { // Close closes the original stream. func (rs *resizeSource) Close(ctx context.Context) error { - return rs.originalStream.Close(ctx) + return rs.src.Close(ctx) } // cropConfig are the attributes for a crop transform. @@ -165,15 +164,15 @@ type cropConfig struct { } type cropSource struct { - originalStream gostream.VideoStream - imgType camera.ImageType - cropWindow image.Rectangle + src camera.VideoSource + imgType camera.ImageType + cropWindow image.Rectangle } // newCropTransform creates a new crop transform. func newCropTransform( - ctx context.Context, source gostream.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (gostream.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*cropConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -189,7 +188,7 @@ func newCropTransform( } cropRect := image.Rect(conf.XMin, conf.YMin, conf.XMax, conf.YMax) - reader := &cropSource{gostream.NewEmbeddedVideoStream(source), stream, cropRect} + reader := &cropSource{source, stream, cropRect} src, err := camera.NewVideoSourceFromReader(ctx, reader, nil, stream) if err != nil { return nil, camera.UnspecifiedStream, err @@ -201,7 +200,7 @@ func newCropTransform( func (cs *cropSource) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::crop::Read") defer span.End() - orig, release, err := cs.originalStream.Next(ctx) + orig, release, err := camera.ReadImage(ctx, cs.src) if err != nil { return nil, nil, err } @@ -229,5 +228,5 @@ func (cs *cropSource) Read(ctx context.Context) (image.Image, func(), error) { // Close closes the original stream. func (cs *cropSource) Close(ctx context.Context) error { - return cs.originalStream.Close(ctx) + return cs.src.Close(ctx) } diff --git a/components/camera/transformpipeline/mods_test.go b/components/camera/transformpipeline/mods_test.go index 721319a9bdf..e6bb1a5f1dc 100644 --- a/components/camera/transformpipeline/mods_test.go +++ b/components/camera/transformpipeline/mods_test.go @@ -30,7 +30,8 @@ func TestCrop(t *testing.T) { test.That(t, err, test.ShouldBeNil) // test depth source - source := gostream.NewVideoSource(&fake.StaticSource{DepthImg: dm}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: dm}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) out, _, err := camera.ReadImage(context.Background(), source) test.That(t, err, test.ShouldBeNil) test.That(t, out.Bounds().Dx(), test.ShouldEqual, 128) @@ -48,7 +49,8 @@ func TestCrop(t *testing.T) { test.That(t, source.Close(context.Background()), test.ShouldBeNil) // test color source - source = gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) out, _, err = camera.ReadImage(context.Background(), source) test.That(t, err, test.ShouldBeNil) test.That(t, out.Bounds().Dx(), test.ShouldEqual, 128) @@ -75,7 +77,7 @@ func TestCrop(t *testing.T) { test.That(t, out.Bounds().Dy(), test.ShouldEqual, 1) test.That(t, rs.Close(context.Background()), test.ShouldBeNil) - // error - crop limits are outside of original image + // error - crop limits are outside of original image am = utils.AttributeMap{"x_min_px": 1000, "x_max_px": 2000, "y_min_px": 300, "y_max_px": 400} rs, stream, err = newCropTransform(context.Background(), source, camera.ColorStream, am) test.That(t, err, test.ShouldBeNil) @@ -107,7 +109,8 @@ func TestResizeColor(t *testing.T) { "height_px": 20, "width_px": 30, } - source := gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) out, _, err := camera.ReadImage(context.Background(), source) test.That(t, err, test.ShouldBeNil) test.That(t, out.Bounds().Dx(), test.ShouldEqual, 128) @@ -133,7 +136,8 @@ func TestResizeDepth(t *testing.T) { "height_px": 40, "width_px": 60, } - source := gostream.NewVideoSource(&fake.StaticSource{DepthImg: img}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) out, _, err := camera.ReadImage(context.Background(), source) test.That(t, err, test.ShouldBeNil) test.That(t, out.Bounds().Dx(), test.ShouldEqual, 128) @@ -154,7 +158,8 @@ func TestRotateColorSource(t *testing.T) { img, err := rimage.NewImageFromFile(artifact.MustPath("rimage/board1_small.png")) test.That(t, err, test.ShouldBeNil) - source := gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am := utils.AttributeMap{ "angle_degs": 180, } @@ -207,7 +212,8 @@ func TestRotateColorSource(t *testing.T) { test.That(t, rs.Close(context.Background()), test.ShouldBeNil) test.That(t, source.Close(context.Background()), test.ShouldBeNil) - source = gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{ "angle_degs": 90, } @@ -238,7 +244,8 @@ func TestRotateColorSource(t *testing.T) { test.That(t, rs.Close(context.Background()), test.ShouldBeNil) test.That(t, source.Close(context.Background()), test.ShouldBeNil) - source = gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{ "angle_degs": -90, } @@ -269,7 +276,8 @@ func TestRotateColorSource(t *testing.T) { test.That(t, rs.Close(context.Background()), test.ShouldBeNil) test.That(t, source.Close(context.Background()), test.ShouldBeNil) - source = gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{ "angle_degs": 270, } @@ -300,7 +308,8 @@ func TestRotateColorSource(t *testing.T) { test.That(t, rs.Close(context.Background()), test.ShouldBeNil) test.That(t, source.Close(context.Background()), test.ShouldBeNil) - source = gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{ "angle_degs": 0, // no-op } @@ -336,7 +345,8 @@ func TestRotateDepthSource(t *testing.T) { context.Background(), artifact.MustPath("rimage/board1_gray_small.png")) test.That(t, err, test.ShouldBeNil) - source := gostream.NewVideoSource(&fake.StaticSource{DepthImg: pc}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: pc}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am := utils.AttributeMap{ "angle_degs": 180, } @@ -388,7 +398,8 @@ func TestRotateDepthSource(t *testing.T) { test.That(t, rs.Close(context.Background()), test.ShouldBeNil) test.That(t, source.Close(context.Background()), test.ShouldBeNil) - source = gostream.NewVideoSource(&fake.StaticSource{DepthImg: pc}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: pc}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{ "angle_degs": 90, } @@ -419,7 +430,8 @@ func TestRotateDepthSource(t *testing.T) { test.That(t, rs.Close(context.Background()), test.ShouldBeNil) test.That(t, source.Close(context.Background()), test.ShouldBeNil) - source = gostream.NewVideoSource(&fake.StaticSource{DepthImg: pc}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: pc}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{ "angle_degs": -90, } @@ -450,7 +462,8 @@ func TestRotateDepthSource(t *testing.T) { test.That(t, rs.Close(context.Background()), test.ShouldBeNil) test.That(t, source.Close(context.Background()), test.ShouldBeNil) - source = gostream.NewVideoSource(&fake.StaticSource{DepthImg: pc}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: pc}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{ "angle_degs": 270, } @@ -481,7 +494,8 @@ func TestRotateDepthSource(t *testing.T) { test.That(t, rs.Close(context.Background()), test.ShouldBeNil) test.That(t, source.Close(context.Background()), test.ShouldBeNil) - source = gostream.NewVideoSource(&fake.StaticSource{DepthImg: pc}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: pc}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{ "angle_degs": 0, // no-op } @@ -529,7 +543,8 @@ func BenchmarkColorRotate(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - camera.ReadImage(context.Background(), rs) + _, _, err = camera.ReadImage(context.Background(), rs) + test.That(b, err, test.ShouldBeNil) } test.That(b, rs.Close(context.Background()), test.ShouldBeNil) test.That(b, source.Close(context.Background()), test.ShouldBeNil) @@ -553,7 +568,8 @@ func BenchmarkDepthRotate(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - camera.ReadImage(context.Background(), rs) + _, _, err = camera.ReadImage(context.Background(), rs) + test.That(b, err, test.ShouldBeNil) } test.That(b, rs.Close(context.Background()), test.ShouldBeNil) test.That(b, source.Close(context.Background()), test.ShouldBeNil) diff --git a/components/camera/transformpipeline/pipeline.go b/components/camera/transformpipeline/pipeline.go index 254913d941b..2932c61682d 100644 --- a/components/camera/transformpipeline/pipeline.go +++ b/components/camera/transformpipeline/pipeline.go @@ -13,7 +13,6 @@ import ( "go.uber.org/multierr" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" @@ -89,7 +88,7 @@ func (cfg *transformConfig) Validate(path string) ([]string, error) { func newTransformPipeline( ctx context.Context, - source gostream.VideoSource, + source camera.VideoSource, cfg *transformConfig, r robot.Robot, logger logging.Logger, @@ -117,7 +116,7 @@ func newTransformPipeline( release() } // loop through the pipeline and create the image flow - pipeline := make([]gostream.VideoSource, 0, len(cfg.Pipeline)) + pipeline := make([]camera.VideoSource, 0, len(cfg.Pipeline)) lastSource := source for _, tr := range cfg.Pipeline { src, newStreamType, err := buildTransform(ctx, r, lastSource, streamType, tr, cfg.Source) @@ -128,19 +127,18 @@ func newTransformPipeline( lastSource = src streamType = newStreamType } - lastSourceStream := gostream.NewEmbeddedVideoStream(lastSource) cameraModel := camera.NewPinholeModelWithBrownConradyDistortion(cfg.CameraParameters, cfg.DistortionParameters) return camera.NewVideoSourceFromReader( ctx, - transformPipeline{pipeline, lastSourceStream, cfg.CameraParameters, logger}, + transformPipeline{pipeline, lastSource, cfg.CameraParameters, logger}, &cameraModel, streamType, ) } type transformPipeline struct { - pipeline []gostream.VideoSource - stream gostream.VideoStream + pipeline []camera.VideoSource + src camera.VideoSource intrinsicParameters *transform.PinholeCameraIntrinsics logger logging.Logger } @@ -148,7 +146,7 @@ type transformPipeline struct { func (tp transformPipeline) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::Read") defer span.End() - return tp.stream.Next(ctx) + return camera.ReadImage(ctx, tp.src) } func (tp transformPipeline) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { @@ -176,5 +174,5 @@ func (tp transformPipeline) Close(ctx context.Context) error { return src.Close(ctx) }()) } - return multierr.Combine(tp.stream.Close(ctx), errs) + return multierr.Combine(tp.src.Close(ctx), errs) } diff --git a/components/camera/transformpipeline/pipeline_test.go b/components/camera/transformpipeline/pipeline_test.go index b96ea0a9596..eb31cd06e52 100644 --- a/components/camera/transformpipeline/pipeline_test.go +++ b/components/camera/transformpipeline/pipeline_test.go @@ -130,7 +130,9 @@ func TestTransformPipelineDepth2(t *testing.T) { dm, err := rimage.NewDepthMapFromFile( context.Background(), artifact.MustPath("rimage/board1_gray_small.png")) test.That(t, err, test.ShouldBeNil) - source := gostream.NewVideoSource(&fake.StaticSource{DepthImg: dm}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: dm}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) + // first depth transform depth1, err := newTransformPipeline(context.Background(), source, transform1, r, logger) test.That(t, err, test.ShouldBeNil) @@ -160,7 +162,8 @@ func TestNullPipeline(t *testing.T) { img, err := rimage.NewImageFromFile(artifact.MustPath("rimage/board1_small.png")) test.That(t, err, test.ShouldBeNil) - source := gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) _, err = newTransformPipeline(context.Background(), source, transform1, r, logger) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, "pipeline has no transforms") @@ -185,7 +188,8 @@ func TestPipeIntoPipe(t *testing.T) { img, err := rimage.NewImageFromFile(artifact.MustPath("rimage/board1_small.png")) test.That(t, err, test.ShouldBeNil) - source := gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) intrinsics1 := &transform.PinholeCameraIntrinsics{Width: 128, Height: 72} transform1 := &transformConfig{ diff --git a/components/camera/transformpipeline/preprocessing.go b/components/camera/transformpipeline/preprocessing.go index 79ff11d5e00..73624c1bcbe 100644 --- a/components/camera/transformpipeline/preprocessing.go +++ b/components/camera/transformpipeline/preprocessing.go @@ -8,19 +8,18 @@ import ( "go.opencensus.io/trace" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" ) // preprocessDepthTransform applies pre-processing functions to depth maps in order to smooth edges and fill holes. type preprocessDepthTransform struct { - stream gostream.VideoStream + src camera.VideoSource } -func newDepthPreprocessTransform(ctx context.Context, source gostream.VideoSource, -) (gostream.VideoSource, camera.ImageType, error) { - reader := &preprocessDepthTransform{gostream.NewEmbeddedVideoStream(source)} +func newDepthPreprocessTransform(ctx context.Context, source camera.VideoSource, +) (camera.VideoSource, camera.ImageType, error) { + reader := &preprocessDepthTransform{source} props, err := propsFromVideoSource(ctx, source) if err != nil { @@ -43,7 +42,7 @@ func newDepthPreprocessTransform(ctx context.Context, source gostream.VideoSourc func (os *preprocessDepthTransform) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::depthPreprocess::Read") defer span.End() - i, release, err := os.stream.Next(ctx) + i, release, err := camera.ReadImage(ctx, os.src) if err != nil { return nil, nil, err } @@ -59,5 +58,5 @@ func (os *preprocessDepthTransform) Read(ctx context.Context) (image.Image, func } func (os *preprocessDepthTransform) Close(ctx context.Context) error { - return os.stream.Close(ctx) + return os.src.Close(ctx) } diff --git a/components/camera/transformpipeline/segmenter.go b/components/camera/transformpipeline/segmenter.go index 3e0f306e2e5..0ba705ce544 100644 --- a/components/camera/transformpipeline/segmenter.go +++ b/components/camera/transformpipeline/segmenter.go @@ -8,7 +8,6 @@ import ( "go.opencensus.io/trace" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" "go.viam.com/rdk/robot" @@ -24,7 +23,7 @@ type segmenterConfig struct { // segmenterSource takes a pointcloud from the camera and applies a segmenter to it. type segmenterSource struct { - stream gostream.VideoStream + src camera.VideoSource cameraName string segmenterName string r robot.Robot @@ -32,11 +31,11 @@ type segmenterSource struct { func newSegmentationsTransform( ctx context.Context, - source gostream.VideoSource, + source camera.VideoSource, r robot.Robot, am utils.AttributeMap, sourceString string, -) (gostream.VideoSource, camera.ImageType, error) { +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*segmenterConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -48,7 +47,7 @@ func newSegmentationsTransform( } segmenter := &segmenterSource{ - gostream.NewEmbeddedVideoStream(source), + source, sourceString, conf.SegmenterName, r, @@ -108,7 +107,7 @@ func (ss *segmenterSource) NextPointCloud(ctx context.Context) (pointcloud.Point // Read returns the image if the stream is valid, else error. func (ss *segmenterSource) Read(ctx context.Context) (image.Image, func(), error) { - img, release, err := ss.stream.Next(ctx) + img, release, err := camera.ReadImage(ctx, ss.src) if err != nil { return nil, nil, fmt.Errorf("could not get next source image: %w", err) } @@ -117,5 +116,5 @@ func (ss *segmenterSource) Read(ctx context.Context) (image.Image, func(), error // Close closes the underlying stream. func (ss *segmenterSource) Close(ctx context.Context) error { - return ss.stream.Close(ctx) + return ss.src.Close(ctx) } diff --git a/components/camera/transformpipeline/transform.go b/components/camera/transformpipeline/transform.go index c8d401997e4..2b23743bb5a 100644 --- a/components/camera/transformpipeline/transform.go +++ b/components/camera/transformpipeline/transform.go @@ -7,7 +7,6 @@ import ( "github.com/pkg/errors" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/robot" "go.viam.com/rdk/utils" ) @@ -137,11 +136,11 @@ func (Transformation) JSONSchema() *jsonschema.Schema { func buildTransform( ctx context.Context, r robot.Robot, - source gostream.VideoSource, + source camera.VideoSource, stream camera.ImageType, tr Transformation, sourceString string, -) (gostream.VideoSource, camera.ImageType, error) { +) (camera.VideoSource, camera.ImageType, error) { switch transformType(tr.Type) { case transformTypeUnspecified, transformTypeIdentity: return source, stream, nil diff --git a/components/camera/transformpipeline/undistort.go b/components/camera/transformpipeline/undistort.go index 7254fe2f64e..a24dc18cceb 100644 --- a/components/camera/transformpipeline/undistort.go +++ b/components/camera/transformpipeline/undistort.go @@ -8,7 +8,6 @@ import ( "go.opencensus.io/trace" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" @@ -23,14 +22,14 @@ type undistortConfig struct { // undistortSource will undistort the original image according to the Distortion parameters // within the intrinsic parameters. type undistortSource struct { - originalStream gostream.VideoStream + originalSource camera.VideoSource stream camera.ImageType cameraParams *transform.PinholeCameraModel } func newUndistortTransform( - ctx context.Context, source gostream.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (gostream.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, +) (camera.VideoSource, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*undistortConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -40,7 +39,7 @@ func newUndistortTransform( } cameraModel := camera.NewPinholeModelWithBrownConradyDistortion(conf.CameraParams, conf.DistortionParams) reader := &undistortSource{ - gostream.NewEmbeddedVideoStream(source), + source, stream, &cameraModel, } @@ -55,7 +54,7 @@ func newUndistortTransform( func (us *undistortSource) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::undistort::Read") defer span.End() - orig, release, err := us.originalStream.Next(ctx) + orig, release, err := camera.ReadImage(ctx, us.originalSource) if err != nil { return nil, nil, err } @@ -84,5 +83,5 @@ func (us *undistortSource) Read(ctx context.Context) (image.Image, func(), error // Close closes the original stream. func (us *undistortSource) Close(ctx context.Context) error { - return us.originalStream.Close(ctx) + return us.originalSource.Close(ctx) } diff --git a/components/camera/transformpipeline/undistort_test.go b/components/camera/transformpipeline/undistort_test.go index 41f6184bd84..6227c3c4e78 100644 --- a/components/camera/transformpipeline/undistort_test.go +++ b/components/camera/transformpipeline/undistort_test.go @@ -5,13 +5,11 @@ import ( "errors" "testing" - "github.com/pion/mediadevices/pkg/prop" "go.viam.com/test" "go.viam.com/utils/artifact" "go.viam.com/rdk/components/camera" "go.viam.com/rdk/components/camera/fake" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" "go.viam.com/rdk/utils" @@ -38,7 +36,8 @@ var undistortTestBC = &transform.BrownConrady{ func TestUndistortSetup(t *testing.T) { img, err := rimage.NewImageFromFile(artifact.MustPath("rimage/board1_small.png")) test.That(t, err, test.ShouldBeNil) - source := gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) // no camera parameters am := utils.AttributeMap{} @@ -48,7 +47,8 @@ func TestUndistortSetup(t *testing.T) { test.That(t, source.Close(context.Background()), test.ShouldBeNil) // bad stream type - source = gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) am = utils.AttributeMap{"intrinsic_parameters": undistortTestParams, "distortion_parameters": undistortTestBC} us, _, err := newUndistortTransform(context.Background(), source, camera.ImageType("fake"), am) test.That(t, err, test.ShouldBeNil) @@ -71,7 +71,8 @@ func TestUndistortSetup(t *testing.T) { func TestUndistortImage(t *testing.T) { img, err := rimage.NewImageFromFile(artifact.MustPath("rimage/board1_small.png")) test.That(t, err, test.ShouldBeNil) - source := gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) // success am := utils.AttributeMap{"intrinsic_parameters": undistortTestParams, "distortion_parameters": undistortTestBC} @@ -92,7 +93,13 @@ func TestUndistortImage(t *testing.T) { test.That(t, result, test.ShouldResemble, expected) // bad source - source = gostream.NewVideoSource(&fake.StaticSource{ColorImg: rimage.NewImage(10, 10)}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader( + context.Background(), + &fake.StaticSource{ColorImg: rimage.NewImage(10, 10)}, + nil, + camera.UnspecifiedStream, + ) + test.That(t, err, test.ShouldBeNil) us, stream, err = newUndistortTransform(context.Background(), source, camera.ColorStream, am) test.That(t, err, test.ShouldBeNil) test.That(t, stream, test.ShouldEqual, camera.ColorStream) @@ -105,7 +112,8 @@ func TestUndistortDepthMap(t *testing.T) { img, err := rimage.NewDepthMapFromFile( context.Background(), artifact.MustPath("rimage/board1_gray_small.png")) test.That(t, err, test.ShouldBeNil) - source := gostream.NewVideoSource(&fake.StaticSource{DepthImg: img}, prop.Video{}) + source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{DepthImg: img}, nil, camera.UnspecifiedStream) + test.That(t, err, test.ShouldBeNil) // success am := utils.AttributeMap{"intrinsic_parameters": undistortTestParams, "distortion_parameters": undistortTestBC} @@ -126,7 +134,13 @@ func TestUndistortDepthMap(t *testing.T) { test.That(t, result, test.ShouldResemble, expected) // bad source - source = gostream.NewVideoSource(&fake.StaticSource{DepthImg: rimage.NewEmptyDepthMap(10, 10)}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader( + context.Background(), + &fake.StaticSource{DepthImg: rimage.NewEmptyDepthMap(10, 10)}, + nil, + camera.UnspecifiedStream, + ) + test.That(t, err, test.ShouldBeNil) us, stream, err = newUndistortTransform(context.Background(), source, camera.DepthStream, am) test.That(t, err, test.ShouldBeNil) test.That(t, stream, test.ShouldEqual, camera.DepthStream) @@ -135,7 +149,13 @@ func TestUndistortDepthMap(t *testing.T) { test.That(t, us.Close(context.Background()), test.ShouldBeNil) // can't convert image to depth map - source = gostream.NewVideoSource(&fake.StaticSource{ColorImg: rimage.NewImage(10, 10)}, prop.Video{}) + source, err = camera.NewVideoSourceFromReader( + context.Background(), + &fake.StaticSource{ColorImg: rimage.NewImage(10, 10)}, + nil, + camera.UnspecifiedStream, + ) + test.That(t, err, test.ShouldBeNil) us, stream, err = newUndistortTransform(context.Background(), source, camera.DepthStream, am) test.That(t, stream, test.ShouldEqual, camera.DepthStream) test.That(t, err, test.ShouldBeNil) diff --git a/components/camera/videosource/webcam.go b/components/camera/videosource/webcam.go index 66cf37f68fe..2c7a6070308 100644 --- a/components/camera/videosource/webcam.go +++ b/components/camera/videosource/webcam.go @@ -565,6 +565,10 @@ func (c *monitoredWebcam) Stream(ctx context.Context, errHandlers ...gostream.Er return c.exposedSwapper.Stream(ctx, errHandlers...) } +func (c *monitoredWebcam) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + return camera.ReadImageBytes(ctx, c.underlyingSource, mimeType) +} + func (c *monitoredWebcam) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/components/camera/videosourcewrappers.go b/components/camera/videosourcewrappers.go index db41e934324..1d98a9c197a 100644 --- a/components/camera/videosourcewrappers.go +++ b/components/camera/videosourcewrappers.go @@ -197,6 +197,26 @@ func (vs *videoSource) Stream(ctx context.Context, errHandlers ...gostream.Error return vs.videoSource.Stream(ctx, errHandlers...) } +// ReadImageBytes wraps ReadImage given a mimetype to encode the image as bytes data, returning +// supplementary metadata for downstream processing. +// TODO(hexbabe): make function private or remove altogether once the usages are limited to this file. +func ReadImageBytes(ctx context.Context, src gostream.VideoSource, mimeType string) ([]byte, ImageMetadata, error) { + img, release, err := ReadImage(ctx, src) + if err != nil { + return nil, ImageMetadata{}, err + } + defer release() + imgBytes, err := rimage.EncodeImage(ctx, img, mimeType) + if err != nil { + return nil, ImageMetadata{}, err + } + return imgBytes, ImageMetadata{MimeType: mimeType}, nil +} + +func (vs *videoSource) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) { + return ReadImageBytes(ctx, vs.videoSource, mimeType) +} + // Images is for getting simultaneous images from different sensors // If the underlying source did not specify an Images function, a default is applied. // The default returns a list of 1 image from ReadImage, and the current time. diff --git a/data/collector.go b/data/collector.go index 7070141147e..79c81197171 100644 --- a/data/collector.go +++ b/data/collector.go @@ -78,6 +78,8 @@ type collector struct { captureFunc CaptureFunc target CaptureBufferedWriter lastLoggedErrors map[string]int64 + + lastCaptureTime time.Time } // Close closes the channels backing the Collector. It should always be called before disposing of a Collector to avoid @@ -182,6 +184,8 @@ func (c *collector) getAndPushNextReading() { timeRequested := timestamppb.New(c.clock.Now().UTC()) reading, err := c.captureFunc(c.cancelCtx, c.params) timeReceived := timestamppb.New(c.clock.Now().UTC()) + latency := timeReceived.AsTime().Sub(timeRequested.AsTime()) + elapsed := timeReceived.AsTime().Sub(c.lastCaptureTime).Seconds() if c.cancelCtx.Err() != nil { return @@ -196,6 +200,21 @@ func (c *collector) getAndPushNextReading() { return } + // debug on success + if !c.lastCaptureTime.IsZero() { + if elapsed > 0 { + frequency := 1.0 / elapsed + c.logger.Infow("capture metrics", + "frequency_hz", frequency, + "latency_ms", latency.Milliseconds(), + "time_received", timeReceived.AsTime(), + ) + } else { + panic("oh no") + } + } + c.lastCaptureTime = timeReceived.AsTime() + var msg v1.SensorData switch v := reading.(type) { case []byte: diff --git a/robot/client/client_test.go b/robot/client/client_test.go index b452af0049f..d5802efd7e6 100644 --- a/robot/client/client_test.go +++ b/robot/client/client_test.go @@ -332,15 +332,8 @@ func TestStatusClient(t *testing.T) { var imgBuf bytes.Buffer test.That(t, png.Encode(&imgBuf, img), test.ShouldBeNil) - var imageReleased bool - var imageReleasedMu sync.Mutex - injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return gostream.NewEmbeddedVideoStreamFromReader(gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - imageReleasedMu.Lock() - imageReleased = true - imageReleasedMu.Unlock() - return img, func() {}, nil - })), nil + injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + return imgBuf.Bytes(), camera.ImageMetadata{MimeType: rutils.MimeTypePNG}, nil } injectInputDev := &inject.InputController{} @@ -512,9 +505,17 @@ func TestStatusClient(t *testing.T) { camera1, err := camera.FromRobot(client, "camera1") test.That(t, err, test.ShouldBeNil) - _, _, err = camera.ReadImage(context.Background(), camera1) + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("ReadImage on missing camera", func(t *testing.T) { + _, _, err = camera.ReadImage(context.Background(), camera1) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "not found") + }) + imgBytes, metadata, err := camera1.Image(context.Background(), rutils.MimeTypeJPEG, nil) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, "not found") + test.That(t, imgBytes, test.ShouldBeNil) + test.That(t, metadata, test.ShouldResemble, camera.ImageMetadata{}) gripper1, err := gripper.FromRobot(client, "gripper1") test.That(t, err, test.ShouldBeNil) @@ -590,15 +591,22 @@ func TestStatusClient(t *testing.T) { camera1, err = camera.FromRobot(client, "camera1") test.That(t, err, test.ShouldBeNil) - ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypeRawRGBA) - frame, _, err := camera.ReadImage(ctx, camera1) + + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("ReadImage on camera with valid response", func(t *testing.T) { + ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypeRawRGBA) + frame, _, err := camera.ReadImage(ctx, camera1) + test.That(t, err, test.ShouldBeNil) + compVal, _, err := rimage.CompareImages(img, frame) + test.That(t, err, test.ShouldBeNil) + test.That(t, compVal, test.ShouldEqual, 0) // exact copy, no color conversion + }) + + frame, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeRawRGBA, nil, camera1) test.That(t, err, test.ShouldBeNil) compVal, _, err := rimage.CompareImages(img, frame) test.That(t, err, test.ShouldBeNil) test.That(t, compVal, test.ShouldEqual, 0) // exact copy, no color conversion - imageReleasedMu.Lock() - test.That(t, imageReleased, test.ShouldBeTrue) - imageReleasedMu.Unlock() gripper1, err = gripper.FromRobot(client, "gripper1") test.That(t, err, test.ShouldBeNil) diff --git a/robot/impl/local_robot_test.go b/robot/impl/local_robot_test.go index f215383e424..ac273051f31 100644 --- a/robot/impl/local_robot_test.go +++ b/robot/impl/local_robot_test.go @@ -93,7 +93,16 @@ func TestConfig1(t *testing.T) { c1, err := camera.FromRobot(r, "c1") test.That(t, err, test.ShouldBeNil) test.That(t, c1.Name(), test.ShouldResemble, camera.Named("c1")) - pic, _, err := camera.ReadImage(context.Background(), c1) + + // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored + t.Run("ReadImage from camera", func(t *testing.T) { + pic, _, err := camera.ReadImage(context.Background(), c1) + test.That(t, err, test.ShouldBeNil) + bounds := pic.Bounds() + test.That(t, bounds.Max.X, test.ShouldBeGreaterThanOrEqualTo, 32) + }) + + pic, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeJPEG, nil, c1) test.That(t, err, test.ShouldBeNil) bounds := pic.Bounds() diff --git a/services/datamanager/builtin/builtin_sync_test.go b/services/datamanager/builtin/builtin_sync_test.go index 844b8772095..5c2061b8221 100644 --- a/services/datamanager/builtin/builtin_sync_test.go +++ b/services/datamanager/builtin/builtin_sync_test.go @@ -24,9 +24,9 @@ import ( "go.viam.com/rdk/components/arm" "go.viam.com/rdk/components/camera" "go.viam.com/rdk/data" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" "go.viam.com/rdk/resource" + "go.viam.com/rdk/rimage" datasync "go.viam.com/rdk/services/datamanager/builtin/sync" "go.viam.com/rdk/spatialmath" "go.viam.com/rdk/testutils/inject" @@ -38,7 +38,7 @@ const ( waitTime = syncInterval * 4 ) -// TODO DATA-849: Add a test that validates that sync interval is accurately respected. +// TODO DATA-849: Add a test that validates that the sync interval is accurately respected. func TestSyncEnabled(t *testing.T) { tests := []struct { name string @@ -154,11 +154,12 @@ func TestSyncEnabled(t *testing.T) { imgPng := newImgPng(t) r := setupRobot(tc.cloudConnectionErr, map[resource.Name]resource.Resource{ camera.Named("c1"): &inject.Camera{ - StreamFunc: func( + ImageFunc: func( ctx context.Context, - errHandlers ...gostream.ErrorHandler, - ) (gostream.VideoStream, error) { - return newVideoStream(imgPng), nil + mimeType string, + extra map[string]interface{}, + ) ([]byte, camera.ImageMetadata, error) { + return newImageBytesResp(ctx, imgPng, mimeType) }, }, }) @@ -354,13 +355,14 @@ func TestDataCaptureUploadIntegration(t *testing.T) { if tc.dataType == v1.DataType_DATA_TYPE_TABULAR_SENSOR { config, deps = setupConfig(t, r, enabledTabularCollectorConfigPath) } else { - r = setupRobot(tc.cloudConnectionErr, map[resource.Name]resource.Resource{ + r := setupRobot(tc.cloudConnectionErr, map[resource.Name]resource.Resource{ camera.Named("c1"): &inject.Camera{ - StreamFunc: func( + ImageFunc: func( ctx context.Context, - errHandlers ...gostream.ErrorHandler, - ) (gostream.VideoStream, error) { - return newVideoStream(imgPng), nil + mimeType string, + extra map[string]interface{}, + ) ([]byte, camera.ImageMetadata, error) { + return newImageBytesResp(ctx, imgPng, mimeType) }, }, }) @@ -758,11 +760,12 @@ func TestStreamingDCUpload(t *testing.T) { imgPng := newImgPng(t) r := setupRobot(nil, map[resource.Name]resource.Resource{ camera.Named("c1"): &inject.Camera{ - StreamFunc: func( + ImageFunc: func( ctx context.Context, - errHandlers ...gostream.ErrorHandler, - ) (gostream.VideoStream, error) { - return newVideoStream(imgPng), nil + mimeType string, + extra map[string]interface{}, + ) ([]byte, camera.ImageMetadata, error) { + return newImageBytesResp(ctx, imgPng, mimeType) }, }, }) @@ -998,11 +1001,12 @@ func TestSyncConfigUpdateBehavior(t *testing.T) { imgPng := newImgPng(t) r := setupRobot(nil, map[resource.Name]resource.Resource{ camera.Named("c1"): &inject.Camera{ - StreamFunc: func( + ImageFunc: func( ctx context.Context, - errHandlers ...gostream.ErrorHandler, - ) (gostream.VideoStream, error) { - return newVideoStream(imgPng), nil + mimeType string, + extra map[string]interface{}, + ) ([]byte, camera.ImageMetadata, error) { + return newImageBytesResp(ctx, imgPng, mimeType) }, }, }) @@ -1156,12 +1160,12 @@ func populateFileContents(fileContents []byte) []byte { return fileContents } -func newVideoStream(imgPng image.Image) gostream.VideoStream { - return gostream.NewEmbeddedVideoStreamFromReader( - gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { - return imgPng, func() {}, nil - }), - ) +func newImageBytesResp(ctx context.Context, img image.Image, mimeType string) ([]byte, camera.ImageMetadata, error) { + outBytes, err := rimage.EncodeImage(ctx, img, mimeType) + if err != nil { + return nil, camera.ImageMetadata{}, err + } + return outBytes, camera.ImageMetadata{MimeType: mimeType}, nil } func newImgPng(t *testing.T) image.Image { diff --git a/services/datamanager/builtin/builtin_test.go b/services/datamanager/builtin/builtin_test.go index 91bf269e592..09e31dd83fd 100644 --- a/services/datamanager/builtin/builtin_test.go +++ b/services/datamanager/builtin/builtin_test.go @@ -28,12 +28,12 @@ import ( "go.viam.com/rdk/components/sensor" "go.viam.com/rdk/config" "go.viam.com/rdk/data" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/internal/cloud" cloudinject "go.viam.com/rdk/internal/testutils/inject" "go.viam.com/rdk/logging" "go.viam.com/rdk/referenceframe" "go.viam.com/rdk/resource" + "go.viam.com/rdk/rimage" "go.viam.com/rdk/services/datamanager" "go.viam.com/rdk/services/datamanager/builtin/capture" datasync "go.viam.com/rdk/services/datamanager/builtin/sync" @@ -449,11 +449,16 @@ func TestSync(t *testing.T) { if tc.dataType == v1.DataType_DATA_TYPE_BINARY_SENSOR { r = setupRobot(tc.cloudConnectionErr, map[resource.Name]resource.Resource{ camera.Named("c1"): &inject.Camera{ - StreamFunc: func( + ImageFunc: func( ctx context.Context, - errHandlers ...gostream.ErrorHandler, - ) (gostream.VideoStream, error) { - return newVideoStream(imgPng), nil + mimeType string, + extra map[string]interface{}, + ) ([]byte, camera.ImageMetadata, error) { + outBytes, err := rimage.EncodeImage(ctx, imgPng, mimeType) + if err != nil { + return nil, camera.ImageMetadata{}, err + } + return outBytes, camera.ImageMetadata{MimeType: mimeType}, nil }, }, }) diff --git a/testutils/inject/camera.go b/testutils/inject/camera.go index f58e4a26651..975bb9b58d0 100644 --- a/testutils/inject/camera.go +++ b/testutils/inject/camera.go @@ -19,6 +19,7 @@ type Camera struct { name resource.Name RTPPassthroughSource rtppassthrough.Source DoFunc func(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) + ImageFunc func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) ImagesFunc func(ctx context.Context) ([]camera.NamedImage, resource.ResponseMetadata, error) StreamFunc func( ctx context.Context, @@ -65,6 +66,17 @@ func (c *Camera) Stream( return nil, errors.Wrap(ctx.Err(), "no stream function available") } +// Image calls the injected Image or the real version. +func (c *Camera) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { + if c.ImageFunc != nil { + return c.ImageFunc(ctx, mimeType, extra) + } + if c.Camera != nil { + return c.Camera.Image(ctx, mimeType, extra) + } + return nil, camera.ImageMetadata{}, errors.Wrap(ctx.Err(), "no Image function available") +} + // Properties calls the injected Properties or the real version. func (c *Camera) Properties(ctx context.Context) (camera.Properties, error) { if c.PropertiesFunc == nil { diff --git a/web/cmd/server/main.go b/web/cmd/server/main.go index e8d3816d498..4a551006df2 100644 --- a/web/cmd/server/main.go +++ b/web/cmd/server/main.go @@ -3,12 +3,18 @@ package main import ( + "log" + "net/http" + "go.viam.com/utils" // registers all components. _ "go.viam.com/rdk/components/register" "go.viam.com/rdk/logging" + // registers all services. + _ "net/http/pprof" + _ "go.viam.com/rdk/services/register" "go.viam.com/rdk/web/server" ) @@ -16,5 +22,8 @@ import ( var logger = logging.NewDebugLogger("entrypoint") func main() { + go func() { + log.Println(http.ListenAndServe("0.0.0.0:6061", nil)) + }() utils.ContextualMain(server.RunServer, logger) -} +} \ No newline at end of file