Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATA-3338-fix-stability-of-vision-capture-all-from-camera #4514

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions components/arm/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package arm
import (
"context"
"errors"
"time"

v1 "go.viam.com/api/common/v1"
pb "go.viam.com/api/component/arm/v1"
Expand Down Expand Up @@ -39,18 +40,20 @@ func newEndPositionCollector(resource interface{}, params data.CollectorParams)
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
v, err := arm.EndPosition(ctx, data.FromDMExtraMap)
if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, endPosition.String(), err)
return res, data.FailedToReadErr(params.ComponentName, endPosition.String(), err)
}
o := v.Orientation().OrientationVectorDegrees()
return pb.GetEndPositionResponse{
return data.NewTabularCaptureResult(timeRequested, pb.GetEndPositionResponse{
Pose: &v1.Pose{
X: v.Point().X,
Y: v.Point().Y,
Expand All @@ -60,7 +63,7 @@ func newEndPositionCollector(resource interface{}, params data.CollectorParams)
OZ: o.OZ,
Theta: o.Theta,
},
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand All @@ -73,21 +76,23 @@ func newJointPositionsCollector(resource interface{}, params data.CollectorParam
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
v, err := arm.JointPositions(ctx, data.FromDMExtraMap)
if err != nil {
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
return res, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
}
jp, err := referenceframe.JointPositionsFromInputs(arm.ModelFrame(), v)
if err != nil {
return nil, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
return res, data.FailedToReadErr(params.ComponentName, jointPositions.String(), err)
}
return pb.GetJointPositionsResponse{Positions: jp}, nil
return data.NewTabularCaptureResult(timeRequested, pb.GetJointPositionsResponse{Positions: jp})
})
return data.NewCollector(cFunc, params)
}
Expand Down
71 changes: 44 additions & 27 deletions components/arm/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"testing"
"time"

clk "github.com/benbjohnson/clock"
"github.com/benbjohnson/clock"
"github.com/golang/geo/r3"
v1 "go.viam.com/api/common/v1"
datasyncpb "go.viam.com/api/app/datasync/v1"
pb "go.viam.com/api/component/arm/v1"
"go.viam.com/test"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/components/arm"
"go.viam.com/rdk/data"
Expand All @@ -22,50 +23,71 @@ import (

const (
componentName = "arm"
captureInterval = time.Second
numRetries = 5
captureInterval = time.Millisecond
)

var floatList = &pb.JointPositions{Values: []float64{1.0, 2.0, 3.0}}

func TestCollectors(t *testing.T) {
l, err := structpb.NewList([]any{1.0, 2.0, 3.0})
test.That(t, err, test.ShouldBeNil)

tests := []struct {
name string
collector data.CollectorConstructor
expected map[string]any
expected *datasyncpb.SensorData
}{
{
name: "End position collector should write a pose",
collector: arm.NewEndPositionCollector,
expected: tu.ToProtoMapIgnoreOmitEmpty(pb.GetEndPositionResponse{
Pose: &v1.Pose{
OX: 0,
OY: 0,
OZ: 1,
Theta: 0,
X: 1,
Y: 2,
Z: 3,
},
}),
expected: &datasyncpb.SensorData{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: &structpb.Struct{
Fields: map[string]*structpb.Value{
"pose": structpb.NewStructValue(&structpb.Struct{
Fields: map[string]*structpb.Value{
"o_x": structpb.NewNumberValue(0),
"o_y": structpb.NewNumberValue(0),
"o_z": structpb.NewNumberValue(1),
"theta": structpb.NewNumberValue(0),
"x": structpb.NewNumberValue(1),
"y": structpb.NewNumberValue(2),
"z": structpb.NewNumberValue(3),
},
}),
},
}},
},
},
{
name: "Joint positions collector should write a list of positions",
collector: arm.NewJointPositionsCollector,
expected: tu.ToProtoMapIgnoreOmitEmpty(pb.GetJointPositionsResponse{Positions: floatList}),
expected: &datasyncpb.SensorData{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: &structpb.Struct{
Fields: map[string]*structpb.Value{
"positions": structpb.NewStructValue(&structpb.Struct{
Fields: map[string]*structpb.Value{"values": structpb.NewListValue(l)},
}),
},
}},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockClock := clk.NewMock()
buf := tu.MockBuffer{}
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
buf := tu.NewMockBuffer(ctx)
params := data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Clock: mockClock,
Target: &buf,
Clock: clock.New(),
Target: buf,
}

arm := newArm()
Expand All @@ -74,13 +96,8 @@ func TestCollectors(t *testing.T) {

defer col.Close()
col.Collect()
mockClock.Add(captureInterval)

tu.Retry(func() bool {
return buf.Length() != 0
}, numRetries)
test.That(t, buf.Length(), test.ShouldBeGreaterThan, 0)
test.That(t, buf.Writes[0].GetStruct().AsMap(), test.ShouldResemble, tc.expected)
tu.CheckMockBufferWrites(t, ctx, start, buf.TabularWrites, tc.expected)
})
}
}
Expand Down
30 changes: 18 additions & 12 deletions components/board/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package board

import (
"context"
"time"

"github.com/pkg/errors"
pb "go.viam.com/api/component/board/v1"
Expand Down Expand Up @@ -39,10 +40,12 @@ func newAnalogCollector(resource interface{}, params data.CollectorParams) (data
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
var analogValue AnalogValue
if _, ok := arg[analogReaderNameKey]; !ok {
return nil, data.FailedToReadErr(params.ComponentName, analogs.String(),
return res, data.FailedToReadErr(params.ComponentName, analogs.String(),
errors.New("Must supply reader_name in additional_params for analog collector"))
}
if reader, err := board.AnalogByName(arg[analogReaderNameKey].String()); err == nil {
Expand All @@ -51,17 +54,18 @@ func newAnalogCollector(resource interface{}, params data.CollectorParams) (data
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, analogs.String(), err)
return res, data.FailedToReadErr(params.ComponentName, analogs.String(), err)
}
}
return pb.ReadAnalogReaderResponse{

return data.NewTabularCaptureResult(timeRequested, pb.ReadAnalogReaderResponse{
Value: int32(analogValue.Value),
MinRange: analogValue.Min,
MaxRange: analogValue.Max,
StepSize: analogValue.StepSize,
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand All @@ -74,10 +78,12 @@ func newGPIOCollector(resource interface{}, params data.CollectorParams) (data.C
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (interface{}, error) {
cFunc := data.CaptureFunc(func(ctx context.Context, arg map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult
var value bool
if _, ok := arg[gpioPinNameKey]; !ok {
return nil, data.FailedToReadErr(params.ComponentName, gpios.String(),
return res, data.FailedToReadErr(params.ComponentName, gpios.String(),
errors.New("Must supply pin_name in additional params for gpio collector"))
}
if gpio, err := board.GPIOPinByName(arg[gpioPinNameKey].String()); err == nil {
Expand All @@ -86,14 +92,14 @@ func newGPIOCollector(resource interface{}, params data.CollectorParams) (data.C
// A modular filter component can be created to filter the readings from a component. The error ErrNoCaptureToStore
// is used in the datamanager to exclude readings from being captured and stored.
if errors.Is(err, data.ErrNoCaptureToStore) {
return nil, err
return res, err
}
return nil, data.FailedToReadErr(params.ComponentName, gpios.String(), err)
return res, data.FailedToReadErr(params.ComponentName, gpios.String(), err)
}
}
return pb.GetGPIOResponse{
return data.NewTabularCaptureResult(timeRequested, pb.GetGPIOResponse{
High: value,
}, nil
})
})
return data.NewCollector(cFunc, params)
}
Expand Down
67 changes: 36 additions & 31 deletions components/board/collectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"testing"
"time"

clk "github.com/benbjohnson/clock"
"github.com/benbjohnson/clock"
"github.com/golang/protobuf/ptypes/wrappers"
pb "go.viam.com/api/component/board/v1"
datasyncpb "go.viam.com/api/app/datasync/v1"
"go.viam.com/test"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/components/board"
"go.viam.com/rdk/data"
Expand All @@ -22,22 +23,20 @@ import (

const (
componentName = "board"
captureInterval = time.Second
numRetries = 5
captureInterval = time.Millisecond
)

func TestCollectors(t *testing.T) {
tests := []struct {
name string
params data.CollectorParams
collector data.CollectorConstructor
expected map[string]any
shouldError bool
expectedError error
name string
params data.CollectorParams
collector data.CollectorConstructor
expected *datasyncpb.SensorData
}{
{
name: "Board analog collector should write an analog response",
params: data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand All @@ -46,17 +45,22 @@ func TestCollectors(t *testing.T) {
},
},
collector: board.NewAnalogCollector,
expected: tu.ToProtoMapIgnoreOmitEmpty(pb.ReadAnalogReaderResponse{
Value: 1,
MinRange: 0,
MaxRange: 10,
StepSize: 0.1,
}),
shouldError: false,
expected: &datasyncpb.SensorData{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: &structpb.Struct{
Fields: map[string]*structpb.Value{
"value": structpb.NewNumberValue(1),
"min_range": structpb.NewNumberValue(0),
"max_range": structpb.NewNumberValue(10),
"step_size": structpb.NewNumberValue(float64(float32(0.1))),
},
}},
},
},
{
name: "Board gpio collector should write a gpio response",
params: data.CollectorParams{
DataType: data.CaptureTypeTabular,
ComponentName: componentName,
Interval: captureInterval,
Logger: logging.NewTestLogger(t),
Expand All @@ -65,33 +69,34 @@ func TestCollectors(t *testing.T) {
},
},
collector: board.NewGPIOCollector,
expected: tu.ToProtoMapIgnoreOmitEmpty(pb.GetGPIOResponse{
High: true,
}),
shouldError: false,
expected: &datasyncpb.SensorData{
Metadata: &datasyncpb.SensorMetadata{},
Data: &datasyncpb.SensorData_Struct{Struct: &structpb.Struct{
Fields: map[string]*structpb.Value{
"high": structpb.NewBoolValue(true),
},
}},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockClock := clk.NewMock()
buf := tu.MockBuffer{}
tc.params.Clock = mockClock
tc.params.Target = &buf
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
buf := tu.NewMockBuffer(ctx)
tc.params.Clock = clock.New()
tc.params.Target = buf

board := newBoard()
col, err := tc.collector(board, tc.params)
test.That(t, err, test.ShouldBeNil)

defer col.Close()
col.Collect()
mockClock.Add(captureInterval)

tu.Retry(func() bool {
return buf.Length() != 0
}, numRetries)
test.That(t, buf.Length(), test.ShouldBeGreaterThan, 0)
test.That(t, buf.Writes[0].GetStruct().AsMap(), test.ShouldResemble, tc.expected)
tu.CheckMockBufferWrites(t, ctx, start, buf.TabularWrites, tc.expected)
})
}
}
Expand Down
Loading
Loading